Öğretici: Uçtan uca lakehouse analytics işlem hattı çalıştırma

Bu öğreticide, Azure Databricks lakehouse için uçtan uca analiz işlem hattı ayarlama adımları gösterilmektedir.

Önemli

Bu öğreticide, Unity Kataloğu etkinleştirilmiş kümelerde Python'daki yaygın ETL görevlerini tamamlamak için etkileşimli not defterleri kullanılır. Unity Kataloğu kullanmıyorsanız bkz . Azure Databricks'te ilk ETL iş yükünüzü çalıştırma.

Bu öğreticideki görevler

Bu makalenin sonunda kendinizi rahat hissedeceksiniz:

  1. Unity Kataloğu etkinleştirilmiş bir işlem kümesini başlatma.
  2. Databricks not defteri oluşturma.
  3. Unity Kataloğu dış konumundan veri yazma ve okuma.
  4. Otomatik Yükleyici ile Unity Kataloğu tablosuna artımlı veri alımını yapılandırma.
  5. Verileri işlemek, sorgulamak ve önizlemek için not defteri hücrelerini yürütme.
  6. Bir not defterini Databricks işi olarak zamanlama.
  7. Databricks SQL'den Unity Kataloğu tablolarını sorgulama

Azure Databricks, veri uzmanlarının ayıklama, dönüştürme ve yükleme (ETL) işlem hatlarını hızla geliştirmesine ve dağıtmasına olanak sağlayan üretime hazır araçlar sunar. Unity Kataloğu, veri yöneticilerinin kuruluş genelindeki kullanıcılar için depolama kimlik bilgilerini, dış konumları ve veritabanı nesnelerini yapılandırmalarına ve güvenliğini sağlamalarına olanak tanır. Databricks SQL, analistlerin üretim ETL iş yüklerinde kullanılan tablolarda SQL sorguları çalıştırmasına olanak tanıyarak büyük ölçekte gerçek zamanlı iş zekası sağlar.

DELTA Live Tablolarını ETL işlem hatları oluşturmak için de kullanabilirsiniz. Databricks, üretim ETL işlem hatlarını oluşturma, dağıtma ve bakımının karmaşıklığını azaltmak için Delta Live Tables oluşturdu. Bkz . Öğretici: İlk Delta Live Tables işlem hattınızı çalıştırma.

Gereksinimler

Not

Küme denetimi ayrıcalıklarınız yoksa, bir kümeye erişiminiz olduğu sürece aşağıdaki adımların çoğunu yine de tamamlayabilirsiniz.

1. Adım: Küme oluşturma

Keşif veri analizi ve veri mühendisliği yapmak için, komutları yürütmek için gereken işlem kaynaklarını sağlamak üzere bir küme oluşturun.

  1. Kenar çubuğunda İşlem'e tıklayınişlem simgesi.
  2. Kenar çubuğunda Yeni'ye tıklayın Yeni Simgeve küme'yi seçin. Bu, Yeni Küme/İşlem sayfasını açar.
  3. Küme için benzersiz bir ad belirtin.
  4. Tek düğüm radyo düğmesini seçin.
  5. Erişim modu açılan listesinden Tek Kullanıcı'ya tıklayın.
  6. E-posta adresinizin Tek Kullanıcı alanında görünür olduğundan emin olun.
  7. Unity Kataloğu'nu kullanmak için istenen Databricks çalışma zamanı sürümünü (11.1 veya üzeri) seçin.
  8. Kümeyi oluşturmak için İşlem oluştur'a tıklayın.

Databricks kümeleri hakkında daha fazla bilgi edinmek için bkz . İşlem.

2. Adım: Databricks not defteri oluşturma

Azure Databricks'te etkileşimli kod yazmaya ve yürütmeye başlamak için bir not defteri oluşturun.

  1. Yeni SimgeKenar çubuğunda Yeni'ye ve ardından Not Defteri'ne tıklayın.
  2. Not Defteri Oluştur sayfasında:
    • Not defteriniz için benzersiz bir ad belirtin.
    • Varsayılan dilin Python olarak ayarlandığından emin olun.
    • Küme açılan menüsünden 1. adımda oluşturduğunuz kümeyi seçmek için Bağlan açılan menüsünü kullanın.

