Apache Airflow ile Azure Databricks işlerini düzenleme

Bu makalede Azure Databricks ile veri işlem hatlarını düzenlemeye yönelik Apache Airflow desteği açıklanır, Airflow'u yerel olarak yükleme ve yapılandırma yönergeleri vardır ve Airflow ile Azure Databricks iş akışı dağıtma ve çalıştırma örneği sağlanır.

Veri işlem hattında iş düzenleme

Veri işleme işlem hattı geliştirmek ve dağıtmak için genellikle görevler arasındaki karmaşık bağımlılıkların yönetilmesi gerekir. Örneğin, işlem hattı bir kaynaktan verileri okuyabilir, verileri temizleyip temizlenen verileri dönüştürebilir ve dönüştürülen verileri bir hedefe yazabilir. İşlem hattını kullanıma hazır hale getirme sırasında oluşan test, zamanlama ve sorun giderme hataları için de desteğe ihtiyacınız vardır.

İş akışı sistemleri görevler arasındaki bağımlılıkları tanımlamanıza, işlem hatlarının ne zaman çalıştırılacağını zamanlamanıza ve iş akışlarını izlemenize olanak tanıyarak bu zorlukları ele alır. Apache Airflow, veri işlem hatlarını yönetmek ve zamanlamak için açık kaynak bir çözümdür. Hava akışı, veri işlem hatlarını işlemlerin yönlendirilmiş döngüsel grafikleri (DAG) olarak temsil eder. Python dosyasında bir iş akışı tanımlarsınız ve zamanlama ve yürütmeyi Airflow yönetir. Airflow Azure Databricks bağlantısı, Airflow zamanlama özellikleriyle Azure Databricks tarafından sunulan iyileştirilmiş Spark altyapısından yararlanmanızı sağlar.

Gereksinimler

  • Airflow ile Azure Databricks arasındaki tümleştirme için Airflow sürüm 2.5.0 ve üzeri gerekir. Bu makaledeki örnekler Airflow sürüm 2.6.1 ile test edilmiştir.
  • Hava akışı için Python 3.8, 3.9, 3.10 veya 3.11 gerekir. Bu makaledeki örnekler Python 3.8 ile test edilmiştir.
  • Airflow'u yüklemek ve çalıştırmak için bu makaledeki yönergeler, python sanal ortamı oluşturmak için pipenv gerektirir.

Databricks için hava akışı işleçleri

Airflow DAG, her görevin bir Airflow Operatörü çalıştırdığı görevlerden oluşur. Databricks tümleştirmesini destekleyen hava akışı operatörleri Databricks sağlayıcısında uygulanır.

Databricks sağlayıcısı verileri tabloya aktarma, SQL sorguları çalıştırma ve Databricks Git klasörleriyle çalışma dahil olmak üzere Azure Databricks çalışma alanında bir dizi görevi çalıştırmaya yönelik işleçler içerir.

Databricks sağlayıcısı işleri tetikleme için iki işleç uygular:

Yeni bir Azure Databricks işi oluşturmak veya var olan bir işi sıfırlamak için Databricks sağlayıcısı DatabricksCreateJobsOperator'ı uygular. POST DatabricksCreateJobsOperator /api/2.1/jobs/create ve POST /api/2.1/jobs/reset API isteklerini kullanır. ile kullanarak DatabricksRunNowOperator bir iş oluşturabilir ve çalıştırabilirsinizDatabricksCreateJobsOperator.

Not

Databricks işleçlerini kullanarak bir işi tetikleme, Databricks bağlantı yapılandırmasında kimlik bilgilerinin sağlanmasını gerektirir. Bkz . Airflow için Azure Databricks kişisel erişim belirteci oluşturma.

Databricks Airflow operatörleri, iş çalıştırma sayfası URL'sini Airflow günlüklerine her polling_period_seconds seferinde yazar (varsayılan değer 30 saniyedir). Daha fazla bilgi için Airflow web sitesindeki apache-airflow-providers-databricks paket sayfasına bakın.

Airflow Azure Databricks tümleştirmesini yerel olarak yükleme

