Tutorial: Azure Data Lake Storage Gen2, Azure Databricks en Spark

In deze zelfstudie ziet u hoe u een Azure Databricks-cluster kunt verbinden met gegevens die zijn opgeslagen in een Azure-opslagaccount waarvoor Azure Data Lake Storage Gen2 is ingeschakeld. Deze verbinding stelt u in staat om systeemeigen query’s en analyses van uw cluster uit te voeren op uw gegevens.

In deze zelfstudie leert u het volgende:

  • Niet-gestructureerde gegevens opnemen in een opslagaccount
  • Analyse uitvoeren op gegevens in Blob-opslag

Als u geen Azure-abonnement hebt, maakt u een gratis account voordat u begint.

Vereisten

Een Azure Databricks-werkruimte, -cluster en -notebook maken

  1. Een Azure Databricks-werkruimte maken. Zie Een Azure Databricks-werkruimte maken.

  2. Een cluster maken. Zie Een cluster maken.

  3. Maak een notebook. Zie Een notitieblok maken. Kies Python als de standaardtaal van het notebook.

Houd uw notitieblok geopend. U gebruikt deze in de volgende secties.

De vluchtgegevens downloaden

In deze zelfstudie wordt gebruikgemaakt van on-time vluchtgegevens voor januari 2016 van het Bureau of Transportation Statistics om te laten zien hoe u een ETL-bewerking uitvoert. U moet deze gegevens downloaden om de zelfstudie te kunnen voltooien.

  1. Download het bestand On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip . Dit bestand bevat de vluchtgegevens.

  2. Pak de inhoud van het zipbestand uit en noteer de bestandsnaam en het pad van het bestand. U hebt deze informatie in een latere stap nodig.

Als u meer wilt weten over de informatie die is vastgelegd in de prestatiegegevens van de on-time rapportage, kunt u de veldbeschrijvingen bekijken op de website van Bureau of Transportation Statistics.

Gegevens opnemen

In deze sectie uploadt u de .csv-vluchtgegevens naar uw Azure Data Lake Storage Gen2-account en koppelt u het opslagaccount vervolgens aan uw Databricks-cluster. Ten slotte gebruikt u Databricks om de .csv-vluchtgegevens te lezen en terug te schrijven naar de opslag in Apache Parquet-indeling.

De vluchtgegevens uploaden naar uw opslagaccount

Gebruik AzCopy om uw CSV-bestand te kopiëren naar uw Azure Data Lake Storage Gen2-account. U gebruikt de azcopy make opdracht om een container in uw opslagaccount te maken. Vervolgens gebruikt u de azcopy copy opdracht om de CSV-gegevens te kopiëren die u zojuist hebt gedownload naar een map in die container.

In de volgende stappen moet u namen invoeren voor de container die u wilt maken en de map en blob waarnaar u de vluchtgegevens in de container wilt uploaden. U kunt de voorgestelde namen in elke stap gebruiken of uw eigen naamconventies opgeven voor containers, mappen en blobs.

  1. Open een opdrachtpromptvenster en voer de volgende opdracht in om u aan te melden bij Azure Active Directory voor toegang tot uw opslagaccount.

    azcopy login
    

    Volg de instructies die worden weergegeven in het opdrachtpromptvenster om uw gebruikersaccount te verifiëren.

  2. Als u een container in uw opslagaccount wilt maken om de vluchtgegevens op te slaan, voert u de volgende opdracht in:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Vervang de waarde van de tijdelijke plaatsaanduiding <storage-account-name> door de naam van uw opslagaccount.

    • Vervang de <container-name> tijdelijke aanduiding door een naam voor de container die u wilt maken om de CSV-gegevens op te slaan, bijvoorbeeld flight-data-container.

  3. Als u de CSV-gegevens wilt uploaden (kopiëren) naar uw opslagaccount, voert u de volgende opdracht in.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Vervang de waarde van de tijdelijke plaatsaanduiding <csv-folder-path> door het pad naar het csv-bestand.

    • Vervang de waarde van de tijdelijke plaatsaanduiding <storage-account-name> door de naam van uw opslagaccount.

    • Vervang de <container-name> tijdelijke aanduiding door de naam van de container in uw opslagaccount.

    • Vervang de <directory-name> tijdelijke aanduiding door de naam van een map om uw gegevens in de container op te slaan, bijvoorbeeld jan2016.

Uw opslagaccount koppelen aan uw Databricks-cluster

In deze sectie koppelt u uw Azure Data Lake Storage Gen2-cloudobjectopslag aan het Databricks File System (DBFS). U gebruikt het Azure AD-serviceprincipe dat u eerder hebt gemaakt voor verificatie met het opslagaccount. Zie Koppelen van cloudobjectopslag in Azure Databricks voor meer informatie.

  1. Koppel uw notebook aan uw cluster.

    1. Selecteer in het notitieblok dat u eerder hebt gemaakt de knop Verbinding maken in de rechterbovenhoek van de werkbalk van het notitieblok. Met deze knop wordt de rekenkiezer geopend. (Als u uw notebook al hebt verbonden met een cluster, wordt de naam van dat cluster weergegeven in de knoptekst in plaats van in de knoptekstVerbinding maken).

    2. Selecteer in het vervolgkeuzemenu van het cluster het cluster dat u eerder hebt gemaakt.

    3. U ziet dat de tekst in de clusterkiezer wordt gestart. Wacht totdat het cluster is gestart en totdat de naam van het cluster in de knop wordt weergegeven voordat u doorgaat.

  2. Kopieer en plak het volgende codeblok in de eerste cel, maar voer deze code nog niet uit.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. In dit codeblok:

    • Vervang configsin , vervang de <appId>waarden <clientSecret>en <tenantId> tijdelijke aanduidingen door de toepassings-id, het clientgeheim en de tenant-id die u hebt gekopieerd toen u de service-principal in de vereisten maakte.

    • Vervang in de source URI de <storage-account-name>waarden en <container-name><directory-name> tijdelijke aanduidingen door de naam van uw Azure Data Lake Storage Gen2-opslagaccount en de naam van de container en map die u hebt opgegeven bij het uploaden van de vluchtgegevens naar het opslagaccount.

      Notitie

      De schema-id in de URI, abfssvertelt Databricks het Azure Blob-bestandssysteemstuurprogramma te gebruiken met Tls (Transport Layer Security). Zie De URI van Azure Data Lake Storage Gen2 gebruiken voor meer informatie over de URI.

  4. Zorg ervoor dat het cluster is gestart voordat u doorgaat.

  5. Druk op de toetsen Shift + Enter om de code in dit blok uit te voeren.

De container en map waar u de vluchtgegevens in uw opslagaccount hebt geüpload, zijn nu toegankelijk in uw notebook via het koppelpunt /mnt/flightdata.

Databricks Notebook gebruiken om CSV te converteren naar Parquet

Nu de csv-vluchtgegevens toegankelijk zijn via een DBFS-koppelpunt, kunt u een Apache Spark DataFrame gebruiken om deze in uw werkruimte te laden en terugschrijven in Apache Parquet-indeling naar uw Azure Data Lake Storage Gen2-objectopslag.

  • Een Spark DataFrame is een tweedimensionale gelabelde gegevensstructuur met kolommen met mogelijk verschillende typen. U kunt een DataFrame gebruiken om eenvoudig gegevens te lezen en te schrijven in verschillende ondersteunde indelingen. Met een DataFrame kunt u gegevens laden uit de opslag van cloudobjecten en analyses en transformaties uitvoeren in uw rekencluster zonder dat dit van invloed is op de onderliggende gegevens in de opslag van cloudobjecten. Zie Werken met PySpark DataFrames in Azure Databricks voor meer informatie.

  • Apache Parquet is een kolombestandsindeling met optimalisaties waarmee query's sneller worden uitgevoerd. Het is een efficiëntere bestandsindeling dan CSV of JSON. Zie Parquet-bestanden voor meer informatie.

Voeg in het notitieblok een nieuwe cel toe en plak de volgende code erin.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Druk op de toetsen Shift + Enter om de code in dit blok uit te voeren.

Voordat u doorgaat naar de volgende sectie, moet u ervoor zorgen dat alle parquet-gegevens zijn geschreven en 'Gereed' wordt weergegeven in de uitvoer.

Gegevens verkennen

In deze sectie gebruikt u het hulpprogramma van het Databricks-bestandssysteem om uw Azure Data Lake Storage Gen2-objectopslag te verkennen met behulp van het DBFS-koppelpunt dat u in de vorige sectie hebt gemaakt.

Plak in een nieuwe cel de volgende code om een lijst met de bestanden op het koppelpunt op te halen. Met de eerste opdracht wordt een lijst met bestanden en mappen uitgevoerd. Met de tweede opdracht wordt de uitvoer in tabelvorm weergegeven, zodat u ze gemakkelijker kunt lezen.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Druk op de toetsen Shift + Enter om de code in dit blok uit te voeren.

U ziet dat de parquet-map wordt weergegeven in de lijst. U hebt de .csv-vluchtgegevens in parquet-indeling opgeslagen in de map parquet/flights in de vorige sectie. Als u bestanden in de map parquet/flights wilt weergeven, plakt u de volgende code in een nieuwe cel en voert u deze uit:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Als u een nieuw bestand wilt maken en een lijst wilt weergeven, plakt u de volgende code in een nieuwe cel en voert u het uit:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Omdat u het 1.txt-bestand in deze zelfstudie niet nodig hebt, kunt u de volgende code in een cel plakken en uitvoeren om mijn map recursief te verwijderen. De True parameter geeft een recursieve verwijdering aan.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

U kunt de Help-opdracht gebruiken om meer informatie te krijgen over andere opdrachten.

dbutils.fs.help("rm")

Met deze codevoorbeelden hebt u de hiërarchische aard van HDFS verkend met behulp van gegevens die zijn opgeslagen in een opslagaccount waarvoor Azure Data Lake Storage Gen2 is ingeschakeld.

Query’s uitvoeren voor de gegevens

Hierna kunt u beginnen met het doorzoeken van de gegevens die u hebt geüpload in het opslagaccount. Voer elk van de volgende codeblokken in een nieuwe cel in en druk op Shift +Enter om het Python-script uit te voeren.

DataFrames bieden een uitgebreide set functies (select columns, filter, join, aggregate) waarmee u veelvoorkomende problemen met gegevensanalyse efficiënt kunt oplossen.

Als u een DataFrame wilt laden vanuit uw eerder opgeslagen parquet-vluchtgegevens en een deel van de ondersteunde functionaliteit wilt verkennen, voert u dit script in een nieuwe cel in en voert u het uit.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Voer dit script in een nieuwe cel in om enkele eenvoudige analysequery's uit te voeren op de gegevens. U kunt ervoor kiezen om het hele script (SHIFT + ENTER) uit te voeren, elke query te markeren en afzonderlijk uit te voeren met CTRL + SHIFT + ENTER, of elke query in een afzonderlijke cel in te voeren en daar uit te voeren.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Samenvatting

In deze zelfstudie hebt u:

  • U hebt Azure-resources gemaakt, waaronder een Azure Data Lake Storage Gen2-opslagaccount en een Azure AD-service-principal, en toegewezen machtigingen voor toegang tot het opslagaccount.

  • Er is een Azure Databricks-werkruimte, notebook en rekencluster gemaakt.

  • AzCopy gebruikt om ongestructureerde .csv-vluchtgegevens te uploaden naar het Azure Data Lake Storage Gen2-opslagaccount.

  • De hulpprogrammafuncties van Databricks File System worden gebruikt om uw Azure Data Lake Storage Gen2-opslagaccount te koppelen en het hiërarchische bestandssysteem te verkennen.

  • Apache Spark DataFrames gebruikt om uw .csv-vluchtgegevens te transformeren naar de Apache Parquet-indeling en deze weer op te slaan in uw Azure Data Lake Storage Gen2-opslagaccount.

  • DataFrames gebruikt om de vluchtgegevens te verkennen en een eenvoudige query uit te voeren.

  • Apache Spark SQL gebruikt om de vluchtgegevens op te vragen voor het totale aantal vluchten voor elke luchtvaartmaatschappij in januari 2016, de luchthavens in Texas, de luchtvaartmaatschappijen die vanuit Texas vliegen, de gemiddelde aankomstvertraging in minuten voor elke luchtvaartmaatschappij op nationaal niveau en het percentage van de vluchten van elke luchtvaartmaatschappij die vertraagde vertrek- of aankomsten hebben.

Resources opschonen

Als u het notebook wilt behouden en later wilt terugkeren, is het een goed idee om uw cluster af te sluiten (beëindigen) om kosten te voorkomen. Als u het cluster wilt beëindigen, selecteert u het in de rekenkiezer rechtsboven op de werkbalk van het notitieblok, selecteert u Beëindigen in het menu en bevestigt u uw selectie. (Standaard wordt het cluster na 120 minuten inactiviteit automatisch beëindigd.)

Als u afzonderlijke werkruimtebronnen, zoals notebooks en clusters, wilt verwijderen, kunt u dit doen vanuit de linkerzijbalk van de werkruimte. Zie Een cluster verwijderen of Een notebook verwijderen voor gedetailleerde instructies.

Verwijder de resourcegroep en alle gerelateerde resources, wanneer u deze niet meer nodig hebt. Hiervoor selecteert u in Azure Portal de resourcegroep voor het opslagaccount en de werkruimte en selecteert u Verwijderen.

Volgende stappen