教學課程:Azure Data Lake Storage Gen2、Azure Databricks 和 Spark

本教學課程說明如何將 Azure Databricks 叢集連線至已啟用 Azure Data Lake Storage Gen2 功能之 Azure 儲存體帳戶中所儲存的資料。 此連線可讓您以原生方式從叢集對資料執行查詢和分析。

在此教學課程中,您需要:

  • 將非結構化的資料內嵌到儲存體帳戶
  • 在 Blob 儲存體中對資料執行分析

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

必要條件

建立 Azure Databricks 工作區、叢集和筆記本

  1. 建立 Azure Databricks 工作區。 請參閱建立 Azure Databricks 工作區

  2. 建立叢集。 請參閱建立叢集

  3. 建立筆記本。 請參閱建立筆記本。 選擇 Python 做為筆記本的預設語言。

讓您的筆記本保持開啟狀態。 您會在下列幾節中使用此筆記本。

下載航班資料

本教學課程會使用來自運輸統計局的 2016 年 1 月準點率航班資料,示範如何執行 ETL 作業。 您必須下載這項資料,才能完成本教學課程。

  1. 下載 On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip 檔案。 此檔案包含發行小眾測試版資料。

  2. 將 ZIP 檔案的內容解壓縮,並記下檔案名稱和檔案路徑。 稍後的步驟將會需要這項資訊。

如果您想要了解在準點報告效能資料中擷取的資訊,您可以在運輸統計局網站上看到欄位描述

內嵌資料

在本節中,您會將 .csv 航班資料上傳至 Azure Data Lake Storage Gen2 帳戶,然後將儲存體帳戶掛接至 Databricks 叢集。 最後,您會使用 Databricks 讀取 .csv 航班資料,並以 Apache Parquet 格式將其寫回儲存體。

將航班資料上傳至您的儲存體帳戶

使用 AzCopy 將您的 .csv 檔案複製到 Azure Data Lake Storage Gen2 帳戶。 您會使用 azcopy make 命令,在儲存體帳戶中建立 Blob 容器。 然後,使用 azcopy copy 命令來複製您剛下載至該容器中目錄的 csv 資料。

在下列步驟中,您必須輸入您要建立的容器名稱,以及容器中您要將航班資料上傳至其中的目錄和 Blob。 您可以在每個步驟中使用建議的名稱,或遵守容器、目錄和 Blob 的命名慣例來指定自己的名稱。

  1. 開啟命令提示字元視窗,並輸入下列命令以登入 Azure Active Directory,來存取您的儲存體帳戶。

    azcopy login
    

    請依照命令提示字元視窗中顯示的指示,來驗證您的使用者帳戶。

  2. 若要在您的儲存體帳戶中建立容器來儲存航班資料,請輸入下列命令:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • 使用您的儲存體帳戶名稱取代 <storage-account-name> 預留位置值。

    • <container-name> 預留位置取代為您想要建立以儲存 csv 資料的容器名稱,例如,flight-data-container

  3. 若要將 csv 資料上傳 (複製) 至您的儲存體帳戶,請輸入下列命令。

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • <csv-folder-path> 預留位置值更換為 .csv 檔案的名稱。

    • 使用您的儲存體帳戶名稱取代 <storage-account-name> 預留位置值。

    • <container-name> 預留位置取代為您儲存體帳戶中的容器名稱。

    • <directory-name> 預留位置取代為容器中要儲存您資料的目錄名稱,例如,jan2016

將您的儲存體帳戶掛接至 Databricks 叢集

在本節中,您會將 Azure Data Lake Storage Gen2 雲端物件儲存體掛接至 Databricks 檔案系統 (DBFS)。 您會使用先前建立的 Azure AD 服務原則,搭配儲存體帳戶進行驗證。 如需詳細資訊,請參閱在 Azure Databricks 上掛接雲端物件儲存體

  1. 將您的筆記本連結至叢集。

    1. 在您先前建立的筆記本中,選取筆記本工具列右上角的 [連線] 按鈕。 此按鈕會開啟計算選取器。 (如果您已將筆記本連線至叢集,該叢集的名稱會顯示在按鈕文字中,代替 [連線])。

    2. 在叢集下拉式清單中,選取您先前建立的叢集。

    3. 請注意,叢集選取器中的文字會變更為「啟動中」。 等候叢集完成啟動,且叢集的名稱出現在按鈕中,然後再繼續。

  2. 將下列程式碼區塊複製並貼到第一個資料格中,但先不要執行此程式碼。

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. 在下列程式碼區塊中:

    • configs 中,將 <appId><clientSecret><tenantId> 預留位置值取代為您在必要條件中建立服務主體時所複製的應用程式識別碼、用戶端密碼和租用戶識別碼。

    • source URI 中,將 <storage-account-name><container-name><directory-name> 預留位置值取代為 Azure Data Lake Storage Gen2 儲存體帳戶的名稱,以及您在將航班資料上傳至儲存體帳戶時所指定的容器和目錄名稱。

      注意

      URI abfss 中的配置識別碼會告知 Databricks 搭配傳輸層安全性 (TLS) 使用 Azure Blob File System 驅動程式。 若要深入了解 URI,請參閱使用 Azure Data Lake Storage Gen2 URI

  4. 請確定您的叢集已完成啟動,再繼續進行。

  5. SHIFT + ENTER 鍵以執行此區塊中的程式碼。

儲存體帳戶中您將航班資料上傳至其中的容器和目錄,現在可在筆記本中透過掛接點 /mnt/flightdata 存取。

使用 Databricks Notebook 將 CSV 轉換成 Parquet

既然 csv 航班資料可透過 DBFS 掛接點存取,您就可以使用 Apache Spark DataFrame 將其載入工作區,並以 Apache Parquet 格式將其寫回 Azure Data Lake Storage Gen2 物件儲存體。

  • Spark DataFrame 是二維標籤資料結構,其中具有潛在不同類型的資料行。 您可以使用 DataFrame,以各種支援的格式輕鬆地讀取和寫入資料。 透過 DataFrame,您可以從雲端物件儲存體載入資料,並在計算叢集內對此資料執行分析和轉換,而不會影響雲端物件儲存體中的基礎資料。 若要深入了解,請參閱在 Azure Databricks 上使用 PySpark DataFrames

  • Apache parquet 是一種單欄案檔格式,具有加速查詢的最佳化。 這是比 CSV 或 JSON 更有效率的檔案格式。 若要深入了解,請參閱 Parquet 檔案

在筆記本中,新增資料格並將下列程式碼貼入其中。

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

SHIFT + ENTER 鍵以執行此區塊中的程式碼。

繼續進行下一節之前,請確定所有 Parquet 資料都已寫入,且「完成」出現在輸出中。

探索資料

在本節中,您會使用 Databricks 檔案系統公用程式,以使用您在上一節中建立的 DBFS 掛接點來探索 Azure Data Lake Storage Gen2 物件儲存體。

在新的資料格中,貼上下列程式碼以取得掛接點的檔案清單。 第一個命令會輸出檔案和目錄的清單。 第二個命令會以表格式格式顯示輸出,以方便閱讀。

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

SHIFT + ENTER 鍵以執行此區塊中的程式碼。

請注意,parquet 目錄會出現在清單中。 您在前幾節已將 .csv 航班資料以 Parquet 格式儲存至 parquet/flight 目錄。 若要列出 parquet/flights 目錄中的檔案,請將下列程式碼貼入新的資料格中並加以執行:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

若要建立新的檔案並將其列出,請將下列程式碼貼入新的資料格中並加以執行:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

由於您不需要本教學課程中的 1.txt 檔案,因此您可以將下列程式碼貼入資料格中,然後執行此程式碼,以遞迴方式刪除 mydirectoryTrue 參數表示遞迴刪除。

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

為了方便起見,您可以使用 help 命令來深入了解其他命令。

dbutils.fs.help("rm")

藉由這些程式碼範例,您已探索 HDFS 的階層式本質,方法是使用已啟用 Azure Data Lake Storage Gen2 的儲存體帳戶中所儲存的資料。

查詢資料

接下來,您可以開始查詢您上傳到儲存體帳戶的資料。 將下列每個程式碼區塊輸入至資料格,然後按 SHIFT + ENTER 來執行 Python 指令碼。

DataFrame 提供一組豐富的函式 (選取資料行、篩選、聯結、彙總),可讓您有效率地解決常見的資料分析問題。

若要從先前儲存的 Parquet 航班資料載入 DataFrame,並探索一些支援的功能,請將此指令碼輸入至新的資料格並加以執行。

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

在新的資料格中輸入此指令碼,針對資料執行一些基本分析查詢。 您可以選擇執行整個指令碼 (SHIFT + ENTER)、醒目提示每個查詢並使用 CTRL + SHIFT + ENTER 個別執行該查詢,或將每個查詢輸入至個別的資料格並在該處執行該查詢。

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

摘要

在本教學課程中,您已:

  • 已建立 Azure 資源,包括 Azure Data Lake Storage Gen2 儲存體帳戶和 Azure AD 服務主體,以及已指派權限來存取儲存體帳戶。

  • 已建立 Azure Databricks 工作區、筆記本和計算叢集。

  • 已使用 AzCopy 將非結構化 .csv 航班資料上傳至 Azure Data Lake Storage Gen2 儲存體帳戶。

  • 已使用 Databricks 檔案系統公用程式功能,來掛接您的 Azure Data Lake Storage Gen2 儲存體帳戶,並探索其階層式檔案系統。

  • 已使用 Apache Spark DataFrame 將 .csv 航班資料轉換為 Apache Parquet 格式,並將其儲存回 Azure Data Lake Storage Gen2 儲存體帳戶。

  • 已使用 DataFrame 來探索航班資料並執行簡易查詢。

  • 已使用 Apache Spark SQL 來查詢航班資料,以取得 2016 年 1 月每家航空公司的航班總數、德州的機場、從德州起飛的航線、全國各航空公司的平均抵達延誤時間 (以分鐘為單位),以及每家航空公司有多少百分比航班延誤起飛或抵達。

清除資源

如果您想要保留筆記本,稍後再回到其中,最好關閉 (終止) 您的叢集以避免產生費用。 若要終止您的叢集,請在筆記本工具列右上方的計算選取器中選取您的叢集、從功能表中選取 [終止],然後確認您的選取項目。 (根據預設,叢集會在無活動 120 分鐘後自動終止。)

如果想要刪除筆記本和叢集等個別工作區資源,您可以從工作區的資訊看板執行此動作。 如需詳細指示,請參閱刪除叢集刪除筆記本

當已不再需要資源時,請刪除資源群組及所有相關資源。 若要在 Azure 入口網站這麼做,請選取儲存體帳戶和工作區的資源群組,然後選取 [刪除]

下一步