使用 Azure Data Factory 中的 Spark 活動來轉換雲端中的資料

適用於:Azure Data Factory Azure Synapse Analytics

提示

試用 Microsoft Fabric 中的 Data Factory,這是適用於企業的全方位分析解決方案。 Microsoft Fabric 涵蓋從資料移動到資料科學、即時分析、商業智慧和報告的所有項目。 了解如何免費開始新的試用

在本教學課程中,您會使用 Azure 入口網站來建立 Azure Data Factory 管線。 此管線使用 Spark 活動和隨選 Azure HDInsight 連結服務來轉換資料。

您會在本教學課程中執行下列步驟:

  • 建立資料處理站。
  • 建立使用 Spark 活動的管線。
  • 觸發管線執行。
  • 監視管道執行。

如果您沒有 Azure 訂用帳戶,請在開始前建立免費帳戶

必要條件

注意

建議您使用 Azure Az PowerShell 模組來與 Azure 互動。 請參閱安裝 Azure PowerShell 以開始使用。 若要了解如何移轉至 Az PowerShell 模組,請參閱將 Azure PowerShell 從 AzureRM 移轉至 Az

  • Azure 儲存體帳戶。 您需要建立 Python 指令碼和輸入檔案,並上傳至 Azure 儲存體。 Spark 程式的輸出會儲存在這個儲存體帳戶中。 隨選 Spark 叢集與其主要儲存體是使用相同的儲存體帳戶。

注意

HdInsight 僅支援標準層的一般用途儲存體帳戶。 請確定帳戶不是進階或僅限 Blob 儲存體帳戶。

將 Python 指令碼上傳至 Blob 儲存體帳戶

  1. 使用下列內容建立名為 WordCount_Spark.py 的 Python 檔案:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. 以您的 Azure 儲存體帳戶名稱取代 <storageAccountName>。 然後儲存檔案。

  3. 在 Azure Blob 儲存體中,建立名為 adftutorial 的容器 (如果不存在)。

  4. 建立名為 spark 的資料夾。

  5. spark 資料夾下,建立名為 script 的子資料夾。

  6. WordCount_Spark.py 檔案上傳至 script 子資料夾。

上傳輸入檔案

  1. 建立名為 minecraftstory.txt 的檔案並填入一些文字。 Spark 程式會計算這段文字中的字數。
  2. spark 資料夾下,建立名為 inputfiles 的子資料夾。
  3. minecraftstory.txt 檔案上傳至 inputfiles 子資料夾。

建立資料處理站

要是您還沒有使用資料處理站,請遵循快速入門:使用 Azure 入口網站建立資料處理站一文中的步驟,建立資料處理站。

建立連結服務

在本節中,您會撰寫兩個連結服務:

  • 將 Azure 儲存體帳戶連結至資料處理站的 Azure 儲存體連結服務。 隨選 HDInsight 叢集會使用此儲存體。 而且也包含要執行的 Spark 指令碼。
  • 隨選 HDInsight 連結服務。 Azure Data Factory 會自動建立 HDInsight 叢集並執行 Spark 程式。 然後在 HDInsight 叢集的閒置時間達到預先設定的時間後,系統就會刪除該叢集。

建立 Azure 儲存體連結服務

  1. 在首頁,切換至左側面板中的 [管理] 索引標籤。

    Screenshot that shows the Manage tab.

  2. 選取視窗底部的 [連線],然後選取 [+ 新增]

    Buttons for creating a new connection

  3. 在 [新增連結服務] 視窗中,選取 [資料存放區]>[Azure Blob 儲存體],然後選取 [繼續]

    Selecting the "Azure Blob Storage" tile

  4. 針對 [儲存體帳戶名稱],從清單中選取名稱,然後選取 [儲存]

    Box for specifying the storage account name

