Zelfstudie: Gegevens extraheren, transformeren en laden met behulp van Azure Databricks

In deze zelfstudie voert u een ETL-bewerking (Extraction, Transformation, and Loading) uit met behulp van Azure Databricks. U haalt gegevens op uit Azure Data Lake Storage Gen2 en brengt deze over naar Azure Databricks, voert transformaties uit op de gegevens in Azure Databricks, waarna u de getransformeerde gegevens in Azure Synapse Analytics laadt.

Voor de stappen in deze zelfstudie wordt gebruikgemaakt van de Azure Synapse-connector voor Azure Databricks om gegevens over te brengen naar Azure Databricks. Op zijn beurt gebruikt deze connector Azure Blob Storage als tijdelijke opslag voor de gegevens die worden overgebracht tussen een Azure Databricks-cluster en Azure Synapse.

In de volgende afbeelding wordt de stroom van de toepassing weergegeven:

Azure Databricks with Data Lake Store and Azure Synapse

Deze zelfstudie bestaat uit de volgende taken:

  • Een Azure Databricks-service maken.
  • Een Apache Spark-cluster in Azure Databricks maken.
  • Een bestandssysteem in uw Data Lake Storage Gen2-account maken.
  • Voorbeeldgegevens in het Azure Data Lake Storage Gen2-account uploaden.
  • Een service-principal maken.
  • Gegevens uit het Azure Data Lake Storage Gen2-account extraheren.
  • Gegevens transformeren in Azure Databricks.
  • Gegevens laden in Azure Synapse.

Als u geen Azure-abonnement hebt, maakt u een gratis account voordat u begint.

Notitie

Deze zelfstudie kan niet worden uitgevoerd met behulp van een gratis Azure-proefabonnement. Als u een gratis account hebt, gaat u naar uw profiel en wijzigt u uw abonnement in Betalen per gebruik. Zie Gratis Azure-account voor meer informatie. Vervolgens moet u de bestedingslimiet verwijderen en een quotumverhoging aanvragen voor vCPU's in uw regio. Wanneer u uw Azure Databricks-werkruimte maakt, kunt u de prijscategorie Proefversie (Premium - 14 dagen gratis DBU’s) selecteren om de werkruimte 14 dagen lang toegang te geven tot gratis Premium Azure Databricks DBU’s.

Vereisten

Voltooi deze taken voordat u aan deze zelfstudie begint:

Verzamel de benodigde informatie

Zorg dat u over alle vereisten voor deze zelfstudie beschikt.

Voor u begint, moet u de volgende gegevens hebben:

✔️ De databasenaam, databaseservernaam, gebruikersnaam en wachtwoord van uw Azure Synapse.

✔️ De toegangssleutel van uw Blob Storage-account.

✔️ De naam van uw Data Lake Storage Gen2-opslagaccount.

✔️ De tenant-id van uw abonnement.

✔️ De toepassings-id van de app die u hebt geregistreerd bij Microsoft Entra ID (voorheen Azure Active Directory).

✔️ De verificatiesleutel voor de app die u hebt geregistreerd bij Microsoft Entra ID (voorheen Azure Active Directory).

Een Azure Databricks-service maken

In dit gedeelte gaat u een Azure Databricks-service maken met behulp van de Azure-portal.

  1. Selecteer Een resource maken in het menu van Azure Portal.

    Create a resource on Azure portal

    Selecteer vervolgens Analytics>Azure Databricks.

    Create Azure Databricks on Azure portal

  2. Geef bij Azure Databricks Service de volgende waarden op voor het maken van een Databricks-service:

    Eigenschappen Beschrijving
    Werkruimtenaam Geef een naam op voor de Databricks-werkruimte.
    Abonnement Selecteer uw Azure-abonnement in de vervolgkeuzelijst.
    Resourcegroep Geef aan of u een nieuwe resourcegroep wilt maken of een bestaande groep wilt gebruiken. Een resourcegroep is een container met gerelateerde resources voor een Azure-oplossing. Zie Overzicht van Azure Resource Manager voor meer informatie.
    Location Selecteer VS - west 2. Zie Producten beschikbaar per regio voor andere beschikbare regio's.
    Prijscategorie Selecteer Standaard.
  3. Het duurt enkele minuten om het account te maken. Bekijk de voortgangsbalk bovenaan om de bewerkingsstatus te volgen.

  4. Selecteer Vastmaken aan dashboard en selecteer Maken.

Een Apache Spark-cluster in Azure Databricks maken

  1. Ga in de Azure-portal naar de Databricks-service die u hebt gemaakt en selecteer Werkruimte starten.

  2. U wordt omgeleid naar de Azure Databricks-portal. Klik in de portal op Cluster.

    Databricks on Azure

  3. Op de pagina Nieuw cluster geeft u de waarden op waarmee een nieuw cluster wordt gemaakt.

    Create Databricks Spark cluster on Azure

  4. Vul de waarden voor de volgende velden in (en laat bij de overige velden de standaardwaarden staan):

    • Voer een naam voor het cluster in.

    • Zorg ervoor dat u het selectievakje Beëindigen inschakelt na __ minuten van inactiviteit . Geef een duur (in minuten) op waarna het cluster moet worden beëindigd als het niet wordt gebruikt.

    • Selecteer Cluster maken. Als het cluster wordt uitgevoerd, kunt u notitieblokken koppelen aan het cluster en Apache Spark-taken uitvoeren.

Een bestandssysteem in uw Azure Data Lake Storage Gen2-account maken

In deze sectie maakt u een notebook in de Azure Databricks-werkruimte en voert u vervolgens codefragmenten uit om het opslagaccount te configureren

  1. Ga in de Azure-portal naar de Azure Databricks-service die u hebt gemaakt en selecteer Werkruimte starten.

  2. Selecteer aan de linkerkant Werkruimte. Selecteer in de Werkruimte-vervolgkeuzelijst, Notitieblok>maken.

    Create a notebook in Databricks

  3. Voer in het dialoogvenster Notitieblok maken een naam voor het notitieblok in. Selecteer Scala als taal en selecteer het Spark-cluster dat u eerder hebt gemaakt.

    Provide details for a notebook in Databricks

  4. Selecteer Maken.

  5. Met het volgende codeblok worden standaardreferenties voor de service-principal ingesteld voor elk ADLS Gen 2-account dat in de Spark-sessie wordt geopend. Met het tweede codeblok wordt de accountnaam toegevoegd aan de instelling om referenties op te geven voor een specifiek ADLS Gen 2-account. Kopieer en plak een van beide codeblokken in de eerste cel van uw Azure Databricks-notitieblok.

    Sessieconfiguratie

    val appID = "<appID>"
    val secret = "<secret>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    

    Accountconfiguratie

    val storageAccountName = "<storage-account-name>"
    val appID = "<app-id>"
    val secret = "<secret>"
    val fileSystemName = "<file-system-name>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    
  6. In dit codeblok vervangt u de tijdelijke aanduidingen <app-id>, <secret>, <tenant-id> en <storage-account-name> door de waarden die u hebt verzameld bij het uitvoeren van de vereiste stappen voor deze zelfstudie. Vervang de tijdelijke aanduiding <file-system-name> door de naam die u het bestandssysteem wilt geven.

    • De tijdelijke aanduidingen <app-id> en <secret> zijn afkomstig uit de app die u bij Active Directory hebt geregistreerd tijdens het maken van een service-principal.

    • De tijdelijke aanduiding <tenant-id> is afkomstig van uw abonnement.

    • De tijdelijke aanduiding <storage-account-name> is de naam van uw Azure Data Lake Storage Gen2-opslagaccount.

  7. Druk op de toetsen Shift + Enter om de code in dit blok uit te voeren.

Voorbeeldgegevens in het Azure Data Lake Storage Gen2-account opnemen

Voordat u met deze sectie begint, moet u eerst aan de volgende vereisten voldoen:

Voer de volgende code in een notitieblokcel in:

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

Druk in de cel op SHIFT+ENTER om de code uit te voeren.

Voer nu in een cel onder deze cel de volgende code in, en vervang de waarden tussen haakjes door dezelfde waarden die u eerder hebt gebruikt:

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

Druk in de cel op SHIFT+ENTER om de code uit te voeren.

Gegevens uit het Azure Data Lake Storage Gen2-account extraheren

  1. U kunt nu het JSON-voorbeeldbestand laden als een dataframe in Azure Databricks. Plak de volgende code in de nieuwe cel. Vervang de tijdelijke aanduidingen tussen haken door uw eigen waarden.

    val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
    
  2. Druk op de toetsen Shift + Enter om de code in dit blok uit te voeren.

  3. Voer de volgende code uit om de inhoud van het dataframe weer te geven:

    df.show()
    

    Het volgende (of een vergelijkbaar) codefragment wordt weergegeven:

    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    |               artist|     auth|firstName|gender|itemInSession|  lastName|   length|  level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    | El Arrebato         |Logged In| Annalyse|     F|            2|Montgomery|234.57914| free  |  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
    | Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid  |       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|    11|
    | Gorillaz            |Logged In|     Liam|     M|           11|     Watts|246.17751| paid  |New York-Newark-J...|   PUT|NextSong|1406279422332|     2047|                DARE|   200|1409318685332|   201|
    ...
    ...
    

    U hebt de gegevens nu geëxtraheerd uit Azure Data Lake Store Gen2 en geladen in Azure Databricks.

Gegevens transformeren in Azure Databricks

Het bestand small_radio_json.json met de onbewerkte voorbeeldgegevensset legt de doelgroep voor een radiozender vast en bestaat uit een aantal kolommen. In deze sectie gaat u de gegevens zodanig transformeren dat alleen bepaalde kolommen uit de gegevensset worden opgehaald.

  1. Haal eerst alleen de kolommen firstName, lastName, gender, location en level op uit het dataframe dat u hebt gemaakt.

    val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
    specificColumnsDf.show()
    

    De uitvoer die u ontvangt, wordt weergegeven in het volgende codefragment:

    +---------+----------+------+--------------------+-----+
    |firstname|  lastname|gender|            location|level|
    +---------+----------+------+--------------------+-----+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    +---------+----------+------+--------------------+-----+
    
  2. U kunt deze gegevens nog verder transformeren door de naam van de kolom level te wijzigen in subscription_type.

    val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
    renamedColumnsDF.show()
    

    De uitvoer die u ontvangt, wordt weergegeven in het volgende codefragment.

    +---------+----------+------+--------------------+-----------------+
    |firstname|  lastname|gender|            location|subscription_type|
    +---------+----------+------+--------------------+-----------------+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    +---------+----------+------+--------------------+-----------------+
    

Gegevens laden in Azure Synapse

In deze sectie uploadt u de getransformeerde gegevens naar Azure Synapse. Gebruik de Azure Synapse-connector voor Azure Databricks om een dataframe rechtstreeks als een tabel te uploaden in een Synapse Spark-pool.

Zoals eerder vermeld, maakt de Azure Synapse-connector gebruik van Azure Blob-opslag als tijdelijke opslag voor het uploaden van gegevens tussen Azure Databricks en Azure Synapse. U begint met het opgeven van de configuratie om verbinding te maken met het opslagaccount. U moet het account al hebben gemaakt als onderdeel van de vereisten voor dit artikel.

  1. Geef de configuratie op voor toegang tot het Azure Storage-account vanuit Azure Databricks.

    val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
    val blobContainer = "<blob-container-name>"
    val blobAccessKey =  "<access-key>"
    
  2. Geef een tijdelijke map op die wordt gebruikt tijdens het verplaatsen van gegevens tussen Azure Databricks en Azure Synapse.

    val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
    
  3. Voer het volgende codefragment uit om toegangssleutels voor Azure Blob-opslag op te slaan in de configuratie. Deze actie zorgt ervoor dat u de toegangssleutel niet als gewone tekst in het notebook hoeft te bewaren.

    val acntInfo = "fs.azure.account.key."+ blobStorage
    sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
    
  4. Geef de waarden op om verbinding te maken met de instantie van Azure Synapse. Het is een vereiste dat u al een Azure Synapse Analytics-service hebt gemaakt. Gebruik de volledig gekwalificeerde servernaam voor dwServer. Bijvoorbeeld: <servername>.database.windows.net.

    //Azure Synapse related settings
    val dwDatabase = "<database-name>"
    val dwServer = "<database-server-name>"
    val dwUser = "<user-name>"
    val dwPass = "<password>"
    val dwJdbcPort =  "1433"
    val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
    val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
    
  5. Voer het volgende codefragment uit om het getransformeerde dataframe, renamedColumnsDf, als tabel te laden in Azure Synapse. Met dit fragment wordt een tabel met de naam SampleTable gemaakt in de SQL-database.

    spark.conf.set(
        "spark.sql.parquet.writeLegacyFormat",
        "true")
    
    renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
    

    Notitie

    In dit voorbeeld wordt de vlag forward_spark_azure_storage_credentials gebruikt, waardoor Azure Synapse toegang heeft tot gegevens in de blob-opslag met behulp van een toegangssleutel. Dit is de enige ondersteunde verificatiemethode.

    Als uw Azure Blob Storage is beperkt tot bepaalde virtuele netwerken, vereist Azure Synapse Managed Service Identity in plaats van toegangssleutels. Dit veroorzaakt de fout dat deze aanvraag niet gemachtigd is om deze bewerking uit te voeren.

  6. Maak verbinding met de SQL-database en controleer of u de database SampleTable ziet.

    Verify the sample table

  7. Voer een Select-query uit om de inhoud van de tabel te controleren. De tabel moet dezelfde gegevens bevatten als het dataframe renamedColumnsDF.

    Verify the sample table content

Resources opschonen

Nadat u de zelfstudie hebt voltooid, kunt u het cluster beëindigen. Selecteer links Clusters vanuit de Azure Databricks-werkruimte. Als u het cluster wilt beëindigen, wijst u onder Acties het beletselteken (...) aan en selecteert u het pictogram Beëindigen.

Stop a Databricks cluster

Als u het cluster niet handmatig beëindigt, stopt het automatisch, mits u het selectievakje Beëindigen hebt ingeschakeld na __ minuten van inactiviteit toen u het cluster maakte. In dat geval stopt het cluster automatisch als het gedurende de opgegeven tijd inactief is geweest.

Volgende stappen

In deze zelfstudie heeft u het volgende geleerd:

  • Een Azure Databricks-service maken
  • Een Apache Spark-cluster in Azure Databricks maken
  • Een notitieblok maken in Azure Databricks
  • Gegevens extraheren uit een Data Lake Storage Gen2-account
  • Gegevens transformeren in Azure Databricks
  • Gegevens laden in Azure Synapse