Airflow ve Databricks sağlayıcısını test ve geliştirme amacıyla yerel olarak yüklemek için aşağıdaki adımları kullanın. Üretim kurulumu oluşturma da dahil olmak üzere diğer Airflow kurulum seçenekleri için Airflow belgelerine bakın.

Bir terminal açın ve aşağıdaki komutları çalıştırın:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

, <lastname>ve <email> yerine kullanıcı adınızı ve e-postanızı yazın<firstname>. Yönetici kullanıcı için bir parola girmeniz istenir. Airflow kullanıcı arabiriminde oturum açmak gerektiğinden bu parolayı kaydettiğinizden emin olun.

Bu betik aşağıdaki adımları gerçekleştirir:

  1. adlı airflow bir dizin oluşturur ve bu dizinde değişir.
  2. Python sanal ortamı oluşturmak ve oluşturmak için kullanılır pipenv . Databricks, bu ortama yönelik paket sürümlerini ve kod bağımlılıklarını yalıtmak için python sanal ortamı kullanılmasını önerir. Bu yalıtım beklenmeyen paket sürümü uyuşmazlıklarını ve kod bağımlılığı çakışmalarını azaltmaya yardımcı olur.
  3. dizinin yoluna airflow set adlı AIRFLOW_HOME bir ortam değişkeni başlatır.
  4. Airflow ve Airflow Databricks sağlayıcı paketlerini yükler.
  5. Bir airflow/dags dizin oluşturur. Airflow, DAG tanımlarını depolamak için dizinini dags kullanır.
  6. Airflow'un meta verileri izlemek için kullandığı bir SQLite veritabanı başlatır. Bir üretim Airflow dağıtımında Airflow'ı standart bir veritabanıyla yapılandırabilirsiniz. Airflow dağıtımınız için SQLite veritabanı ve varsayılan yapılandırma dizinde airflow başlatılır.
  7. Airflow için bir yönetici kullanıcı oluşturur.

İpucu

Databricks sağlayıcısının yüklenmesini onaylamak için Airflow yükleme dizininde aşağıdaki komutu çalıştırın:

airflow providers list

Airflow web sunucusunu ve zamanlayıcısını başlatma

Airflow web sunucusu, Airflow kullanıcı arabirimini görüntülemek için gereklidir. Web sunucusunu başlatmak için Airflow yükleme dizininde bir terminal açın ve aşağıdaki komutları çalıştırın:

Not

Airflow web sunucusu bir bağlantı noktası çakışması nedeniyle başlatılamazsa, Airflow yapılandırmasında varsayılan bağlantı noktasını değiştirebilirsiniz.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

Zamanlayıcı, DAG'leri zamanlayan Airflow bileşenidir. Zamanlayıcıyı başlatmak için Airflow yükleme dizininde yeni bir terminal açın ve aşağıdaki komutları çalıştırın:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Airflow kurulumunu test edin

Airflow yüklemesini doğrulamak için Airflow ile birlikte verilen örnek DAG'lerden birini çalıştırabilirsiniz:

  1. Tarayıcı penceresinde dosyasını açın http://localhost:8080/home. Airflow'u yüklerken oluşturduğunuz kullanıcı adı ve parolayla Airflow kullanıcı arabiriminde oturum açın. Airflow DAG'leri sayfası görüntülenir.
  2. Örnek DAG'lerden birini (örneğin, example_python_operator) kapatmak için DAG'yi Duraklat/Çıkar iki durumlu düğmesini tıklatın.
  3. DaG Tetikle düğmesine tıklayarak örnek DAG'yi tetikleme.
  4. DAG'nin çalıştırma durumu da dahil olmak üzere ayrıntıları görüntülemek için DAG adına tıklayın.

Airflow için Azure Databricks kişisel erişim belirteci oluşturma

Airflow, Azure Databricks kişisel erişim belirteci (PAT) kullanarak Databricks'e bağlanır. PAT oluşturmak için:

  1. Azure Databricks çalışma alanınızda üst çubukta Azure Databricks kullanıcı adınıza tıklayın ve açılan listeden Ayarlar seçin.
  2. Geliştirici'ye tıklayın.
  3. Erişim belirteçleri'nin yanındaki Yönet'e tıklayın.
  4. Yeni belirteç oluştur'a tıklayın.
  5. (İsteğe bağlı) Gelecekte bu belirteci tanımlamanıza yardımcı olacak bir açıklama girin ve belirtecin varsayılan 90 günlük ömrünü değiştirin. Yaşam süresi olmayan bir belirteç oluşturmak için (önerilmez), Yaşam Süresi (gün) kutusunu boş (boş) bırakın.
  6. Generate (Oluştur) düğmesine tıklayın.
  7. Görüntülenen belirteci güvenli bir konuma kopyalayın ve bitti'ye tıklayın.

Not

Kopyalanan belirteci güvenli bir konuma kaydettiğinizden emin olun. Kopyalanan belirtecinizi başkalarıyla paylaşmayın. Kopyalanan belirteci kaybederseniz, tam olarak aynı belirteci yeniden oluşturamazsınız. Bunun yerine, yeni bir belirteç oluşturmak için bu yordamı yinelemeniz gerekir. Kopyalanan belirteci kaybederseniz veya belirtecin gizliliğinin ihlal edildiğini düşünüyorsanız Databricks, Erişim belirteçleri sayfasındaki belirtecin yanındaki çöp kutusu (İptal Et) simgesine tıklayarak bu belirteci çalışma alanınızdan hemen silmenizi kesinlikle önerir.

Çalışma alanınızda belirteç oluşturamıyor veya kullanamıyorsanız, bunun nedeni çalışma alanı yöneticinizin belirteçleri devre dışı bırakmış olması veya size belirteç oluşturma veya kullanma izni vermemiş olması olabilir. Çalışma alanı yöneticinize veya aşağıdakilere bakın:

Not

En iyi güvenlik uygulaması olarak otomatik araçlar, sistemler, betikler ve uygulamalarla kimlik doğrulaması yaptığınızda Databricks, çalışma alanı kullanıcıları yerine hizmet sorumlularına ait kişisel erişim belirteçlerini kullanmanızı önerir. Hizmet sorumlularına yönelik belirteçler oluşturmak için bkz . Hizmet sorumlusu için belirteçleri yönetme.

Microsoft Entra Id (eski adıYla Azure Active Directory) belirtecini kullanarak Azure Databricks'de kimlik doğrulaması da yapabilirsiniz. Airflow belgelerinde Databricks Bağlan ion bölümüne bakın.

Azure Databricks bağlantısını yapılandırma

Airflow yüklemeniz Azure Databricks için varsayılan bir bağlantı içeriyor. Yukarıda oluşturduğunuz kişisel erişim belirtecini kullanarak çalışma alanınıza bağlanmak üzere bağlantıyı güncelleştirmek için:

  1. Tarayıcı penceresinde dosyasını açın http://localhost:8080/connection/list/. Oturum açmanız istenirse yönetici kullanıcı adınızı ve parolanızı girin.
  2. Conn Id altında databricks_default bulun ve Kaydı düzenle düğmesine tıklayın.
  3. Konak alanındaki değeri Azure Databricks dağıtımınızın çalışma alanı örneği adıyladeğiştirin; örneğin, https://adb-123456789.cloud.databricks.com.
  4. Parola alanına Azure Databricks kişisel erişim belirtecinizi girin.
  5. Kaydet'e tıklayın.

Microsoft Entra ID belirteci kullanıyorsanız, kimlik doğrulamasını yapılandırma hakkında bilgi için Airflow belgelerindeki Databricks Bağlan ion bölümüne bakın.

Örnek: Azure Databricks işini çalıştırmak için Airflow DAG oluşturma

Aşağıdaki örnekte, yerel makinenizde çalışan ve Azure Databricks'te çalıştırmaları tetikleyen örnek bir DAG dağıtan basit bir Airflow dağıtımının nasıl oluşturulacağı gösterilmektedir. Bu örnekte şunları yapacaksınız:

  1. Yeni bir not defteri oluşturun ve yapılandırılmış parametreyi temel alan bir selamlama yazdırmak için kod ekleyin.
  2. Not defterini çalıştıran tek bir görevle bir Azure Databricks işi oluşturun.
  3. Azure Databricks çalışma alanınıza airflow bağlantısı yapılandırın.
  4. Not defteri işini tetikleyen bir Airflow DAG oluşturun. DAG'yi kullanarak DatabricksRunNowOperatorbir Python betiğinde tanımlarsınız.
  5. DAG'yi tetikleyip çalıştırma durumunu görüntülemek için Airflow kullanıcı arabirimini kullanın.

Not defteri oluşturma

Bu örnekte iki hücre içeren bir not defteri kullanılır:

  • İlk hücre, varsayılan değerine worldayarlanmış adlı greeting bir değişken tanımlayan databricks Yardımcı Programları metin pencere öğesini içerir.
  • İkinci hücre, tarafından önekli hellodeğişkenin greeting değerini yazdırır.

Not defterini oluşturmak için:

  1. Azure Databricks çalışma alanınıza gidin, kenar çubuğunda Yeni'ye tıklayın Yeni Simgeve Not Defteri'ni seçin.

  2. Not defterinize Hello Airflow gibi bir ad verin ve varsayılan dilin Python olarak ayarlandığından emin olun.

  3. Aşağıdaki Python kodunu kopyalayın ve not defterinin ilk hücresine yapıştırın.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. İlk hücrenin altına yeni bir hücre ekleyin ve aşağıdaki Python kodunu kopyalayıp yeni hücreye yapıştırın:

    print("hello {}".format(greeting))
    

İş oluşturma

  1. Kenar çubuğunda İş Akışları'na tıklayınİşler Simgesi.

  2. İş Oluştur Düğmesi’a tıklayın.

    Görevler sekmesi, görev oluştur iletişim kutusuyla birlikte görüntülenir.

    İlk görev oluştur iletişim kutusu

  3. İşiniz için ad ekle... yerine iş adınızı yazın.

  4. Görev adı alanına görev için bir ad girin; örneğin, greeting-task.

  5. Tür açılan menüsünde Not Defteri'ni seçin.

  6. Kaynak açılan menüsünde Çalışma Alanı'nı seçin.

  7. Yol metin kutusuna tıklayın ve dosya tarayıcısını kullanarak oluşturduğunuz not defterini bulun, not defteri adına tıklayın ve Onayla'ya tıklayın.

  8. Parametreler'in altında Ekle'ye tıklayın. Anahtar alanına giringreeting. Değer alanına girinAirflow user.

  9. Görev oluştur'a tıklayın.

İş ayrıntıları panelinde İş Kimliği değerini kopyalayın. Airflow'dan işi tetikleme için bu değer gereklidir.

İşi çalıştırma

Azure Databricks İş Akışları kullanıcı arabiriminde yeni işinizi test etmek için sağ üst köşeye tıklayın Şimdi Çalıştır Düğmesi . Çalıştırma tamamlandığında, iş çalıştırması ayrıntılarını görüntüleyerek çıkışı doğrulayabilirsiniz.

Yeni bir Airflow DAG oluşturma

Bir Python dosyasında Airflow DAG tanımlarsınız. Örnek not defteri işini tetikleyen bir DAG oluşturmak için:

  1. Bir metin düzenleyicisinde veya IDE'de, aşağıdaki içeriklere sahip adlı databricks_dag.py yeni bir dosya oluşturun:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    değerini, daha önce kaydedilen iş kimliğinin değeriyle değiştirin JOB_ID .

  2. Dosyayı dizine airflow/dags kaydedin. Airflow, içinde airflow/dags/depolanan DAG dosyalarını otomatik olarak okur ve yükler.

Airflow'da DAG'yi yükleme ve doğrulama

Airflow kullanıcı arabiriminde DAG'yi tetikleyip doğrulamak için:

  1. Tarayıcı penceresinde dosyasını açın http://localhost:8080/home. Airflow DAG'leri ekranı görüntülenir.
  2. DAG'yi duraklatmak/duraklatmak için DAG'yi duraklat/çıkar iki durumlu düğmesini bulun databricks_dag ve tıklayın.
  3. DAG'yi Tetikle düğmesine tıklayarak DAG'yi tetikleme.
  4. Çalıştırmanın durumunu ve ayrıntılarını görüntülemek için Çalıştırmalar sütununda bir çalıştırmaya tıklayın.