Azure Data Factory で Spark アクティビティを使用してクラウドのデータを変換するTransform data in the cloud by using Spark activity in Azure Data Factory

このチュートリアルでは、Azure PowerShell を使用して Data Factory パイプラインを作成します。このパイプラインで、Spark アクティビティとオンデマンドの HDInsight のリンクされたサービスを使用してデータを変換します。In this tutorial, you use Azure PowerShell to create a Data Factory pipeline that transforms data using Spark Activity and an on-demand HDInsight linked service. このチュートリアルでは、以下の手順を実行します。You perform the following steps in this tutorial:

  • データ ファクトリを作成します。Create a data factory.
  • リンクされたサービスを作成してデプロイします。Author and deploy linked services.
  • パイプラインを作成してデプロイします。Author and deploy a pipeline.
  • パイプラインの実行を開始します。Start a pipeline run.
  • パイプラインの実行を監視します。Monitor the pipeline run.

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。If you don't have an Azure subscription, create a free account before you begin.

前提条件Prerequisites

注意

この記事は、新しい Azure PowerShell Az モジュールを使用するために更新されました。This article has been updated to use the new Azure PowerShell Az module. AzureRM モジュールはまだ使用でき、少なくとも 2020 年 12 月までは引き続きバグ修正が行われます。You can still use the AzureRM module, which will continue to receive bug fixes until at least December 2020. Az モジュールと AzureRM の互換性の詳細については、「Introducing the new Azure PowerShell Az module (新しい Azure PowerShell Az モジュールの概要)」を参照してください。To learn more about the new Az module and AzureRM compatibility, see Introducing the new Azure PowerShell Az module. Az モジュールのインストール手順については、Azure PowerShell のインストールを参照してください。For Az module installation instructions, see Install Azure PowerShell.

  • Azure Storage アカウントAzure Storage account. Python スクリプトと入力ファイルを作成し、Azure ストレージにアップロードします。You create a python script and an input file, and upload them to the Azure storage. Spark プログラムからの出力は、このストレージ アカウントに格納されます。The output from the spark program is stored in this storage account. オンデマンドの Spark クラスターでは、同じストレージ アカウントがプライマリ ストレージとして使用されます。The on-demand Spark cluster uses the same storage account as its primary storage.
  • Azure PowerShellAzure PowerShell. Azure PowerShell のインストールと構成の方法に関するページに記載されている手順に従います。Follow the instructions in How to install and configure Azure PowerShell.

Python スクリプトを BLOB ストレージ アカウントにアップロードするUpload python script to your Blob Storage account

  1. 次の内容が含まれた、WordCount_Spark.py という名前の Python ファイルを作成します。Create a python file named WordCount_Spark.py with the following content:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. <storageAccountName> を Azure ストレージ アカウントの名前に置き換えます。Replace <storageAccountName> with the name of your Azure Storage account. その後、ファイルを保存します。Then, save the file.

  3. Azure BLOB ストレージで、adftutorial という名前のコンテナーを作成します (存在しない場合)。In your Azure Blob Storage, create a container named adftutorial if it does not exist.

  4. spark という名前のフォルダーを作成します。Create a folder named spark.

  5. spark フォルダーの下に、script という名前のサブフォルダーを作成します。Create a subfolder named script under spark folder.

  6. WordCount_Spark.py ファイルを script サブフォルダーにアップロードします。Upload the WordCount_Spark.py file to the script subfolder.

入力ファイルをアップロードするUpload the input file

  1. minecraftstory.txt という名前のファイルを作成し、任意のテキストを入力しておきます。Create a file named minecraftstory.txt with some text. このテキストの単語数が Spark プログラムによってカウントされます。The spark program counts the number of words in this text.
  2. spark フォルダーに、inputfiles という名前のサブフォルダーを作成します。Create a subfolder named inputfiles in the spark folder.
  3. minecraftstory.txtinputfiles サブフォルダーにアップロードします。Upload the minecraftstory.txt to the inputfiles subfolder.

リンクされたサービスを作成するAuthor linked services

このセクションでは、次の 2 つのリンクされたサービスを作成します。You author two Linked Services in this section:

  • Azure ストレージ アカウントをデータ ファクトリにリンクする、Azure Storage のリンクされたサービス。An Azure Storage Linked Service that links an Azure Storage account to the data factory. このストレージは、オンデマンドの HDInsight クラスターによって使用されます。This storage is used by the on-demand HDInsight cluster. ここには、実行される Spark スクリプトも含まれています。It also contains the Spark script to be executed.
  • オンデマンドの HDInsight のリンクされたサービス。An On-Demand HDInsight Linked Service. Azure Data Factory によって自動的に HDInsight クラスターが作成され、Spark プログラムが実行されます。その後、アイドル状態が事前に構成した時間を超えたら、HDInsight クラスターは削除されます。Azure Data Factory automatically creates a HDInsight cluster, run the Spark program, and then deletes the HDInsight cluster after it's idle for a pre-configured time.