建立隨選 HDInsight 連結服務

  1. 再次選取 [+新增] 按鈕以建立另一個連結服務。

  2. 在 [新增連結服務] 視窗中,選取 [計算]>[Azure HDInsight],然後選取 [繼續]

    Selecting the "Azure HDInsight" tile

  3. 在 [新增連結服務] 視窗中,完成下列步驟:

    a. 針對 [名稱],輸入 AzureHDInsightLinkedService

    b. 針對 [類型],確認已選取 [隨選 HDInsight]

    c. 針對 [Azure 儲存體連結服務],選取 [AzureBlobStorage1]。 您之前已建立此連結服務。 如果您使用不同的名稱,請在此指定正確的名稱。

    d. 針對 [叢集類型],選取 [spark]

    e. 針對 [服務主體識別碼],輸入有權建立 HDInsight 叢集的服務主體識別碼。

    此服務主體必須是訂用帳戶的參與者角色成員,或建立叢集所在的資源群組成員。 如需詳細資訊,請參閱建立 Microsoft Entra 應用程式和服務主體服務主體識別碼相當於「應用程式識別碼」,而服務主體金鑰則相當於「用戶端密碼」的值。

    f. 針對 [服務主體金鑰],輸入金鑰。

    .g 針對 [資源群組],選取建立資料處理站時使用的相同資源群組。 將在此資源群組中建立 Spark 叢集。

    h. 展開 [OS 類型]

    i. 針對叢集使用者名稱輸入名稱。

    j. 輸入使用者的叢集密碼

    k. 選取 [完成]

    HDInsight linked service settings

注意

Azure HDInsight 會限制您在其支援的每個 Azure 區域中可使用的核心總數。 對於隨選 HDInsight 連結服務,用來建立 HDInsight 叢集的 Azure 儲存體位置與用來作為其主要儲存體的位置相同。 請確定您有足夠的核心配額,才能成功建立叢集。 如需詳細資訊,請參閱使用 Hadoop、Spark 及 Kafka 等在 HDInsight 中設定叢集

建立新管線

  1. 選取 + (加號) 按鈕,然後選取功能表上的 [管線]

    Buttons for creating a new pipeline

  2. 在 [活動] 工具箱中,展開 [HDInsight]。 將 [活動] 工具箱中的 [Spark] 活動拖到管線設計工具介面。

    Dragging the Spark activity

  3. 在 [Spark] 活動視窗底部的屬性中,完成下列步驟:

    a. 切換至 [HDI 叢集] 索引標籤。

    b. 選取您在上一個程序中建立的 AzureHDInsightLinkedService

    Specifying the HDInsight linked service

  4. 切換至 [指令碼/Jar] 索引標籤並完成下列步驟:

    a. 針對 [作業連結服務],選取 [AzureBlobStorage1]

    b. 選取 [瀏覽儲存體]

    Specifying the Spark script on the "Script/Jar" tab

    c. 瀏覽至 adftutorial/spark/script 資料夾,選取 WordCount_Spark.py,然後選取 [完成]

  5. 若要驗證管線,選取工具列上的 [驗證] 按鈕。 選取 >> (右箭頭) 按鈕以關閉驗證視窗。

    "Validate" button

  6. 選取 [全部發佈]。 Data Factory UI 會將實體 (連結服務和管線) 發佈至 Azure Data Factory 服務。

    "Publish All" button

觸發管線執行

選取工具列上的 [新增觸發程序],然後選取 [立即觸發]

"Trigger" and "Trigger Now" buttons

監視管道執行

  1. 切換至 [監視] 索引標籤。確認您看到管線執行。 建立 Spark 叢集需要約 20 分鐘。

  2. 定期選取 [重新整理] 以檢查管線執行的狀態。

    Tab for monitoring pipeline runs, with "Refresh" button

  3. 若要檢視與管線執行相關聯的活動執行,請選取 [動作] 資料行中的 [檢視活動執行]

    Pipeline run status

    您可以選取頂端的 [所有管線執行] 連結,切換回管線執行檢視。

    "Activity Runs" view

驗證輸出

確認已在 adftutorial 容器的 spark/otuputfiles/wordcount 資料夾中建立輸出檔案。

Location of the output file

這個檔案應該有輸入文字檔中的每個字組,以及字組在檔案中出現的次數。 例如:

(u'This', 1)
(u'a', 1)
(u'is', 1)
(u'test', 1)
(u'file', 1)

此範例中的管線會使用 Spark 活動和隨選 HDInsight 連結服務來轉換資料。 您已了解如何︰

  • 建立資料處理站。
  • 建立使用 Spark 活動的管線。
  • 觸發管線執行。
  • 監視管道執行。

若要了解如何在虛擬網路中的 Azure HDInsight 叢集上執行 Hive 指令碼來轉換資料,請進入下一個教學課程: