Azure Databricks'te ilk ETL iş yükünüzü çalıştırma

Veri düzenleme için ilk ayıklama, dönüştürme ve yükleme (ETL) işlem hatlarınızı geliştirmek ve dağıtmak için Azure Databricks'in üretime hazır araçlarını kullanmayı öğrenin.

Bu makalenin sonunda kendinizi rahat hissedeceksiniz:

  1. Databricks çok amaçlı işlem kümesini başlatma.
  2. Databricks not defteri oluşturma.
  3. Otomatik Yükleyici ile Delta Lake'e artımlı veri alımını yapılandırma.
  4. Verileri işlemek, sorgulamak ve önizlemek için not defteri hücrelerini yürütme.
  5. Bir not defterini Databricks işi olarak zamanlama.

Bu öğreticide Python veya Scala'daki yaygın ETL görevlerini tamamlamak için etkileşimli not defterleri kullanılır.

DELTA Live Tablolarını ETL işlem hatları oluşturmak için de kullanabilirsiniz. Databricks, üretim ETL işlem hatlarını oluşturma, dağıtma ve bakımının karmaşıklığını azaltmak için Delta Live Tables oluşturdu. Bkz . Öğretici: İlk Delta Live Tables işlem hattınızı çalıştırma.

Bu makalenin kaynaklarını oluşturmak için Databricks Terraform sağlayıcısını da kullanabilirsiniz. Bkz. Terraform ile kümeler, not defterleri ve işler oluşturma.

Gereksinimler

Not

Küme denetimi ayrıcalıklarınız yoksa, bir kümeye erişiminiz olduğu sürece aşağıdaki adımların çoğunu yine de tamamlayabilirsiniz.

1. Adım: Küme oluşturma

Keşif veri analizi ve veri mühendisliği yapmak için, komutları yürütmek için gereken işlem kaynaklarını sağlamak üzere bir küme oluşturun.

  1. Kenar çubuğunda İşlem'e tıklayınişlem simgesi.
  2. İşlem sayfasında Küme Oluştur'a tıklayın. Bu, Yeni Küme sayfasını açar.
  3. Küme için benzersiz bir ad belirtin, kalan değerleri varsayılan durumunda bırakın ve Küme Oluştur'a tıklayın.

Databricks kümeleri hakkında daha fazla bilgi edinmek için bkz . İşlem.

2. Adım: Databricks not defteri oluşturma

Azure Databricks'te etkileşimli kod yazmaya ve yürütmeye başlamak için bir not defteri oluşturun.

  1. Yeni SimgeKenar çubuğunda Yeni'ye ve ardından Not Defteri'ne tıklayın.
  2. Not Defteri Oluştur sayfasında:
    • Not defteriniz için benzersiz bir ad belirtin.
    • Varsayılan dilin Python veya Scala olarak ayarlandığından emin olun.
    • Küme açılan listesinden 1. adımda oluşturduğunuz kümeyi seçin.
    • Oluştur’a tıklayın.

Üst kısmında boş bir hücre bulunan bir not defteri açılır.

Not defterlerini oluşturma ve yönetme hakkında daha fazla bilgi edinmek için bkz . Not defterlerini yönetme.

3. Adım: Verileri Delta Lake'e almak için Otomatik Yükleyici'yi yapılandırma

Databricks, artımlı veri alımı için Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler.

Databricks, Delta Lake ile veri depolamayı önerir. Delta Lake, ACID işlemleri sağlayan ve data lakehouse'a olanak tanıyan açık kaynak bir depolama katmanıdır. Delta Lake, Databricks'te oluşturulan tablolar için varsayılan biçimdir.

Verileri Delta Lake tablosuna almak üzere Otomatik Yükleyici'yi yapılandırmak için aşağıdaki kodu kopyalayıp not defterinizdeki boş hücreye yapıştırın:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Not

Bu kodda tanımlanan değişkenler, mevcut çalışma alanı varlıklarıyla veya diğer kullanıcılarla çakışma riski olmadan bunu güvenli bir şekilde yürütmenize olanak sağlamalıdır. Kısıtlı ağ veya depolama izinleri bu kodu yürütürken hatalara neden olur; bu kısıtlamaları gidermek için çalışma alanı yöneticinize başvurun.

Otomatik Yükleyici hakkında daha fazla bilgi edinmek için bkz . Otomatik Yükleyici nedir?.

4. Adım: Verileri işleme ve verilerle etkileşim kurma

Not defterleri mantığı hücre hücre yürütür. Hücrenizdeki mantığı yürütmek için:

  1. Önceki adımda tamamladığınız hücreyi çalıştırmak için hücreyi seçin ve SHIFT+ENTER tuşlarına basın.

  2. Yeni oluşturduğunuz tabloyu sorgulamak için, aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. DataFrame'inizdeki verilerin önizlemesini görüntülemek için aşağıdaki kodu kopyalayıp boş bir hücreye yapıştırın, ardından SHIFT+ENTER tuşlarına basarak hücreyi çalıştırın.

    Python

    display(df)
    

    Scala

    display(df)
    

Verileri görselleştirmeye yönelik etkileşimli seçenekler hakkında daha fazla bilgi edinmek için bkz . Databricks not defterlerindeki görselleştirmeler.

5. Adım: İş zamanlama

Databricks not defterlerini bir Databricks işine görev olarak ekleyerek üretim betikleri olarak çalıştırabilirsiniz. Bu adımda, el ile tetikleyebileceğiniz yeni bir iş oluşturacaksınız.

Not defterinizi görev olarak zamanlamak için:

  1. Üst bilgi çubuğunun sağ tarafındaki Zamanla'ya tıklayın.
  2. İş adı için benzersiz bir ad girin.
  3. El ile'ye tıklayın.
  4. Küme açılan listesinde, 1. adımda oluşturduğunuz kümeyi seçin.
  5. Oluştur’a tıklayın.
  6. Görüntülenen pencerede Şimdi çalıştır'a tıklayın.
  7. İş çalıştırması sonuçlarını görmek için Son çalıştırma zaman damgasının yanındaki simgeye tıklayın.Dış Bağlantı

İşler hakkında daha fazla bilgi için bkz . Azure Databricks İşleri nedir?.

Ek Tümleştirmeler

Azure Databricks ile veri mühendisliğine yönelik tümleştirmeler ve araçlar hakkında daha fazla bilgi edinin: