Databricks'te uçtan uca veri işlem hattı oluşturma

Bu makalede ham verileri alma, verileri dönüştürme ve işlenen veriler üzerinde analiz çalıştırma dahil olmak üzere uçtan uca veri işleme işlem hattı oluşturma ve dağıtma adımları gösterilmektedir.

Not

Bu makalede, databricks not defterlerini ve iş akışını düzenlemeye yönelik bir Azure Databricks işini kullanarak eksiksiz bir veri işlem hattı oluşturmayı gösterse de Databricks, güvenilir, sürdürülebilir ve test edilebilir veri işleme işlem hatları oluşturmaya yönelik bildirim temelli bir arabirim olan Delta Live Tables'ın kullanılmasını önerir.

Veri işlem hattı nedir?

Veri işlem hattı, kaynak sistemlerden veri taşımak, bu verileri gereksinimlere göre dönüştürmek ve verileri bir hedef sistemde depolamak için gereken adımları uygular. Veri işlem hattı, ham verileri kullanıcıların kullanabileceği hazırlanmış verilere dönüştürmek için gereken tüm işlemleri içerir. Örneğin, veri işlem hattı verileri hazırlayarak veri analistlerinin ve veri bilimcilerinin analiz ve raporlama yoluyla verilerden değer çıkarabilmesini sağlayabilir.

Ayıklama, dönüştürme ve yükleme (ETL) iş akışı, veri işlem hattının yaygın bir örneğidir. ETL işlemede veriler kaynak sistemlerden alınır ve hazırlama alanına yazılır, gereksinimlere göre dönüştürülür (veri kalitesini sağlama, kayıtları yinelenenleri kaldırma vb.) ve ardından veri ambarı veya veri gölü gibi bir hedef sisteme yazılır.

Veri işlem hattı adımları

Azure Databricks'te veri işlem hatları oluşturmaya başlamanıza yardımcı olmak için, bu makaledeki örnekte veri işleme iş akışı oluşturma adımları açıklanmaktadır:

  • Ham veri kümesini keşfetmek için Azure Databricks özelliklerini kullanın.
  • Ham kaynak verilerini almak ve ham verileri hedef tabloya yazmak için bir Databricks not defteri oluşturun.
  • Ham kaynak verilerini dönüştürmek ve dönüştürülen verileri bir hedef tabloya yazmak için bir Databricks not defteri oluşturun.
  • Dönüştürülen verileri sorgulamak için bir Databricks not defteri oluşturun.
  • Azure Databricks işiyle veri işlem hattını otomatikleştirme.

Gereksinimler

  • Azure Databricks'te ve Veri Bilimi ve Mühendislik çalışma alanında oturum açtınız.
  • Küme oluşturma veya kümeye erişim iznine sahipsiniz.
  • (İsteğe bağlı) Tabloları Unity Kataloğu'nda yayımlamak için Unity Kataloğu'nda bir katalog ve şema oluşturmanız gerekir.

Örnek: Milyon Şarkı veri kümesi

Bu örnekte kullanılan veri kümesi, çağdaş müzik parçaları için bir özellik ve meta veri koleksiyonu olan Milyon Şarkı Veri Kümesinin bir alt kümesidir. Bu veri kümesi, Azure Databricks çalışma alanınıza dahil edilen örnek veri kümelerinde kullanılabilir.

1. Adım: Küme oluşturma

Bu örnekte veri işleme ve çözümleme gerçekleştirmek için, komutları çalıştırmak için gereken işlem kaynaklarını sağlamak üzere bir küme oluşturun.

Not

Bu örnekte DBFS'de depolanan örnek bir veri kümesi kullanıldığından ve tabloların Unity Kataloğu'nda kalıcı olmasını önerdiği için, tek kullanıcı erişim moduyla yapılandırılmış bir küme oluşturursunuz. Tek kullanıcı erişim modu DBFS'ye tam erişim sağlarken Unity Kataloğu'na erişimi de etkinleştirir. Bkz. DBFS ve Unity Kataloğu için en iyi yöntemler.

  1. Kenar çubuğunda İşlem'e tıklayın.
  2. İşlem sayfasında Küme Oluştur'a tıklayın.
  3. Yeni Küme sayfasında, küme için benzersiz bir ad girin.
  4. Erişim modunda Tek Kullanıcı'yı seçin.
  5. Tek kullanıcı veya hizmet sorumlusu erişimi bölümünde kullanıcı adınızı seçin.
  6. Kalan değerleri varsayılan durumunda bırakın ve Küme Oluştur'a tıklayın.

Databricks kümeleri hakkında daha fazla bilgi edinmek için bkz . İşlem.

2. Adım: Kaynak verileri keşfetme

Ham kaynak verileri keşfetmek için Azure Databricks arabirimini kullanmayı öğrenmek için bkz . Veri işlem hattı için kaynak verileri keşfetme. Verileri doğrudan almak ve hazırlamak istiyorsanız 3. Adım: Ham verileri alma bölümüne geçin.

3. Adım: Ham verileri alma

Bu adımda, ham verileri daha fazla işleme için kullanılabilir hale getirmek için bir tabloya yüklersiniz. Databricks platformunda tablolar gibi veri varlıklarını yönetmek için Databricks Unity Kataloğu'nu önerir. Ancak, Unity Kataloğu'nda tabloları yayımlamak için gerekli kataloğu ve şemayı oluşturma izinleriniz yoksa, Hive meta veri deposuna tablo yayımlayarak aşağıdaki adımları yine de tamamlayabilirsiniz.

Databricks, verileri almak için Otomatik Yükleyici'nin kullanılmasını önerir. Otomatik Yükleyici, bulut nesne depolama alanına ulaşan yeni dosyaları otomatik olarak algılar ve işler.

Otomatik Yükleyici'yi, yüklenen verilerin şemasını otomatik olarak algılayarak veri şemasını açıkça bildirmeden tabloları başlatmanıza ve yeni sütunlar kullanıma sunulduğunda tablo şemasını geliştirmenize olanak tanıyacak şekilde yapılandırabilirsiniz. Bu, şema değişikliklerini zaman içinde el ile izleme ve uygulama gereksinimini ortadan kaldırır. Databricks, Otomatik Yükleyici kullanılırken şema çıkarımı önerir. Ancak, veri araştırma adımında görüldüğü gibi şarkı verileri üst bilgi içermez. Üst bilgi verilerle birlikte depolanmadığından, sonraki örnekte gösterildiği gibi şemayı açıkça tanımlamanız gerekir.

  1. Kenar çubuğunda Yeni'ye tıklayın New Iconve menüden Not Defteri'ni seçin. Not Defteri Oluştur iletişim kutusu görüntülenir.

  2. Not defteri için bir ad girin; örneğin, Ingest songs data. Varsayılan olarak:

    • Seçilen dil Python'dır .
    • Not defteri, kullandığınız son kümeye eklenir. Bu durumda, 1. Adım: Küme oluşturma bölümünde oluşturduğunuz küme.
  3. Not defterinin ilk hücresine aşağıdakileri girin:

    from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
    
    # Define variables used in the code below
    file_path = "/databricks-datasets/songs/data-001/"
    table_name = "<table-name>"
    checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"
    
    schema = StructType(
      [
        StructField("artist_id", StringType(), True),
        StructField("artist_lat", DoubleType(), True),
        StructField("artist_long", DoubleType(), True),
        StructField("artist_location", StringType(), True),
        StructField("artist_name", StringType(), True),
        StructField("duration", DoubleType(), True),
        StructField("end_of_fade_in", DoubleType(), True),
        StructField("key", IntegerType(), True),
        StructField("key_confidence", DoubleType(), True),
        StructField("loudness", DoubleType(), True),
        StructField("release", StringType(), True),
        StructField("song_hotnes", DoubleType(), True),
        StructField("song_id", StringType(), True),
        StructField("start_of_fade_out", DoubleType(), True),
        StructField("tempo", DoubleType(), True),
        StructField("time_signature", DoubleType(), True),
        StructField("time_signature_confidence", DoubleType(), True),
        StructField("title", StringType(), True),
        StructField("year", IntegerType(), True),
        StructField("partial_sequence", IntegerType(), True)
      ]
    )
    
    (spark.readStream
      .format("cloudFiles")
      .schema(schema)
      .option("cloudFiles.format", "csv")
      .option("sep","\t")
      .load(file_path)
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .toTable(table_name)
    )
    

    Unity Kataloğu kullanıyorsanız, alınan kayıtları (örneğindata_pipelines.songs_data.raw_song_data, ) içerecek şekilde öğesini bir katalog, şema ve tablo adıyla değiştirin<table-name>. Aksi takdirde değerini, alınan kayıtları içeren bir tablonun adıyla (örneğin, raw_song_data) değiştirin<table-name>.

    denetim noktası dosyalarını korumak için dbfs'deki bir dizinin yoluyla değiştirin <checkpoint-path> ; örneğin, /tmp/pipeline_get_started/_checkpoint/song_data.

  4. öğesine tıklayın Run Menuve Hücreyi Çalıştır'ı seçin. Bu örnek, içindeki bilgileri READMEkullanarak veri şemasını tanımlar, içinde bulunan file_pathtüm dosyalardan şarkı verilerini alır ve verileri tarafından table_namebelirtilen tabloya yazar.

4. Adım: Ham verileri hazırlama

Ham verileri analize hazırlamak için, aşağıdaki adımlar gereksiz sütunları filtreleyerek ve yeni kaydın oluşturulması için zaman damgası içeren yeni bir alan ekleyerek ham şarkı verilerini dönüştürür.

  1. Kenar çubuğunda Yeni'ye tıklayın New Iconve menüden Not Defteri'ni seçin. Not Defteri Oluştur iletişim kutusu görüntülenir.

  2. Not defteri için bir ad girin. Örneğin, Prepare songs data. Varsayılan dili SQL olarak değiştirin.

  3. Not defterinin ilk hücresine aşağıdakileri girin:

    CREATE OR REPLACE TABLE
      <table-name> (
        artist_id STRING,
        artist_name STRING,
        duration DOUBLE,
        release STRING,
        tempo DOUBLE,
        time_signature DOUBLE,
        title STRING,
        year DOUBLE,
        processed_time TIMESTAMP
      );
    
    INSERT INTO
      <table-name>
    SELECT
      artist_id,
      artist_name,
      duration,
      release,
      tempo,
      time_signature,
      title,
      year,
      current_timestamp()
    FROM
      <raw-songs-table-name>
    

    Unity Kataloğu kullanıyorsanız, filtrelenmiş ve dönüştürülen kayıtları (örneğindata_pipelines.songs_data.prepared_song_data, ) içerecek şekilde öğesini bir katalog, şema ve tablo adıyla değiştirin<table-name>. Aksi takdirde, değerini filtrelenmiş ve dönüştürülmüş kayıtları (örneğin) prepared_song_dataiçerecek şekilde bir tablonun adıyla değiştirin<table-name>.

    değerini, önceki adımda alınan ham şarkı kayıtlarını içeren tablonun adıyla değiştirin <raw-songs-table-name> .

  4. öğesine tıklayın Run Menuve Hücreyi Çalıştır'ı seçin.

5. Adım: Dönüştürülen verileri sorgulama

Bu adımda, şarkı verilerini analiz etmek için sorgular ekleyerek işlem hattını genişletirsiniz. Bu sorgular, önceki adımda oluşturulan hazırlanmış kayıtları kullanır.

  1. Kenar çubuğunda Yeni'ye tıklayın New Iconve menüden Not Defteri'ni seçin. Not Defteri Oluştur iletişim kutusu görüntülenir.

  2. Not defteri için bir ad girin. Örneğin, Analyze songs data. Varsayılan dili SQL olarak değiştirin.

  3. Not defterinin ilk hücresine aşağıdakileri girin:

    -- Which artists released the most songs each year?
    SELECT
      artist_name,
      count(artist_name)
    AS
      num_songs,
      year
    FROM
      <prepared-songs-table-name>
    WHERE
      year > 0
    GROUP BY
      artist_name,
      year
    ORDER BY
      num_songs DESC,
      year DESC
    

    değerini, hazırlanan verileri içeren tablonun adıyla değiştirin <prepared-songs-table-name> . Örneğin, data_pipelines.songs_data.prepared_song_data.

  4. Hücre eylemleri menüsüne tıklayınDown Caret, Altına Hücre Ekle'yi seçin ve yeni hücreye aşağıdakileri girin:

     -- Find songs for your DJ list
     SELECT
       artist_name,
       title,
       tempo
     FROM
       <prepared-songs-table-name>
     WHERE
       time_signature = 4
       AND
       tempo between 100 and 140;
    

    değerini önceki adımda oluşturulan hazırlanmış tablonun adıyla değiştirin <prepared-songs-table-name> . Örneğin, data_pipelines.songs_data.prepared_song_data.

  5. Sorguları çalıştırmak ve çıkışı görüntülemek için Tümünü çalıştır'a tıklayın.

6. Adım: İşlem hattını çalıştırmak için bir Azure Databricks işi oluşturma

Azure Databricks işini kullanarak veri alımı, işleme ve analiz adımlarını çalıştırmayı otomatikleştirmek için bir iş akışı oluşturabilirsiniz.

  1. Veri Bilimi ve Mühendislik çalışma alanınızda aşağıdakilerden birini yapın:
    • Kenar çubuğunda İş Akışları'na ve öğesine tıklayınCreate Job Button.Jobs Icon
    • Kenar çubuğunda Yeni'ye tıklayın New Iconve İş'i seçin.
  2. Görevler sekmesindeki görev iletişim kutusunda, İşiniz için ad ekle... öğesini iş adınız ile değiştirin. Örneğin, "Şarkı iş akışı".
  3. Görev adı alanına ilk görev için bir ad girin; örneğin, Ingest_songs_data.
  4. Tür'de Not Defteri görev türünü seçin.
  5. Kaynak'ta Çalışma Alanı'yı seçin.
  6. Veri alımı not defterini bulmak için dosya tarayıcısını kullanın, not defteri adına tıklayın ve Onayla'ya tıklayın.
  7. Küme'de Shared_job_cluster veya adımda oluşturduğunuz kümeyi Create a cluster seçin.
  8. Oluştur’a tıklayın.
  9. Yeni oluşturduğunuz görevin altına tıklayın Add Task Button ve Not Defteri'ni seçin.
  10. Görev adı alanına görev için bir ad girin, örneğin, Prepare_songs_data.
  11. Tür'de Not Defteri görev türünü seçin.
  12. Kaynak'ta Çalışma Alanı'yı seçin.
  13. Veri hazırlama not defterini bulmak için dosya tarayıcısını kullanın, not defteri adına tıklayın ve Onayla'ya tıklayın.
  14. Küme'de Shared_job_cluster veya adımda oluşturduğunuz kümeyi Create a cluster seçin.
  15. Oluştur’a tıklayın.
  16. Yeni oluşturduğunuz görevin altına tıklayın Add Task Button ve Not Defteri'ni seçin.
  17. Görev adı alanına görev için bir ad girin, örneğin, Analyze_songs_data.
  18. Tür'de Not Defteri görev türünü seçin.
  19. Kaynak'ta Çalışma Alanı'yı seçin.
  20. Veri analizi not defterini bulmak için dosya tarayıcısını kullanın, not defteri adına tıklayın ve Onayla'ya tıklayın.
  21. Küme'de Shared_job_cluster veya adımda oluşturduğunuz kümeyi Create a cluster seçin.
  22. Oluştur’a tıklayın.
  23. İş akışını çalıştırmak için öğesine tıklayın Run Now Button. Çalıştırmanın ayrıntılarını görüntülemek için, iş çalıştırmalarıgörünümünde çalıştırmanın Başlangıç zamanı sütunundaki bağlantıya tıklayın. Görev çalıştırmasının ayrıntılarını görüntülemek için her göreve tıklayın.
  24. İş akışı tamamlandığında sonuçları görüntülemek için son veri çözümleme görevine tıklayın. Çıktı sayfası görüntülenir ve sorgu sonuçlarını görüntüler.

7. Adım: Veri işlem hattı işini zamanlama

Not

Zamanlanmış bir iş akışını düzenlemeye yönelik bir Azure Databricks işi kullanmayı göstermek için, bu başlangıç örneği alım, hazırlık ve analiz adımlarını ayrı not defterlerine ayırır ve her not defteri daha sonra işte bir görev oluşturmak için kullanılır. tüm işlemler tek bir not defterinde yer alırsa, doğrudan Azure Databricks not defteri kullanıcı arabiriminden not defterini kolayca zamanlayabilirsiniz. Bkz. Zamanlanmış not defteri işlerini oluşturma ve yönetme.

Veri işlem hattını zamanlanmış olarak çalıştırmak yaygın bir gereksinimdir. İşlem hattını çalıştıran iş için bir zamanlama tanımlamak için:

  1. Kenar çubuğunda İş Akışları'na tıklayınJobs Icon.
  2. Ad sütununda iş adına tıklayın. Yan panelde İş ayrıntıları görüntülenir.
  3. İş ayrıntıları panelinde Tetikleyici ekle'ye tıklayın ve Tetikleyici türünde Zamanlanmış'ıseçin.
  4. Dönemi, başlangıç saatini ve saat dilimini belirtin. İsteğe bağlı olarak, Quartz Cron Söz Diziminde zamanlamayı görüntülemek ve düzenlemek için Cron Söz Dizimini Göster onay kutusunu seçin.
  5. Kaydet'e tıklayın.

Daha fazla bilgi edinin