Not defteri boş bir hücreyle açılır.

Not defterlerini oluşturma ve yönetme hakkında daha fazla bilgi edinmek için bkz . Not defterlerini yönetme.

3. Adım: Unity Kataloğu tarafından yönetilen bir dış konumdan veri yazma ve okuma

Databricks, artımlı veri alımı için Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler.

Dış konumlara güvenli erişimi yönetmek için Unity Kataloğu'nu kullanın. Dış konumda izinleri olan READ FILES kullanıcılar veya hizmet sorumluları verileri almak için Otomatik Yükleyici'yi kullanabilir.

Normalde, diğer sistemlerden yazma işlemleri nedeniyle veriler bir dış konuma ulaşır. Bu tanıtımda, JSON dosyalarını bir dış konuma yazarak veri gelişi simülasyonu yapabilirsiniz.

Aşağıdaki kodu not defteri hücresine kopyalayın. için catalog dize değerini ve USE CATALOG izinleriyle bir kataloğun adıyla CREATE CATALOG değiştirin. için external_location dize değerini, bir dış konumun yoluyla , WRITE FILESve CREATE EXTERNAL TABLE izinleriyle READ FILESdeğiştirin.

Dış konumlar tüm depolama kapsayıcısı olarak tanımlanabilir, ancak genellikle kapsayıcıda iç içe yerleştirilmiş bir dizine işaret eder.

Dış konum yolu için doğru biçim şeklindedir "abfss://container_name@storage_account.dfs.core.windows.net/path/to/external_location".


 external_location = "<your-external-location>"
 catalog = "<your-catalog>"

 dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
 display(dbutils.fs.head(f"{external_location}/filename.txt"))
 dbutils.fs.rm(f"{external_location}/filename.txt")

 display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))

Bu hücrenin yürütülmesi 12 baytlık bir satır yazdırmalı, "Merhaba dünya!" dizesini yazdırmalı ve sağlanan katalogda bulunan tüm veritabanlarını görüntülemelidir. Bu hücreyi çalıştıramıyorsanız Unity Kataloğu etkinleştirilmiş bir çalışma alanında olduğunuzu onaylayın ve bu öğreticiyi tamamlamak için çalışma alanı yöneticinizden uygun izinleri isteyin.

Aşağıdaki Python kodu, sağlanan katalogda benzersiz bir veritabanı ve sağlanan dış konumda benzersiz bir depolama konumu oluşturmak için e-posta adresinizi kullanır. Bu hücrenin yürütülmesi, bu öğreticiyle ilişkili tüm verileri kaldırarak bu örneği aynı anda yürütmenizi sağlar. Bağlı bir sistemden kaynak dış konumunuza gelen veri toplu işlemlerinin benzetimini yapmak için kullanacağınız bir sınıf tanımlanır ve örneği oluşturulur.

Bu kodu not defterinizdeki yeni bir hücreye kopyalayın ve ortamınızı yapılandırmak için yürütür.

Not

Bu kodda tanımlanan değişkenler, mevcut çalışma alanı varlıklarıyla veya diğer kullanıcılarla çakışma riski olmadan bunu güvenli bir şekilde yürütmenize olanak sağlamalıdır. Kısıtlı ağ veya depolama izinleri bu kodu yürütürken hatalara neden olur; bu kısıtlamaları gidermek için çalışma alanı yöneticinize başvurun.


from pyspark.sql.functions import col

# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"

spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")

spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")

# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)

# Define a class to load batches of data to source
class LoadData:

    def __init__(self, source):
        self.source = source

    def get_date(self):
        try:
            df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
            raise Exception("Source data exhausted")
        return batch_date

    def get_batch(self, batch_date):
        return (
            spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )

    def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)

    def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)

RawData = LoadData(source)

