Oktatóanyag: Adatok kinyerése, átalakítása és betöltése az Azure Databricks használatával

Ebben az oktatóanyagban egy ETL-műveletet (adatok kinyerése, átalakítása és betöltése) hajt végre az Azure Databricks használatával. Adatokat nyerhet ki Azure Data Lake Storage Gen2 az Azure Databricksbe, átalakításokat futtathat az Adatokon az Azure Databricksben, és betöltheti az átalakított adatokat a Azure Synapse Analyticsbe.

Az oktatóanyag lépései az Azure Databricks Azure Synapse-összekötőjével továbbítják az adatokat az Azure Databricksbe. Ez az összekötő viszont Azure Blob Storage ideiglenes tárolóként használja az Azure Databricks-fürt és a Azure Synapse között átvitt adatokhoz.

Az alábbi ábrán az alkalmazásfolyam látható:

Azure Databricks with Data Lake Store and Azure Synapse

Ez az oktatóanyag a következő feladatokat mutatja be:

  • Azure Databricks-szolgáltatás létrehozása.
  • Spark-fürt létrehozása az Azure Databricksben.
  • Hozzon létre egy fájlrendszert a Data Lake Storage Gen2-fiókban.
  • Töltse fel a mintaadatokat a Azure Data Lake Storage Gen2-fiókba.
  • Hozzon létre egy szolgáltatásnevet.
  • Adatok kinyerése a Azure Data Lake Storage Gen2-fiókból.
  • Adatok átalakítása az Azure Databricksben.
  • Adatok betöltése Azure Synapse.

Ha még nincs Azure-előfizetése, kezdés előtt hozzon létre egy ingyenes fiókot.

Megjegyzés

Ez az oktatóanyag nem végezhető el ingyenes Azure-próbaverziós előfizetéssel. Ha ingyenes fiókkal rendelkezik, nyissa meg a profilját, és módosítsa előfizetését használatalapú fizetésre. További információkért lásd az ingyenes Azure-fiókot ismertető cikket. Ezután távolítsa el a költségkeretet, és kérje a régióban lévő vCPU-k kvótanövelését . Az Azure Databricks-munkaterület létrehozásakor kiválaszthatja a Próbaverzió (Prémium – 14 napos ingyenes DBU-k) tarifacsomagot, hogy a munkaterület 14 napig ingyenes Prémium Azure Databricks DBU-khoz biztosítson hozzáférést.

Előfeltételek

Az oktatóanyag megkezdése előtt végezze el ezeket a feladatokat:

Gyűjtse össze a szükséges információkat

Győződjön meg arról, hogy teljesítette az oktatóanyag előfeltételeit.

Mielőtt hozzákezdene, rendelkeznie kell az alábbi információkkal:

✔️ A Azure Synapse adatbázis-neve, adatbázis-kiszolgálójának neve, felhasználóneve és jelszava.

✔️ A Blob Storage-fiók hozzáférési kulcsa.

✔️ A Data Lake Storage Gen2 tárfiók neve.

✔️ Az előfizetés bérlőazonosítója.

✔️ A Azure Active Directory (Azure AD) szolgáltatásban regisztrált alkalmazás alkalmazásazonosítója.

✔️ Az Azure AD-ben regisztrált alkalmazás hitelesítési kulcsa.

Azure Databricks-szolgáltatás létrehozása

Ebben a szakaszban egy Azure Databricks-szolgáltatást hoz létre a Azure Portal használatával.

  1. Az Azure Portal menüjében válassza az Erőforrás létrehozása elemet.

    Create a resource on Azure portal

    Ezután válassza az AnalyticsAzure>Databricks lehetőséget.

    Create Azure Databricks on Azure portal

  2. Az Azure Databricks Service alatt adja meg a következő értékeket a Databricks-szolgáltatás létrehozásához:

    Tulajdonság Leírás
    Munkaterület neve Adja meg a Databricks-munkaterület nevét.
    Előfizetés Válassza ki a legördülő menüből a saját Azure-előfizetését.
    Erőforráscsoport Adja meg, hogy új erőforráscsoportot kíván-e létrehozni, vagy egy meglévőt szeretne használni. Az erőforráscsoport olyan tároló, amely egy adott Azure-megoldás kapcsolódó erőforrásait tartalmazza. További információért olvassa el az Azure-erőforráscsoportok áttekintését.
    Hely Válassza az USA 2. nyugati régióját. A további elérhető régiókért tekintse meg az elérhető Azure-szolgáltatások régiók szerinti bontását.
    Tarifacsomag Válassza a Standard lehetőséget.
  3. A fiók létrehozása eltarthat néhány percig. A művelet állapotának figyeléséhez tekintse meg a folyamatjelző sávot a tetején.

  4. Válassza a Rögzítés az irányítópulton, majd a Létrehozás lehetőséget.

Spark-fürt létrehozása az Azure Databricksben

  1. A Azure Portal lépjen a létrehozott Databricks szolgáltatásra, és válassza a Munkaterület indítása lehetőséget.

  2. A rendszer átirányítja az Azure Databricks portálra. A portálon válassza a Fürt elemet.

    Databricks on Azure

  3. Az Új fürt lapon adja meg a fürt létrehozásához szükséges értékeket.

    Create Databricks Spark cluster on Azure

  4. Adjon meg értékeket a következő mezőkben, és fogadja el az alapértelmezett értékeket a többi mezőben:

    • Adjon egy nevet a fürtnek.

    • Jelölje be a Leállítás ennyi perc inaktivitás után jelölőnégyzetet. Ha a fürt nincs használatban, adjon meg egy időtartamot (percben) a fürt leállításához.

    • Válassza a Fürt létrehozása lehetőséget. A fürt futtatása után jegyzetfüzeteket csatolhat a fürthöz, és Spark-feladatokat futtathat.

Fájlrendszer létrehozása a Azure Data Lake Storage Gen2-fiókban

Ebben a szakaszban létrehoz egy jegyzetfüzetet az Azure Databricks-munkaterületen, majd kódrészleteket futtat a tárfiók konfigurálásához

  1. A Azure Portal lépjen a létrehozott Azure Databricks szolgáltatásra, és válassza a Munkaterület indítása lehetőséget.

  2. A bal oldalon válassza a Munkaterület lehetőséget. A Munkaterület legördülő menüből válassza a Létrehozás>Jegyzetfüzet lehetőséget.

    Create a notebook in Databricks

  3. A Jegyzetfüzet létrehozása párbeszédpanelen adja meg a jegyzetfüzet nevét. Válassza a Scala nyelvet, majd válassza ki a korábban létrehozott Spark-fürtöt.

    Provide details for a notebook in Databricks

  4. Válassza a Létrehozás lehetőséget.

  5. Az alábbi kódblokk a Spark-munkamenetben elért bármely ADLS Gen 2-fiók alapértelmezett szolgáltatásnév-hitelesítő adatait állítja be. A második kódblokk hozzáfűzi a fiók nevét a beállításhoz egy adott ADLS Gen 2-fiók hitelesítő adatainak megadásához. Másolja és illessze be bármelyik kódblokkot az Azure Databricks-jegyzetfüzet első cellájába.

    Munkamenet-konfiguráció

    val appID = "<appID>"
    val secret = "<secret>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    

    Fiók konfigurálása

    val storageAccountName = "<storage-account-name>"
    val appID = "<app-id>"
    val secret = "<secret>"
    val fileSystemName = "<file-system-name>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    
  6. Ebben a kódblokkban cserélje le a <app-id>kódblokkban található , <secret>, <tenant-id>és <storage-account-name> helyőrző értékeket azokra az értékekre, amelyeket az oktatóanyag előfeltételeinek teljesítése során gyűjtött. Cserélje le a <file-system-name> helyőrző értéket a fájlrendszernek adni kívánt névre.

    • A <app-id>, és <secret> abból az alkalmazásból származnak, amelyet egy szolgáltatásnév létrehozása során regisztrált az Active Directoryban.

    • Az <tenant-id> előfizetésből származik.

    • Ez <storage-account-name> a Azure Data Lake Storage Gen2 tárfiók neve.

  7. Nyomja le a SHIFT+ ENTER billentyűkombinációt a blokk kódjának futtatásához.

Mintaadatok betöltése a Azure Data Lake Storage Gen2-fiókba

Mielőtt ehhez a szakaszhoz hozzáfogna, a következő előfeltételeknek kell eleget tennie:

Írja be az alábbi kódot egy jegyzetfüzetcellába:

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

A cellában nyomja le a SHIFT+ ENTER billentyűkombinációt a kód futtatásához.

Most egy új cellába írja be a következő kódot, és cserélje le a szögletes zárójelben megjelenő értékeket a korábban használt értékekkel:

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

A cellában nyomja le a SHIFT+ ENTER billentyűkombinációt a kód futtatásához.

Adatok kinyerése a Azure Data Lake Storage Gen2-fiókból

  1. Most már betöltheti a json-mintafájlt adatkeretként az Azure Databricksben. Illessze be a következő kódot egy új cellába. Cserélje le a szögletes zárójelben látható helyőrzőket az értékekre.

    val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
    
  2. Nyomja le a SHIFT+ ENTER billentyűkombinációt a blokk kódjának futtatásához.

  3. Futtassa a következő kódot az adatkeret tartalmának megtekintéséhez:

    df.show()
    

    Az alábbi kódrészlethez hasonló kimenet jelenik meg:

    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    |               artist|     auth|firstName|gender|itemInSession|  lastName|   length|  level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    | El Arrebato         |Logged In| Annalyse|     F|            2|Montgomery|234.57914| free  |  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
    | Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid  |       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|    11|
    | Gorillaz            |Logged In|     Liam|     M|           11|     Watts|246.17751| paid  |New York-Newark-J...|   PUT|NextSong|1406279422332|     2047|                DARE|   200|1409318685332|   201|
    ...
    ...
    

    Ezzel kinyerte az adatokat a 2. generációs Azure Data Lake Storage-ből az Azure Databricksbe.

Adatok átalakítása az Azure Databricksben

A nyers mintaadatok small_radio_json.json fájl rögzíti a rádióállomás célközönségét, és számos oszlopot tartalmaz. Ebben a szakaszban átalakítja az adatokat úgy, hogy csak adott oszlopokat kérjenek le az adathalmazból.

  1. Először csak a firstName, a lastName, a gender, a location és a level oszlopokat kérje le a létrehozott adatkeretből.

    val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
    specificColumnsDf.show()
    

    A kimenet az alábbi kódrészletben látható módon jelenik meg:

    +---------+----------+------+--------------------+-----+
    |firstname|  lastname|gender|            location|level|
    +---------+----------+------+--------------------+-----+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    +---------+----------+------+--------------------+-----+
    
  2. Az adatok további átalakításához nevezze át a level oszlopot a következőre: subscription_type.

    val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
    renamedColumnsDF.show()
    

    A kimenet az alábbi kódrészletben látható módon jelenik meg.

    +---------+----------+------+--------------------+-----------------+
    |firstname|  lastname|gender|            location|subscription_type|
    +---------+----------+------+--------------------+-----------------+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    +---------+----------+------+--------------------+-----------------+
    

Adatok betöltése Azure Synapse

Ebben a szakaszban feltölti az átalakított adatokat Azure Synapse. Az Azure Databricks Azure Synapse-összekötőjével közvetlenül feltölthet egy adatkeretet táblázatként egy Synapse Spark-készletbe.

Ahogy korábban említettük, a Azure Synapse-összekötő az Azure Blob Storage-ot használja ideiglenes tárolóként az Azure Databricks és Azure Synapse közötti adatok feltöltéséhez. Ezért első lépésként adja meg a tárfiókhoz való csatlakozáshoz szükséges konfigurációt. A jelen cikk előfeltételeinek részeként már létre kell hoznia a fiókot.

  1. Adja meg az Azure Storage-fiók Azure Databricksből való eléréséhez szükséges konfigurációt.

    val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
    val blobContainer = "<blob-container-name>"
    val blobAccessKey =  "<access-key>"
    
  2. Adjon meg egy ideiglenes mappát, amelyet az adatok Azure Databricks és Azure Synapse közötti áthelyezésekor szeretne használni.

    val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
    
  3. Futtassa az alábbi kódrészletet az Azure Blob Storage hozzáférési kulcsainak a konfigurációban való tárolásához. Ez a művelet biztosítja, hogy ne kelljen a jegyzetfüzetben lévő hozzáférési kulcsot egyszerű szövegben tárolnia.

    val acntInfo = "fs.azure.account.key."+ blobStorage
    sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
    
  4. Adja meg a Azure Synapse példányhoz való csatlakozáshoz szükséges értékeket. Előfeltételként létre kell hoznia egy Azure Synapse Analytics-szolgáltatást. Használja a dwServer teljes kiszolgálónevét. Például: <servername>.database.windows.net.

    //Azure Synapse related settings
    val dwDatabase = "<database-name>"
    val dwServer = "<database-server-name>"
    val dwUser = "<user-name>"
    val dwPass = "<password>"
    val dwJdbcPort =  "1433"
    val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
    val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
    
  5. Futtassa az alábbi kódrészletet az átalakított, renamedColumnsDF nevű adatkeret táblázatként való betöltéséhez Azure Synapse. Ez a kódrészlet létrehoz egy SampleTable nevű táblát az SQL-adatbázisban.

    spark.conf.set(
        "spark.sql.parquet.writeLegacyFormat",
        "true")
    
    renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
    

    Megjegyzés

    Ez a minta a forward_spark_azure_storage_credentials jelölőt használja, ami miatt Azure Synapse hozzáférési kulcs használatával fér hozzá a blobtárolóból származó adatokhoz. Ez az egyetlen támogatott hitelesítési módszer.

    Ha a Azure Blob Storage virtuális hálózatokra van korlátozva, Azure Synapse hozzáférési kulcsok helyett felügyeltszolgáltatás-identitásra van szükség. Ez a "Ez a kérés nem jogosult a művelet végrehajtására" hibaüzenetet okozza.

  6. Csatlakozás a SQL adatbázishoz, és ellenőrizze, hogy megjelenik-e a SampleTable nevű adatbázis.

    Verify the sample table

  7. Futtasson egy választó lekérdezést a tábla tartalmának ellenőrzéséhez. A táblának ugyanazokkal az adatokkal kell rendelkeznie, mint az átnevezettColumnsDF adatkeretnek.

    Verify the sample table content

Az erőforrások eltávolítása

Az oktatóanyag befejezése után leállíthatja a fürtöt. Az Azure Databricks-munkaterület bal oldalán válassza a Fürtök lehetőséget. Ahhoz, hogy a fürt leálljon, a Műveletek területen mutasson a három pontra (...), és válassza a Leállítás ikont.

Stop a Databricks cluster

Ha nem állítja le manuálisan a fürtöt, az automatikusan leáll, feltéve, hogy a fürt létrehozásakor bejelölte a Leállítás __ perc inaktivitás után jelölőnégyzetet. Ilyen esetben a fürt automatikusan leáll, ha a megadott ideig inaktív volt.

További lépések

Ez az oktatóanyag bemutatta, hogyan végezheti el az alábbi műveleteket:

  • Azure Databricks-szolgáltatás létrehozása
  • Spark-fürt létrehozása az Azure Databricksben
  • Jegyzetfüzet létrehozása az Azure Databricksben
  • Adatok kinyerása Data Lake Storage Gen2-fiókból
  • Adatok átalakítása az Azure Databricksben
  • Adatok betöltése Azure Synapse

Folytassa a következő oktatóanyaggal, amely azt ismerteti, hogyan streamelhetők valós időben az adatok az Azure Databricksbe az Azure Event Hubs használatával.