Azure Storage のリンクされたサービスAzure Storage linked service

任意のエディターを使用して JSON ファイルを作成し、Azure Storage のリンクされたサービスから次の JSON 定義をコピーして、ファイルを MyStorageLinkedService.json として保存します。Create a JSON file using your preferred editor, copy the following JSON definition of an Azure Storage linked service, and then save the file as MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": {
          "value": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>",
          "type": "SecureString"
        }
      }
    }
}

<storageAccountName> と <storageAccountKey> を Azure ストレージ アカウントの名前とキーで更新してください。Update the <storageAccountName> and <storageAccountKey> with the name and key of your Azure Storage account.

オンデマンドの HDInsight のリンクされたサービスOn-demand HDInsight linked service

任意のエディターを使用して JSON ファイルを作成し、Azure HDInsight のリンクされたサービスから次の JSON 定義をコピーして、ファイルを MyOnDemandSparkLinkedService.json として保存します。Create a JSON file using your preferred editor, copy the following JSON definition of an Azure HDInsight linked service, and save the file as MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

リンクされたサービスの定義で、以下のプロパティの値を更新します。Update values for the following properties in the linked service definition:

  • hostSubscriptionIdhostSubscriptionId. <subscriptionID> は、実際の Azure サブスクリプションの ID に置き換えてください。Replace <subscriptionID> with the ID of your Azure subscription. オンデマンドの HDInsight クラスターは、このサブスクリプション内に作成されます。The on-demand HDInsight cluster is created in this subscription.
  • tenanttenant. <tenantID> は、実際の Azure テナントの ID に置き換えてください。Replace <tenantID> with ID of your Azure tenant.
  • servicePrincipalIdservicePrincipalKeyservicePrincipalId, servicePrincipalKey. <servicePrincipalID> と <servicePrincipalKey> は、Azure Active Directory の実際のサービス プリンシパルの ID とキーに置き換えてください。Replace <servicePrincipalID> and <servicePrincipalKey> with ID and key of your service principal in the Azure Active Directory. このサービス プリンシパルは、サブスクリプションまたはクラスターが作成されるリソース グループの共同作成者ロールのメンバーである必要があります。This service principal needs to be a member of the Contributor role of the subscription or the resource Group in which the cluster is created. 詳細については、Azure Active Directory のアプリケーションとサービス プリンシパルの作成に関するページを参照してください。See create Azure Active Directory application and service principal for details. [サービス プリンシパル ID] は "アプリケーション ID" に、 [サービス プリンシパル キー] は "クライアント シークレット" の値に相当します。The Service principal id is equivalent to the Application ID and a Service principal key is equivalent to the value for a Client secret.
  • clusterResourceGroupclusterResourceGroup. <resourceGroupOfHDICluster> は、HDInsight クラスターの作成先であるリソース グループの名前に置き換えてください。Replace <resourceGroupOfHDICluster> with the name of the resource group in which the HDInsight cluster needs to be created.

注意

Azure HDInsight には、サポートされる各 Azure リージョンで使用できるコアの合計数に制限があります。Azure HDInsight has limitation on the total number of cores you can use in each Azure region it supports. オンデマンドの HDInsight のリンクされるサービスの場合、HDInsight クラスターは、プライマリ ストレージとして使用される Azure ストレージと同じ場所に作成されます。For On-Demand HDInsight Linked Service, the HDInsight cluster will be created in the same location of the Azure Storage used as its primary storage. クラスターを正常に作成するための十分なコア クォータがあることを確認します。Ensure that you have enough core quotas for the cluster to be created successfully. 詳細については、「Hadoop、Spark、Kafka などの HDInsight クラスターをセットアップする」を参照してください。For more information, see Set up clusters in HDInsight with Hadoop, Spark, Kafka, and more.

パイプラインを作成するAuthor a pipeline

この手順では、Spark アクティビティがある新しいパイプラインを作成します。In this step, you create a new pipeline with a Spark activity. アクティビティでは word count サンプルが使用されます。The activity uses the word count sample. まだコンテンツをダウンロードしていない場合は、この場所からダウンロードします。Download the contents from this location if you haven't already done so.

任意のエディターで JSON ファイルを作成し、パイプライン定義から次の JSON 定義をコピーして、MySparkOnDemandPipeline.json として保存します。Create a JSON file in your preferred editor, copy the following JSON definition of a pipeline definition, and save it as MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

以下の点に注意してください。Note the following points:

  • rootPath は、adftutorial コンテナーの spark フォルダーを指します。rootPath points to the spark folder of the adftutorial container.
  • entryFilePath は、spark フォルダーの script サブフォルダーの WordCount_Spark.py ファイルを指します。entryFilePath points to the WordCount_Spark.py file in the script sub folder of the spark folder.

Data Factory の作成Create a data factory

JSON ファイルで、リンクされたサービスとパイプライン定義を作成しました。You have authored linked service and pipeline definitions in JSON files. 次に、PowerShell コマンドレットを使用して、データ ファクトリを作成し、リンクされたサービスとパイプライン JSON ファイルをデプロイしましょう。Now, let’s create a data factory, and deploy the linked Service and pipeline JSON files by using PowerShell cmdlets. 以下の PowerShell コマンドを 1 つずつ実行します。Run the following PowerShell commands one by one:

  1. 変数を 1 つずつ設定します。Set variables one by one.

    リソース グループ名Resource Group Name

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    データ ファクトリ名。名前はグローバルに一意である必要があります。Data Factory Name. Must be globally unique

    $dataFactoryName = "MyDataFactory09102017"
    

    パイプライン名Pipeline name

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. PowerShellを起動します。Launch PowerShell. Azure PowerShell は、このクイックスタートが終わるまで開いたままにしておいてください。Keep Azure PowerShell open until the end of this quickstart. Azure PowerShell を閉じて再度開いた場合は、これらのコマンドをもう一度実行する必要があります。If you close and reopen, you need to run the commands again. 現在 Data Factory が利用できる Azure リージョンの一覧については、次のページで目的のリージョンを選択し、 [分析] を展開して [Data Factory] を探してください。(「リージョン別の利用可能な製品」)。For a list of Azure regions in which Data Factory is currently available, select the regions that interest you on the following page, and then expand Analytics to locate Data Factory: Products available by region. データ ファクトリで使用するデータ ストア (Azure Storage、Azure SQL Database など) やコンピューティング (HDInsight など) は他のリージョンに配置できます。The data stores (Azure Storage, Azure SQL Database, etc.) and computes (HDInsight, etc.) used by data factory can be in other regions.

    次のコマンドを実行して、Azure Portal へのサインインに使用するユーザー名とパスワードを入力します。Run the following command, and enter the user name and password that you use to sign in to the Azure portal:

    Connect-AzAccount
    

    次のコマンドを実行して、このアカウントのすべてのサブスクリプションを表示します。Run the following command to view all the subscriptions for this account:

    Get-AzSubscription
    

    次のコマンドを実行して、使用するサブスクリプションを選択します。Run the following command to select the subscription that you want to work with. SubscriptionId は、実際の Azure サブスクリプションの ID に置き換えてください。Replace SubscriptionId with the ID of your Azure subscription:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. リソース グループを作成します。ADFTutorialResourceGroup。Create the resource group: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. データ ファクトリを作成します。Create the data factory.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    次のコマンドを実行して、出力を表示します。Execute the following command to see the output:

    $df
    
  5. JSON ファイルを作成したフォルダーに移動し、次のコマンドを実行して、Azure Storage のリンクされたサービスをデプロイします。Switch to the folder where you created JSON files, and run the following command to deploy an Azure Storage linked service:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. 次のコマンドを実行し、オンデマンドの Spark のリンクされたサービスをデプロイします。Run the following command to deploy an on-demand Spark linked service:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. 次のコマンドを実行し、パイプラインをデプロイします。Run the following command to deploy a pipeline:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

パイプラインの実行を開始して監視するStart and monitor a pipeline run

  1. パイプラインの実行を開始します。Start a pipeline run. 将来の監視のために、パイプラインの実行の ID もキャプチャされます。It also captures the pipeline run ID for future monitoring.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. 次のスクリプトを実行し、パイプラインの実行の状態を、完了するまで継続的にチェックします。Run the following script to continuously check the pipeline run status until it finishes.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. サンプル実行の出力結果を次に示します。Here is the output of the sample run:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. adftutorial コンテナーの spark フォルダーに outputfiles というフォルダーが作成され、Spark プログラムの出力が含まれていることを確認します。Confirm that a folder named outputfiles is created in the spark folder of adftutorial container with the output from the spark program.

次の手順Next steps

このサンプルのパイプラインは、Azure BLOB ストレージ内のある場所から別の場所にデータをコピーするものです。The pipeline in this sample copies data from one location to another location in an Azure blob storage. 以下の方法について学習しました。You learned how to:

  • データ ファクトリを作成します。Create a data factory.
  • リンクされたサービスを作成してデプロイします。Author and deploy linked services.
  • パイプラインを作成してデプロイします。Author and deploy a pipeline.
  • パイプラインの実行を開始します。Start a pipeline run.
  • パイプラインの実行を監視します。Monitor the pipeline run.

仮想ネットワークにある Azure HDInsight クラスター上で Hive スクリプトを実行してデータを変換する方法については、次のチュートリアルに進んでください。Advance to the next tutorial to learn how to transform data by running Hive script on an Azure HDInsight cluster that is in a virtual network.