Преобразование данных в облаке с помощью действия Spark в фабрике данных Azure

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этом руководстве вы используете Azure PowerShell для создания конвейера фабрики данных, который преобразовывает данные с помощью действия Spark и служба, связанная по запросу HDInsight. В этом руководстве вы выполните следующие шаги:

  • Создали фабрику данных.
  • Создали и развернули эти связанные службы.
  • Создание и развертывание конвейера.
  • Запуск конвейера.
  • Осуществили мониторинг выполнения конвейера.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Предварительные требования

Примечание.

Мы рекомендуем использовать модуль Azure Az PowerShell для взаимодействия с Azure. Чтобы начать работу, см. статью Установка Azure PowerShell. Дополнительные сведения см. в статье Перенос Azure PowerShell с AzureRM на Az.

  • Учетная запись хранения Azure. Нужно создать скрипт Python и входной файл и отправить их в хранилище Azure. Выходные данные программы Spark хранятся в этой учетной записи хранения. Кластер Spark по запросу использует ту же учетную запись хранения, что и его основное хранилище.
  • Azure PowerShell. Следуйте инструкциям по установке и настройке Azure PowerShell.

Отправка скрипта Python в учетную запись хранилища BLOB-объектов

  1. Создайте файл Python с именем WordCount_Spark.py со следующим содержимым:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Замените свойство storageAccountName<> именем своей учетной записи хранения Azure. Затем сохраните файл.

  3. В хранилище BLOB-объектов Azure создайте контейнер с именем adftutorial, если он не существует.

  4. Создайте папку с именем spark.

  5. Создайте вложенную папку с именем script в папке spark.

  6. Отправьте файл WordCount_Spark.py во вложенную папку script.

Отправка входного файла

  1. Создайте файл с определенным текстом и назовите его minecraftstory.txt. Программа Spark подсчитывает количество слов в этом тексте.
  2. Создайте вложенную папку с именем inputfiles в папке spark.
  3. Отправьте файл minecraftstory.txt во вложенную папку inputfiles.

Создание связанных служб

Создайте две связанные службы в этом разделе:

  • Связанную службу хранилища Azure, которая связывает учетную запись хранения Azure с фабрикой данных. Это хранилище используется кластером HDInsight по запросу. В нем также содержится скрипт Spark для выполнения.
  • Связанную службу HDInsight по запросу. Фабрика данных Azure автоматически создает кластер HDInsight, запускает программу Spark, а затем удаляет кластер HDInsight после простоя в течение предварительно настроенного времени.

Связанная служба хранения Azure

Создайте файл JSON, используя предпочитаемый редактор, скопируйте следующее определение JSON связанной службы хранилища Azure и сохраните файл как MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Обновите параметры <storageAccountName> и <storageAccountKey>, использовав имя и ключ своей учетной записи хранения Azure.

Связанная служба HDInsight по запросу

Создайте файл JSON, используя предпочитаемый редактор, скопируйте следующее определение JSON связанной службы HDInsight Azure и сохраните файл как MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Обновите значения следующих свойств в определении связанных служб:

  • hostSubscriptionId. Замените значение <> на идентификатор подписки Azure. В этой подписке создается кластер HDInsight по требованию.
  • tenant. Замените <tenantID> на идентификатор своего клиента Azure.
  • servicePrincipalId, servicePrincipalKey. Замените <servicePrincipalID> и servicePrincipalKey> идентификатором и <ключом субъекта-службы в идентификаторе Microsoft Entra. Этому субъекту-службе должна быть назначена роли участника подписки или группы ресурсов, в которой создается кластер. Дополнительные сведения см . в разделе "Создание приложения Microsoft Entra" и субъекта-службы . Идентификатор субъекта-службы —это эквивалент идентификатора приложения, а ключ субъекта-службы — значения секрета клиента.
  • clusterResourceGroup. Замените <resourceGroupOfHDICluster> на имя группы ресурсов, в которой необходимо создать кластер HDInsight.

Примечание.

Azure HDInsight имеет ограничение на общее количество ядер, которые можно использовать в каждом поддерживаемом регионе Azure. Для связанной службы HDInsight по требованию будет создан кластер HDInsight в расположении хранилища Azure, используемом в качестве его основного хранилища. Убедитесь, что имеется достаточное количество квот ядра для успешного создания кластера. Дополнительные сведения см. в статье Установка кластеров в HDInsight с использованием Hadoop, Spark, Kafka и других технологий.

Создание конвейера

На этом этапе создайте конвейер с действием Spark. В действии используется пример статистики. Загрузите содержимое из этого расположения, если вы еще не сделали этого.

Создайте файл JSON в предпочитаемом редакторе, скопируйте следующее определение JSON определения конвейера и сохраните его как MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Обратите внимание на следующие аспекты:

  • rootPath указывает на папку spark контейнера adftutorial.
  • entryFilePath указывает на файл WordCount_Spark.py во вложенной папке скрипта папки spark.

Создание фабрики данных

Вы создали связанную службу и определения конвейера в файлах JSON. Теперь нужно создать фабрику данных и развернуть связанную службу и файлы JSON конвейера с помощью командлетов PowerShell. Последовательно выполните следующие команды PowerShell:

  1. По очереди задайте переменные.

    Имя группы ресурсов

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Имя Фабрики данных. (оно должно быть глобально уникальным)

    $dataFactoryName = "MyDataFactory09102017"
    

    Имя конвейера

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Запустите PowerShell. Не закрывайте Azure PowerShell, пока выполняются описанные в этом кратком руководстве инструкции. Если закрыть и снова открыть это окно, то придется вновь выполнять эти команды. Чтобы получить список регионов Azure, в которых в настоящее время доступна Фабрика данных, выберите интересующие вас регионы на следующей странице, а затем разверните раздел Аналитика, чтобы найти пункт Фабрика данных: Доступность продуктов по регионам. Хранилища данных (служба хранилища Azure, база данных SQL Azure и т. д.) и вычисления (HDInsight и т. д.), используемые фабрикой данных, могут располагаться в других регионах.

    Выполните следующую команду и введите имя пользователя и пароль, которые используются для входа на портал Azure.

    Connect-AzAccount
    

    Чтобы просмотреть все подписки для этой учетной записи, выполните следующую команду:

    Get-AzSubscription
    

    Выполните следующую команду, чтобы выбрать подписку, с которой вы собираетесь работать. Замените значение SubscriptionId на идентификатор подписки Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Создайте группу ресурсов ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Создайте фабрику данных.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Чтобы просмотреть выходные данные, выполните следующую команду:

    $df
    
  5. Перейдите в папку, где были созданы файлы JSON, и выполните следующую команду, чтобы развернуть связанную службу хранилища Azure.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Выполните следующую команду, чтобы развернуть связанную службу Spark по запросу.

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Выполните следующую команду, чтобы развернуть конвейер.

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Запуск и мониторинг выполнения конвейера

  1. Запуск конвейера. Эта команда также запишет идентификатор выполнения конвейера для будущего мониторинга.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Запустите следующий скрипт, чтобы проверять состояние выполнения, пока оно не завершится.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Вот результат примера выполнения:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Убедитесь, что папка с именем outputfiles создана в папке spark контейнера adftutorial с выходными данными программы Spark.

В этом примере конвейер копирует данные из одного расположения в другое в хранилище BLOB-объектов Azure. Вы научились выполнять следующие задачи:

  • Создали фабрику данных.
  • Создали и развернули эти связанные службы.
  • Создание и развертывание конвейера.
  • Запуск конвейера.
  • Осуществили мониторинг выполнения конвейера.

Чтобы узнать, как преобразовать данные, запустив скрипт Hive в кластере Azure HDInsight, который находится в виртуальной сети, ознакомьтесь со следующим руководством.

Transform data in Azure Virtual Network using Hive activity in Azure Data Factory (Преобразование данных в виртуальной сети Azure с помощью действия Hive в фабрике данных Azure).