Share via


Azure Databricks-feladatok vezénylése az Apache Airflow használatával

Ez a cikk azt ismerteti, hogy az Apache Airflow támogatja-e az adatfolyamok Azure Databricks-szel való vezénylését, útmutatást tartalmaz az Airflow helyi telepítéséhez és konfigurálásához, és példát mutat egy Azure Databricks-munkafolyamat üzembe helyezésére és futtatására az Airflow használatával.

Feladat vezénylése egy adatfolyamban

Az adatfeldolgozási folyamatok fejlesztéséhez és üzembe helyezéséhez gyakran összetett függőségek kezelése szükséges a tevékenységek között. Előfordulhat például, hogy egy folyamat adatokat olvas egy forrásból, megtisztítja az adatokat, átalakítja a megtisztított adatokat, és megírja az átalakított adatokat egy célhelyre. A folyamat üzembe helyezésekor a tesztelési, ütemezési és hibaelhárítási hibákhoz is támogatásra van szüksége.

A munkafolyamat-rendszerek úgy oldják meg ezeket a kihívásokat, hogy lehetővé teszik a tevékenységek közötti függőségek meghatározását, a folyamatok futásának ütemezését és a munkafolyamatok monitorozását. Az Apache Airflow egy nyílt forráskód megoldás az adatfolyamok kezelésére és ütemezésére. Az Airflow az adatfolyamokat a műveletek irányított aciklikus gráfjaiként (DAG-k) jelöli. Egy munkafolyamatot definiál egy Python-fájlban, és az Airflow kezeli az ütemezést és a végrehajtást. Az Airflow Azure Databricks-kapcsolat lehetővé teszi az Azure Databricks által kínált optimalizált Spark-motor előnyeit az Airflow ütemezési funkcióival.

Követelmények

  • Az Airflow és az Azure Databricks közötti integrációhoz az Airflow 2.5.0-s és újabb verziója szükséges. A cikkben szereplő példákat az Airflow 2.6.1-es verziójával teszteljük.
  • Az Airflow használatához Python 3.8, 3.9, 3.10 vagy 3.11 szükséges. A cikkben szereplő példákat a Python 3.8 teszteli.
  • Az Airflow telepítéséhez és futtatásához a jelen cikkben szereplő utasítások megkövetelik , hogy a pipenv létrehozhasson egy Python virtuális környezetet.

Airflow operátorok a Databrickshez

Az Airflow DAG feladatokból áll, ahol minden tevékenység egy Airflow-operátort futtat. A Databricksbe való integrációt támogató Airflow-operátorok a Databricks-szolgáltatóban vannak implementálva.

A Databricks-szolgáltató operátorokat tartalmaz, amelyek számos feladatot futtatnak egy Azure Databricks-munkaterületen, beleértve az adatok táblázatba való importálását, AZ SQL-lekérdezések futtatását és a Databricks Git-mappák használatát.

A Databricks-szolgáltató két operátort implementál a feladatok aktiválásához:

Új Azure Databricks-feladat létrehozásához vagy meglévő feladat alaphelyzetbe állításához a Databricks-szolgáltató implementálja a DatabricksCreateJobsOperatort. A DatabricksCreateJobsOperator post /api/2.1/jobs/create és a POST /api/2.1/jobs/reset API-kéréseket használja. A használatával DatabricksRunNowOperator létrehozhat és futtathat DatabricksCreateJobsOperator egy feladatot.

Feljegyzés

A Databricks operátorainak a feladat aktiválásához hitelesítő adatokat kell megadni a Databricks kapcsolatkonfigurációjában. Lásd: Azure Databricks személyes hozzáférési jogkivonat létrehozása az Airflow-hoz.

A Databricks Airflow operátorok minden egyes polling_period_seconds adásban megírják a feladatfuttatási oldal URL-címét az Airflow-naplókba (az alapértelmezett érték 30 másodperc). További információ: apache-airflow-providers-databricks csomag oldal az Airflow webhelyén.

Az Airflow Azure Databricks-integráció helyi telepítése

Az Airflow és a Databricks-szolgáltató helyi telepítéséhez a teszteléshez és fejlesztéshez kövesse az alábbi lépéseket. Az Airflow egyéb telepítési lehetőségeiről, beleértve az éles telepítés létrehozását is, tekintse meg a telepítést az Airflow dokumentációjában.

Nyisson meg egy terminált, és futtassa a következő parancsokat:

mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email <email>

Cserélje le <firstname>a felhasználónevet <lastname>és <email> az e-mail-címet. A rendszer kérni fogja, hogy adjon meg jelszót a rendszergazda felhasználónak. Mentse ezt a jelszót, mert be kell jelentkeznie az Airflow felhasználói felületére.

Ez a szkript a következő lépéseket hajtja végre:

  1. Létrehoz egy elnevezett airflow könyvtárat, és módosítja az adott könyvtárat.
  2. Python-alapú virtuális környezetek létrehozására és létrehozására használható pipenv . A Databricks egy Python virtuális környezet használatát javasolja a csomagverziók és a környezet kódfüggőségeinek elkülönítéséhez. Ez az elkülönítés segít csökkenteni a csomagverziók váratlan eltéréseit és a kódfüggőség ütközéseit.
  3. Inicializál egy olyan környezeti változót, amely AIRFLOW_HOME a airflow könyvtár elérési útjára van állítva.
  4. Telepíti az Airflow-t és az Airflow Databricks-szolgáltató csomagokat.
  5. Létrehoz egy könyvtárat airflow/dags . Az Airflow a címtár használatával tárolja a dags DAG-definíciókat.
  6. Inicializál egy SQLite-adatbázist, amelyet az Airflow a metaadatok nyomon követésére használ. Éles Airflow-üzemelő példányban az Airflow-t szabványos adatbázissal konfigurálná. Az SQLite-adatbázis és az Airflow-telepítés alapértelmezett konfigurációja inicializálva van a airflow címtárban.
  7. Létrehoz egy rendszergazdai felhasználót az Airflow-hoz.

Tipp.

A Databricks-szolgáltató telepítésének megerősítéséhez futtassa a következő parancsot az Airflow telepítési könyvtárában:

airflow providers list

Az Airflow webkiszolgáló és ütemező indítása

Az Airflow-webkiszolgáló szükséges az Airflow felhasználói felületének megtekintéséhez. A webkiszolgáló elindításához nyisson meg egy terminált az Airflow telepítési könyvtárában, és futtassa a következő parancsokat:

Feljegyzés

Ha az Airflow webkiszolgáló egy portütközés miatt nem indul el, módosíthatja az alapértelmezett portot az Airflow konfigurációjában.

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow webserver

Az ütemező a DAG-ket ütemező Airflow-összetevő. Az ütemező elindításához nyisson meg egy új terminált az Airflow telepítési könyvtárában, és futtassa a következő parancsokat:

pipenv shell
export AIRFLOW_HOME=$(pwd)
airflow scheduler

Az Airflow telepítésének tesztelése

Az Airflow telepítésének ellenőrzéséhez futtassa az Airflow-hoz tartozó példa DAG-k egyikét:

  1. Nyissa meg http://localhost:8080/homea böngészőablakban. Jelentkezzen be az Airflow felhasználói felületére az Airflow telepítésekor létrehozott felhasználónévvel és jelszóval. Megjelenik az Airflow DAG-lap .
  2. Kattintson a Szüneteltetés/A DAG feloldása váltógombra az egyik példa DAG-k( például a example_python_operator.
  3. A DAG példa aktiválásához kattintson a TRIGGER DAG gombra.
  4. A DAG nevére kattintva megtekintheti a részleteket, beleértve a DAG futtatási állapotát is.

Azure Databricks személyes hozzáférési jogkivonat létrehozása az Airflow-hoz

Az Airflow egy Azure Databricks személyes hozzáférési jogkivonat (PAT) használatával csatlakozik a Databrickshez. PAT létrehozása:

  1. Az Azure Databricks-munkaterületen kattintson az Azure Databricks-felhasználónevére a felső sávon, majd válassza Gépház a legördülő menüből.
  2. Kattintson a Fejlesztőeszközök elemre.
  3. Az Access-jogkivonatok mellett kattintson a Kezelés gombra.
  4. Kattintson az Új jogkivonat létrehozása elemre.
  5. (Nem kötelező) Írjon be egy megjegyzést, amely segít azonosítani a jogkivonatot a jövőben, és módosíthatja a jogkivonat alapértelmezett 90 napos élettartamát. Élettartam nélküli (nem ajánlott) jogkivonat létrehozásához hagyja üresen az Élettartam (nap) mezőt (üres).
  6. Kattintson a Létrehozás lehetőségre.
  7. Másolja a megjelenített jogkivonatot egy biztonságos helyre, majd kattintson a Kész gombra.

Feljegyzés

Ügyeljen arra, hogy a másolt jogkivonatot biztonságos helyre mentse. Ne ossza meg másokkal a másolt jogkivonatot. Ha elveszíti a másolt jogkivonatot, nem tudja pontosan ugyanazt a jogkivonatot újragenerálni. Ehelyett meg kell ismételnie ezt az eljárást egy új jogkivonat létrehozásához. Ha elveszíti a másolt jogkivonatot, vagy úgy véli, hogy a jogkivonat sérült, a Databricks határozottan javasolja, hogy azonnal törölje a jogkivonatot a munkaterületről az Access-jogkivonatok lapon a jogkivonat melletti kuka (Visszavonás) ikonra kattintva.

Ha nem tud jogkivonatokat létrehozni vagy használni a munkaterületen, ennek az lehet az oka, hogy a munkaterület rendszergazdája letiltotta a jogkivonatokat, vagy nem adott engedélyt a jogkivonatok létrehozására vagy használatára. Tekintse meg a munkaterület rendszergazdáját vagy a következőket:

Feljegyzés

Ajánlott biztonsági eljárásként, ha automatizált eszközökkel, rendszerekkel, szkriptekkel és alkalmazásokkal hitelesít, a Databricks azt javasolja, hogy munkaterület-felhasználók helyett a szolgáltatásnevekhez tartozó személyes hozzáférési jogkivonatokat használja. A szolgáltatásnevek jogkivonatainak létrehozásáról a szolgáltatásnév jogkivonatainak kezelése című témakörben olvashat.

Az Azure Databricksben a Microsoft Entra ID (korábbi nevén Azure Active Directory) jogkivonat használatával is hitelesíthet. Lásd a Databricks Csatlakozás iont az Airflow dokumentációjában.

Azure Databricks-kapcsolat konfigurálása

Az Airflow telepítése alapértelmezett kapcsolatot tartalmaz az Azure Databrickshez. A kapcsolat frissítése a munkaterülethez való csatlakozáshoz a fent létrehozott személyes hozzáférési jogkivonat használatával:

  1. Nyissa meg http://localhost:8080/connection/list/a böngészőablakban. Ha a rendszer kéri a bejelentkezést, adja meg a rendszergazdai felhasználónevét és jelszavát.
  2. A Conn-azonosító alatt keresse meg a databricks_default, majd kattintson a Rekord szerkesztése gombra.
  3. Cserélje le a Gazdagép mező értékét az Azure Databricks-üzembe helyezés munkaterület-példányának nevére , https://adb-123456789.cloud.databricks.compéldául.
  4. A Jelszó mezőbe írja be az Azure Databricks személyes hozzáférési jogkivonatát.
  5. Kattintson a Mentés gombra.

Ha Microsoft Entra ID-jogkivonatot használ, a hitelesítés konfigurálásával kapcsolatos információkért tekintse meg a Databricks Csatlakozás iont az Airflow dokumentációjában.

Példa: Airflow DAG létrehozása Azure Databricks-feladat futtatásához

Az alábbi példa bemutatja, hogyan hozhat létre egy egyszerű Airflow-üzembe helyezést, amely a helyi gépen fut, és üzembe helyez egy példa DAG-t az Azure Databricksben való futtatások aktiválásához. Ebben a példában a következőt fogja:

  1. Hozzon létre egy új jegyzetfüzetet, és adjon hozzá kódot a megszólítás nyomtatásához egy konfigurált paraméter alapján.
  2. Hozzon létre egy Azure Databricks-feladatot egyetlen, a jegyzetfüzetet futtató feladattal.
  3. Airflow-kapcsolat konfigurálása az Azure Databricks-munkaterülethez.
  4. Hozzon létre egy Airflow DAG-t a jegyzetfüzet-feladat aktiválásához. A DAG definiálása Python-szkriptben a következő használatával DatabricksRunNowOperatortörténik: .
  5. Az Airflow felhasználói felületén aktiválhatja a DAG-t, és megtekintheti a futtatás állapotát.

Jegyzetfüzet létrehozása

Ez a példa egy két cellát tartalmazó jegyzetfüzetet használ:

  • Az első cella tartalmaz egy Databricks Utilities szöveg widgetet, amely egy alapértelmezett értékre worldbeállított változót greeting határoz meg.
  • A második cella a változó előtagjának greeting értékét nyomtatja helloki.

A jegyzetfüzet létrehozása:

  1. Nyissa meg az Azure Databricks-munkaterületet, kattintson az Új gombra Új ikonaz oldalsávon, és válassza a Jegyzetfüzet lehetőséget.

  2. Adjon nevet a jegyzetfüzetnek, például a Hello Airflow-nak, és győződjön meg arról, hogy az alapértelmezett nyelv Pythonra van állítva.

  3. Másolja ki a következő Python-kódot, és illessze be a jegyzetfüzet első cellájába.

    dbutils.widgets.text("greeting", "world", "Greeting")
    greeting = dbutils.widgets.get("greeting")
    
  4. Vegyen fel egy új cellát az első cella alá, és másolja és illessze be a következő Python-kódot az új cellába:

    print("hello {}".format(greeting))
    

Feladat létrehozása

  1. Kattintson Munkafolyamatok ikona Munkafolyamatok elemre az oldalsávon.

  2. Kattintson a Feladat létrehozása gomb parancsra.

    A Feladatok lap a Feladat létrehozása párbeszédpanelen jelenik meg.

    Az első feladat létrehozása párbeszédpanel

  3. Cserélje le a feladat nevét a feladat nevére.

  4. A Tevékenységnév mezőben adja meg a tevékenység nevét, például a megszólítást.

  5. A Típus legördülő menüben válassza a Jegyzetfüzet lehetőséget.

  6. A Forrás legördülő menüben válassza a Munkaterület lehetőséget.

  7. Kattintson az Elérési út szövegmezőre, és a fájlböngészővel keresse meg a létrehozott jegyzetfüzetet, kattintson a jegyzetfüzet nevére, majd a Megerősítés gombra.

  8. Kattintson a Hozzáadás gombra a Paraméterek területen. A Kulcs mezőbe írja be a következőtgreeting: Az Érték mezőbe írja be a következőtAirflow user:

  9. Kattintson a Feladat létrehozása gombra.

A Feladat részletei panelen másolja ki a feladatazonosító értékét. Ez az érték szükséges a feladat Airflow-ból való aktiválásához.

A feladat futtatása

Ha tesztelni szeretné az új feladatot az Azure Databricks-munkafolyamatok felhasználói felületén, kattintson Futtatás gomb a jobb felső sarokban. A futtatás befejezése után a feladatfuttatás részleteinek megtekintésével ellenőrizheti a kimenetet.

Új Airflow DAG létrehozása

Egy Airflow DAG-t definiál egy Python-fájlban. Dag létrehozása a példajegyzetfüzet-feladat aktiválásához:

  1. Szövegszerkesztőben vagy IDE-ben hozzon létre egy új fájlt databricks_dag.py a következő tartalommal:

    from airflow import DAG
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
    from airflow.utils.dates import days_ago
    
    default_args = {
      'owner': 'airflow'
    }
    
    with DAG('databricks_dag',
      start_date = days_ago(2),
      schedule_interval = None,
      default_args = default_args
      ) as dag:
    
      opr_run_now = DatabricksRunNowOperator(
        task_id = 'run_now',
        databricks_conn_id = 'databricks_default',
        job_id = JOB_ID
      )
    

    Cserélje le JOB_ID a korábban mentett feladatazonosító értékét.

  2. Mentse a fájlt a airflow/dags könyvtárban. Az Airflow automatikusan beolvassa és telepíti a fájlban airflow/dags/tárolt DAG-fájlokat.

A DAG telepítése és ellenőrzése az Airflow-ban

A DAG aktiválása és ellenőrzése az Airflow felhasználói felületén:

  1. Nyissa meg http://localhost:8080/homea böngészőablakban. Megjelenik az Airflow DAGs képernyő.
  2. Keresse meg databricks_dag és kattintson a Dag szüneteltetése/feloldása váltógombra a DAG feloldásához.
  3. A DAG aktiválásához kattintson a TRIGGER DAG gombra.
  4. Kattintson egy futtatásra a Futtatások oszlopban a futtatás állapotának és részleteinek megtekintéséhez.