Orchestrace úloh Azure Databricks pomocí Apache Airflow

Tento článek popisuje podporu Apache Airflow pro orchestraci datových kanálů pomocí Azure Databricks, obsahuje pokyny k instalaci a konfiguraci Airflow místně a poskytuje příklad nasazení a spuštění pracovního postupu Azure Databricks pomocí Airflow.

Orchestrace úloh v datovém kanálu

Vývoj a nasazení kanálu pro zpracování dat často vyžaduje správu složitých závislostí mezi úlohami. Kanál může například číst data ze zdroje, vyčistit data, transformovat vyčištěná data a zapsat transformovaná data do cíle. Potřebujete také podporu pro testování, plánování a řešení chyb při zprovoznění kanálu.

Systémy pracovních postupů tyto problémy řeší tím, že umožňují definovat závislosti mezi úlohami, naplánovat, kdy se kanály spouštějí a monitorují pracovní postupy. Apache Airflow je opensourcové řešení pro správu a plánování datových kanálů. Airflow představuje datové kanály jako řízené acyklické grafy (DAG) operací. Pracovní postup definujete v souboru Pythonu a Airflow spravuje plánování a provádění. Připojení Airflow Azure Databricks umožňuje využívat optimalizovaný modul Spark nabízený službou Azure Databricks s funkcemi plánování Airflow.

Požadavky

  • Integrace mezi Airflow a Azure Databricks vyžaduje Airflow verze 2.5.0 a novější. Příklady v tomto článku jsou testovány s Airflow verze 2.6.1.
  • Airflow vyžaduje Python 3.8, 3.9, 3.10 nebo 3.11. Příklady v tomto článku jsou testovány v Pythonu 3.8.
  • Pokyny v tomto článku k instalaci a spuštění Airflow vyžadují pipenv k vytvoření virtuálního prostředí Pythonu.

Operátoři airflow pro Databricks

DaG airflow se skládá z úkolů, kde každý úkol spouští operátor airflow. Operátory airflow podporující integraci do Databricks jsou implementované ve zprostředkovateli Databricks.

Poskytovatel Databricks zahrnuje operátory pro spouštění několika úloh v pracovním prostoru Azure Databricks, včetně importu dat do tabulky, spouštění dotazů SQL a práce s úložišti Databricks.

Zprostředkovatel Databricks implementuje dva operátory pro aktivaci úloh:

  • DatabricksRunNowOperator vyžaduje existující úlohu Azure Databricks a k aktivaci spuštění používá požadavek ROZHRANÍ API POST /api/2.1/jobs/run-now. Databricks doporučuje použít, DatabricksRunNowOperator protože snižuje duplicitu definic úloh a spuštění úloh aktivovaná tímto operátorem najdete v uživatelském rozhraní úloh.
  • DatabricksSubmitRunOperator nevyžaduje, aby v Azure Databricks existovala úloha a používá post /api/2.1/jobs/run/submit API požadavek k odeslání specifikace úlohy a aktivaci spuštění.

Pokud chcete vytvořit novou úlohu Azure Databricks nebo resetovat existující úlohu, poskytovatel Databricks implementuje DatabricksCreateJobsOperator. Používá DatabricksCreateJobsOperatorpožadavky POST /api/2.1/jobs/create a POST /api/2.1/jobs/reset API. Pomocí této možnosti DatabricksCreateJobsOperatorDatabricksRunNowOperator můžete vytvořit a spustit úlohu.

Poznámka:

Použití operátorů Databricks k aktivaci úlohy vyžaduje zadání přihlašovacích údajů v konfiguraci připojení Databricks. Viz Vytvoření tokenu pat Azure Databricks pro Airflow.

Operátoři Airflow Databricks zapisují adresu URL stránky spuštění úlohy do protokolů Airflow každých polling_period_seconds (výchozí hodnota je 30 sekund). Další informace najdete na stránce balíčku apache-airflow-providers-databricks na webu Airflow.

Místní instalace integrace Azure Databricks s Airflow

Pokud chcete nainstalovat Airflow a poskytovatele Databricks místně pro účely testování a vývoje, postupujte následovně. Další možnosti instalace Airflow, včetně vytvoření produkční instalace, najdete v dokumentaci k Airflow.

Otevřete terminál a spusťte následující příkazy:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Nahraďte <firstname>a <lastname><email> zadejte svoje uživatelské jméno a e-mail. Zobrazí se výzva k zadání hesla pro uživatele správce. Nezapomeňte toto heslo uložit, protože je nutné se přihlásit k uživatelskému rozhraní Airflow.

Tento skript provede následující kroky:

  1. Vytvoří adresář pojmenovaný airflow a změní se do daného adresáře.
  2. Používá pipenv se k vytvoření a vytvoření virtuálního prostředí Pythonu. Databricks doporučuje používat virtuální prostředí Pythonu k izolaci verzí balíčků a závislostí kódu do daného prostředí. Tato izolace pomáhá snížit neočekávané neshody verzí balíčků a kolize závislostí kódu.
  3. Inicializuje proměnnou prostředí s názvem AIRFLOW_HOME nastavenou na cestu k adresáři airflow .
  4. Nainstaluje Airflow a balíčky poskytovatele Databricks Airflow.
  5. airflow/dags Vytvoří adresář. Airflow používá dags adresář k ukládání definic DAG.
  6. Inicializuje databázi SQLite, kterou Airflow používá ke sledování metadat. V produkčním nasazení Airflow byste nakonfigurovali Airflow se standardní databází. Databáze SQLite a výchozí konfigurace pro nasazení Airflow se inicializují v airflow adresáři.
  7. Vytvoří uživatele správce pro Airflow.

Tip

Pokud chcete potvrdit instalaci poskytovatele Databricks, spusťte v instalačním adresáři Airflow následující příkaz:

airflow providers list

Spuštění webového serveru a plánovače Airflow

Webový server Airflow se vyžaduje k zobrazení uživatelského rozhraní Airflow. Pokud chcete spustit webový server, otevřete terminál v instalačním adresáři Airflow a spusťte následující příkazy:

Poznámka:

Pokud se webovému serveru Airflow nepodaří spustit kvůli konfliktu portů, můžete změnit výchozí port v konfiguraci Airflow.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

Plánovač je komponenta Airflow, která plánuje DAG. Pokud chcete spustit plánovač, otevřete nový terminál v instalačním adresáři Airflow a spusťte následující příkazy:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Otestování instalace Airflow

Pokud chcete ověřit instalaci Airflow, můžete spustit jednu z ukázkových DAG, které jsou součástí Airflow:

  1. V okně prohlížeče otevřete http://localhost:8080/home. Přihlaste se k uživatelskému rozhraní Airflow pomocí uživatelského jména a hesla, které jste vytvořili při instalaci Airflow. Zobrazí se stránka DAG airflow.
  2. Kliknutím na přepínač Pozastavit/Zrušit zapůjení daG odblokujte některou z ukázkových dag, například .example_python_operator
  3. Spusťte ukázku DAG kliknutím na tlačítko DaG triggeru.
  4. Kliknutím na název DAG zobrazíte podrobnosti, včetně stavu spuštění dag.

Vytvoření tokenu pat pro Azure Databricks pro Airflow

Airflow se připojuje k Databricks pomocí tokenu PAT (Personal Access Token) Azure Databricks. Vytvoření patu:

  1. V pracovním prostoru Azure Databricks klikněte na své uživatelské jméno Azure Databricks v horním panelu a pak v rozevíracím seznamu vyberte Nastavení uživatele.
  2. Klikněte na Vývojář.
  3. Vedle přístupových tokenů klikněte na Spravovat.
  4. Klikněte na Vygenerovat nový token.
  5. (Volitelné) Zadejte komentář, který vám pomůže identifikovat tento token v budoucnu a změnit výchozí životnost tokenu na 90 dnů. Pokud chcete vytvořit token bez životnosti (nedoporučuje se), nechte pole Životnost (dny) prázdné (prázdné).
  6. Klikněte na Vygenerovat.
  7. Zkopírujte zobrazený token do zabezpečeného umístění a klikněte na tlačítko Hotovo.

Poznámka:

Nezapomeňte zkopírovaný token uložit do zabezpečeného umístění. Nesdílejte svůj zkopírovaný token s ostatními. Pokud ztratíte zkopírovaný token, nemůžete tento úplně stejný token znovu vygenerovat. Místo toho musíte tento postup zopakovat, abyste vytvořili nový token. Pokud ztratíte zkopírovaný token nebo se domníváte, že došlo k ohrožení zabezpečení tokenu, databricks důrazně doporučuje tento token okamžitě odstranit z pracovního prostoru kliknutím na ikonu koše (Odvolat) vedle tokenu na stránce Přístupové tokeny .

Pokud v pracovním prostoru nemůžete vytvářet nebo používat tokeny, může to být proto, že správce pracovního prostoru zakázal tokeny nebo vám neudělil oprávnění k vytváření nebo používání tokenů. Obraťte se na správce pracovního prostoru nebo následující:

Poznámka:

Osvědčeným postupem při ověřování pomocí automatizovaných nástrojů, systémů, skriptů a aplikací doporučuje Databricks místo uživatelů pracovního prostoru používat tokeny patního přístupu, které patří instančním objektům . Pokud chcete vytvořit tokeny pro instanční objekty, přečtěte si téma Správa tokenů instančního objektu.

Můžete se také ověřit v Azure Databricks pomocí tokenu Microsoft Entra ID (dříve Azure Active Directory). Viz Připojení ion Databricks v dokumentaci k Airflow.

Konfigurace připojení Azure Databricks

Instalace Airflow obsahuje výchozí připojení pro Azure Databricks. Pokud chcete aktualizovat připojení pro připojení k pracovnímu prostoru pomocí tokenu pat, který jste vytvořili výše:

  1. V okně prohlížeče otevřete http://localhost:8080/connection/list/. Pokud se zobrazí výzva k přihlášení, zadejte uživatelské jméno a heslo správce.
  2. V části Conn ID vyhledejte databricks_default a klikněte na tlačítko Upravit záznam.
  3. Hodnotu v poli Hostitel nahraďte názvem instance pracovního prostoru vašeho nasazení Azure Databricks, https://adb-123456789.cloud.databricks.comnapříklad .
  4. Do pole Heslo zadejte osobní přístupový token Azure Databricks.
  5. Klikněte na Uložit.

Pokud používáte token MICROSOFT Entra ID, přečtěte si informace o konfiguraci ověřování v dokumentaci k Airflow v části Databricks Připojení ion.

Příklad: Vytvoření DAG Airflow pro spuštění úlohy Azure Databricks

Následující příklad ukazuje, jak vytvořit jednoduché nasazení Airflow, které běží na místním počítači, a nasadí ukázkovou sadu DAG pro aktivaci spuštění v Azure Databricks. V tomto příkladu:

  1. Vytvořte nový poznámkový blok a přidejte kód pro tisk pozdravu na základě nakonfigurovaného parametru.
  2. Vytvořte úlohu Azure Databricks s jednou úlohou, která spustí poznámkový blok.
  3. Nakonfigurujte připojení Airflow k pracovnímu prostoru Azure Databricks.
  4. Vytvořte DAG Airflow, který aktivuje úlohu poznámkového bloku. DaG definujete ve skriptu Pythonu pomocí DatabricksRunNowOperator.
  5. Pomocí uživatelského rozhraní Airflow aktivujte DAG a zobrazte stav spuštění.

Vytvoření poznámkového bloku

V tomto příkladu se používá poznámkový blok obsahující dvě buňky:

  • První buňka obsahuje textový widget nástrojů Databricks definující proměnnou s názvem greeting nastavenou na výchozí hodnotu world.
  • Druhá buňka vytiskne hodnotu greeting proměnné s předponou hello.

Vytvoření poznámkového bloku:

  1. Přejděte do pracovního prostoru Azure Databricks, na bočním panelu klikněte na New IconNový a vyberte Poznámkový blok.

  2. Pojmenujte poznámkový blok, například Hello Airflow, a ujistěte se, že je výchozí jazyk nastavený na Python.

  3. Zkopírujte následující kód Pythonu a vložte ho do první buňky poznámkového bloku.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Přidejte novou buňku pod první buňku a zkopírujte do nové buňky následující kód Pythonu:

    print("hello {}".format(greeting))
    

Vytvoření úlohy

  1. Na bočním panelu klikněte na Jobs IconPracovní postupy.

  2. Klikněte na Create Job Button.

    Zobrazí se karta Úkoly s dialogovým oknem vytvořit úkol.

    Create first task dialog

  3. Nahraďte název vaší úlohy... názvem vaší úlohy.

  4. Do pole Název úkolu zadejte název úkolu, například pozdrav-úkol.

  5. V rozevírací nabídce Typ vyberte Poznámkový blok.

  6. V rozevírací nabídce Zdroj vyberte Pracovní prostor.

  7. Klikněte na textové pole Cesta a v prohlížeči souborů vyhledejte poznámkový blok, který jste vytvořili, klikněte na název poznámkového bloku a klikněte na Potvrdit.

  8. Klikněte na Přidat v části Parametry. Do pole Klíč zadejte greeting. Do pole Hodnota zadejte Airflow user.

  9. Klikněte na Vytvořit úkol.

Na panelu Podrobnosti úlohy zkopírujte hodnotu ID úlohy. Tato hodnota se vyžaduje k aktivaci úlohy z Airflow.

Spuštění úlohy

Pokud chcete otestovat novou úlohu v uživatelském rozhraní pracovních postupů Azure Databricks, klikněte Run Now Button v pravém horním rohu. Po dokončení spuštění můžete výstup ověřit zobrazením podrobností o spuštění úlohy.

Vytvoření nového DAG Airflow

V souboru Pythonu definujete DAG Airflow. Vytvoření DAG pro aktivaci ukázkové úlohy poznámkového bloku:

  1. V textovém editoru nebo integrovaném vývojovém prostředí vytvořte nový soubor s názvem databricks_dag.py s následujícím obsahem:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Nahraďte JOB_ID hodnotou ID úlohy, které jste uložili dříve.

  2. Uložte soubor do airflow/dags adresáře. Airflow automaticky čte a instaluje soubory DAG uložené v airflow/dags/.

Instalace a ověření DAG v Airflow

Aktivace a ověření DAG v uživatelském rozhraní Airflow:

  1. V okně prohlížeče otevřete http://localhost:8080/home. Zobrazí se obrazovka DAG Airflow.
  2. Vyhledejte databricks_dag a klikněte na přepínač Pozastavit nebo zrušit pozastavení DAG a zrušte tak pozastavení DAG .
  3. Spusťte DAG kliknutím na tlačítko DaG triggeru.
  4. Kliknutím na spuštění ve sloupci Spuštění zobrazíte stav a podrobnosti spuštění.