Artık aşağıdaki kodu bir hücreye kopyalayıp yürüterek bir veri toplu işlemini alabilirsiniz. Yeni veri gelişini tetiklemek için bu hücreyi 60 kereye kadar el ile yürütebilirsiniz.

RawData.land_batch()

4. Adım: Unity Kataloğu'na veri almak için Otomatik Yükleyici'yi yapılandırma

Databricks, Delta Lake ile veri depolamayı önerir. Delta Lake, ACID işlemleri sağlayan ve data lakehouse'a olanak tanıyan açık kaynak bir depolama katmanıdır. Delta Lake, Databricks'te oluşturulan tablolar için varsayılan biçimdir.

Otomatik Yükleyici'yi bir Unity Kataloğu tablosuna veri almak üzere yapılandırmak için aşağıdaki kodu kopyalayıp not defterinizdeki boş bir hücreye yapıştırın:

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .option("mergeSchema", "true")
  .toTable(table))

Otomatik Yükleyici hakkında daha fazla bilgi edinmek için bkz . Otomatik Yükleyici nedir?.

Unity Kataloğu ile Yapılandırılmış Akış hakkında daha fazla bilgi edinmek için bkz . Unity Kataloğu'nu Yapılandırılmış Akış ile Kullanma.

5. Adım: Verileri işleme ve verilerle etkileşim kurma

Not defterleri mantığı hücre hücre yürütür. Hücrenizdeki mantığı yürütmek için şu adımları kullanın:

  1. Önceki adımda tamamladığınız hücreyi çalıştırmak için hücreyi seçin ve SHIFT+ENTER tuşlarına basın.

  2. Yeni oluşturduğunuz tabloyu sorgulamak için, aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.

    df = spark.read.table(table_name)
    
  3. DataFrame'inizdeki verilerin önizlemesini görüntülemek için aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.

    display(df)
    

Verileri görselleştirmeye yönelik etkileşimli seçenekler hakkında daha fazla bilgi edinmek için bkz . Databricks not defterlerindeki görselleştirmeler.

6. Adım: İş zamanlama

Databricks not defterlerini bir Databricks işine görev olarak ekleyerek üretim betikleri olarak çalıştırabilirsiniz. Bu adımda, el ile tetikleyebileceğiniz yeni bir iş oluşturacaksınız.

Not defterinizi görev olarak zamanlamak için:

  1. Üst bilgi çubuğunun sağ tarafındaki Zamanla'ya tıklayın.
  2. İş adı için benzersiz bir ad girin.
  3. El ile'ye tıklayın.
  4. Küme açılan listesinde, 1. adımda oluşturduğunuz kümeyi seçin.
  5. Oluştur’a tıklayın.
  6. Görüntülenen pencerede Şimdi çalıştır'a tıklayın.
  7. İş çalıştırması sonuçlarını görmek için Son çalıştırma zaman damgasının yanındaki simgeye tıklayın.Dış Bağlantı

İşler hakkında daha fazla bilgi için bkz . Azure Databricks İşleri nedir?.

7. Adım: Databricks SQL'den tablo sorgulama

Geçerli katalog üzerinde USE CATALOG izni, USE SCHEMA geçerli şema üzerindeki izni ve SELECT tablodaki izinleri olan herkes, tercih ettiği Databricks API'sinden tablonun içeriğini sorgulayabilir.

Databricks SQL'de sorgu yürütmek için çalışan bir SQL ambarı erişiminiz olmalıdır.

Bu öğreticide daha önce oluşturduğunuz tablonun adı target_tablevardır. İlk hücrede sağladığınız kataloğu ve patern e2e_lakehouse_<your-username>ile veritabanını kullanarak sorgulayabilirsiniz. Oluşturduğunuz veri nesnelerini bulmak için Katalog Gezgini'ni kullanabilirsiniz.

Ek Tümleştirmeler

Azure Databricks ile veri mühendisliğine yönelik tümleştirmeler ve araçlar hakkında daha fazla bilgi edinin: