Przekształcanie danych w chmurze za pomocą działania platformy Spark w usłudze Azure Data Factory

DOTYCZY: Azure Data Factory Azure Synapse Analytics

Napiwek

Wypróbuj usługę Data Factory w usłudze Microsoft Fabric — rozwiązanie analityczne typu all-in-one dla przedsiębiorstw. Usługa Microsoft Fabric obejmuje wszystko, od przenoszenia danych do nauki o danych, analizy w czasie rzeczywistym, analizy biznesowej i raportowania. Dowiedz się, jak bezpłatnie rozpocząć nową wersję próbną !

W tym samouczku użyjesz programu Azure PowerShell do utworzenia potoku fabryki danych, który przekształca dane przy użyciu działania platformy Spark i połączonej usługi HDInsight na żądanie. Ten samouczek obejmuje następujące procedury:

  • Tworzenie fabryki danych.
  • Tworzenie i wdrażanie połączonych usług
  • Redagowanie i wdrażanie potoku.
  • Uruchom potok.
  • Monitorowanie uruchomienia potoku.

Jeśli nie masz subskrypcji platformy Azure, przed rozpoczęciem utwórz bezpłatne konto.

Wymagania wstępne

Uwaga

Do interakcji z platformą Azure zalecamy używanie modułu Azure Az w programie PowerShell. Zobacz Instalowanie programu Azure PowerShell, aby rozpocząć. Aby dowiedzieć się, jak przeprowadzić migrację do modułu Az PowerShell, zobacz Migracja programu Azure PowerShell z modułu AzureRM do modułu Az.

  • Konto usługi Azure Storage. Utworzysz skrypt języka Python i plik wejściowy, a następnie przekażesz go do usługi Azure Storage. Dane wyjściowe programu platformy Spark są przechowywane na tym koncie magazynu. Klaster platformy Spark na żądanie używa tego samego konta magazynu, jako swojego podstawowego magazynu.
  • Azure PowerShell. Wykonaj instrukcje podane w temacie Instalowanie i konfigurowanie programu Azure PowerShell.

Przekazywanie skryptu języka Python na konto usługi Blob Storage

  1. Utwórz plik w języku Python o nazwie WordCount_Spark.py i następującej zawartości:

    import sys
    from operator import add
    
    from pyspark.sql import SparkSession
    
    def main():
        spark = SparkSession\
            .builder\
            .appName("PythonWordCount")\
            .getOrCreate()
    
        lines = spark.read.text("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/inputfiles/minecraftstory.txt").rdd.map(lambda r: r[0])
        counts = lines.flatMap(lambda x: x.split(' ')) \
            .map(lambda x: (x, 1)) \
            .reduceByKey(add)
        counts.saveAsTextFile("wasbs://adftutorial@<storageaccountname>.blob.core.windows.net/spark/outputfiles/wordcount")
    
        spark.stop()
    
    if __name__ == "__main__":
        main()
    
  2. Zastąp wartość <storageAccountName> nazwą swojego konta usługi Azure Storage. Następnie zapisz plik.

  3. W usłudze Azure Blob Storage utwórz kontener o nazwie adftutorial, jeśli nie istnieje.

  4. Utwórz folder o nazwie spark.

  5. Utwórz podfolder o nazwie script w folderze spark.

  6. Przekaż plik WordCount_Spark.py do podfolderu script.

Przekazywanie pliku wejściowego

  1. Utwórz plik o nazwie minecraftstory.txt zawierający tekst. Program platformy Spark zlicza liczbę słów w tym tekście.
  2. Utwórz podfolder o nazwie inputfiles w folderze spark.
  3. Przekaż minecraftstory.txt do podfolderu inputfiles.

Redagowanie połączonych usług

W tej sekcji zredagujesz dwie połączone usługi:

  • Połączoną usługę Azure Storage, która łączy konto usługi Azure Storage z fabryką danych. Ten magazyn jest używany przez klaster usługi HDInsight na żądanie. Zawiera on także skrypt platformy Spark do wykonania.
  • Połączona usługa HDInsight na żądanie. Usługa Azure Data Factory automatycznie tworzy klaster usługi HDInsight, uruchamia program platformy Spark i usuwa klaster usługi HDInsight, gdy jest on bezczynny przez wstępnie skonfigurowany czas.

Połączona usługa Azure Storage

Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą połączonej usługi Azure Storage, a następnie zapisz plik jako MyStorageLinkedService.json.

{
    "name": "MyStorageLinkedService",
    "properties": {
      "type": "AzureStorage",
      "typeProperties": {
        "connectionString": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>"
      }
    }
}

Zaktualizuj wartości parametrów <storageAccountName> i <storageAccountKey> nazwą konta usługi Azure Storage i jego kluczem.

Połączona usługa HDInsight na żądanie

Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą połączonej usługi Azure HDIsight, a następnie zapisz plik jako MyOnDemandSparkLinkedService.json.

{
    "name": "MyOnDemandSparkLinkedService",
    "properties": {
      "type": "HDInsightOnDemand",
      "typeProperties": {
        "clusterSize": 2,
        "clusterType": "spark",
        "timeToLive": "00:15:00",
        "hostSubscriptionId": "<subscriptionID> ",
        "servicePrincipalId": "<servicePrincipalID>",
        "servicePrincipalKey": {
          "value": "<servicePrincipalKey>",
          "type": "SecureString"
        },
        "tenant": "<tenant ID>",
        "clusterResourceGroup": "<resourceGroupofHDICluster>",
        "version": "3.6",
        "osType": "Linux",
        "clusterNamePrefix":"ADFSparkSample",
        "linkedServiceName": {
          "referenceName": "MyStorageLinkedService",
          "type": "LinkedServiceReference"
        }
      }
    }
}

Zaktualizuj wartości następujących właściwości w definicji połączonej usługi:

  • hostSubscriptionId. Zastąp właściwość <SubscriptionId> identyfikatorem subskrypcji platformy Azure. Klaster usługi HDInsight na żądanie jest tworzony w tej subskrypcji.
  • tenant. Zastąp właściwość <tenantID> identyfikatorem dzierżawy platformy Azure.
  • servicePrincipalId, servicePrincipalKey. Zastąp <wartości servicePrincipalID> i <servicePrincipalKey> identyfikatorem i kluczem jednostki usługi w identyfikatorze Entra firmy Microsoft. Jednostka usługi musi być członkiem roli współautora subskrypcji lub grupy zasobów, gdzie został utworzony klaster. Aby uzyskać szczegółowe informacje, zobacz tworzenie aplikacji Microsoft Entra i jednostki usługi. Identyfikator jednostki usługi jest odpowiednikiem identyfikatora aplikacji, a klucz jednostki usługi jest odpowiednikiem wartości klucza tajnego klienta.
  • clusterResourceGroup. Zastąp właściwość <resourceGroupOfHDICluster> nazwą grupy zasobów, w której ma zostać utworzony klaster usługi HDInsight.

Uwaga

Usługa Azure HDInsight ma ograniczenia całkowitej liczby rdzeni, których możesz użyć w każdym obsługiwanym przez nią regionie platformy Azure. Dla połączonej usługi HDInsight na żądanie zostanie utworzony klaster usługi HDInsight w tej samej lokalizacji usługi Azure Storage użytej jako jej podstawowy magazyn. Upewnij się, że masz wystarczająco duże limity przydziału dla klastra, aby można go było pomyślnie utworzyć. Aby uzyskać więcej informacji, zobacz Set up clusters in HDInsight with Hadoop, Spark, Kafka and more (Konfigurowanie klastrów w usłudze HDInsight za pomocą platform Hadoop, Spark, Kafka i innych).

Redagowanie potoku

W tym kroku utworzysz nowy potok za pomocą działania platformy Spark. Działanie wykorzystuje próbkę liczby słów. Jeśli jeszcze tego nie zrobiono, pobierz zawartość z tej lokalizacji.

Utwórz plik w formacie JSON za pomocą preferowanego edytora, skopiuj poniższą definicję formatu JSON dotyczącą definicji potoku, a następnie zapisz go jako MySparkOnDemandPipeline.json.

{
  "name": "MySparkOnDemandPipeline",
  "properties": {
    "activities": [
      {
        "name": "MySparkActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": {
            "referenceName": "MyOnDemandSparkLinkedService",
            "type": "LinkedServiceReference"
        },
        "typeProperties": {
          "rootPath": "adftutorial/spark",
          "entryFilePath": "script/WordCount_Spark.py",
          "getDebugInfo": "Failure",
          "sparkJobLinkedService": {
            "referenceName": "MyStorageLinkedService",
            "type": "LinkedServiceReference"
          }
        }
      }
    ]
  }
}

Należy uwzględnić następujące informacje:

  • Właściwość rootPath wskazuje folder platformy Spark kontenera adftutorial.
  • Właściwość entryFilePath wskazuje plik WordCount_Spark.py w podfolderze skryptu folderu Spark.

Tworzenie fabryki danych

W plikach w formacie JSON zostały zredagowane połączona usługa i definicje potoku. Teraz utwórzmy fabrykę danych i wdróżmy połączone pliki JSON usługi i potoku przy użyciu poleceń cmdlet programu PowerShell. Uruchom następujące polecenia programu PowerShell jedno po drugim:

  1. Ustaw zmienne jedną po drugiej.

    Nazwa grupy zasobów

    $resourceGroupName = "ADFTutorialResourceGroup" 
    

    Nazwa fabryki danych. Musi ona być unikatowa w skali globalnej

    $dataFactoryName = "MyDataFactory09102017"
    

    Nazwa potoku

    $pipelineName = "MySparkOnDemandPipeline" # Name of the pipeline
    
  2. Uruchom program PowerShell. Nie zamykaj programu Azure PowerShell aż do końca tego samouczka Szybki start. Jeśli go zamkniesz i otworzysz ponownie, musisz uruchomić te polecenia jeszcze raz. Aby uzyskać listę regionów platformy Azure, w których obecnie jest dostępna usługa Data Factory, wybierz dane regiony na poniższej stronie, a następnie rozwiń węzeł Analiza, aby zlokalizować pozycję Data Factory: Produkty dostępne według regionu. Magazyny danych (Azure Storage, Azure SQL Database itp.) i jednostki obliczeniowe (HDInsight itp.) używane przez fabrykę danych mogą mieścić się w innych regionach.

    Uruchom poniższe polecenie i wprowadź nazwę użytkownika oraz hasło, których używasz do logowania się w witrynie Azure Portal:

    Connect-AzAccount
    

    Uruchom poniższe polecenie, aby wyświetlić wszystkie subskrypcje dla tego konta:

    Get-AzSubscription
    

    Uruchom poniższe polecenie, aby wybrać subskrypcję, z którą chcesz pracować. Zastąp parametr SubscriptionId identyfikatorem Twojej subskrypcji platformy Azure:

    Select-AzSubscription -SubscriptionId "<SubscriptionId>"    
    
  3. Utwórz grupę zasobów: ADFTutorialResourceGroup.

    New-AzResourceGroup -Name $resourceGroupName -Location "East Us" 
    
  4. Utwórz fabrykę danych.

     $df = Set-AzDataFactoryV2 -Location EastUS -Name $dataFactoryName -ResourceGroupName $resourceGroupName
    

    Wykonaj następujące polecenie, aby wyświetlić dane wyjściowe:

    $df
    
  5. Przejdź do folderu, w którym zostały utworzone pliki w formacie JSON, a następnie uruchom następujące polecenie, aby wdrożyć połączoną usługę Azure Storage:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyStorageLinkedService" -File "MyStorageLinkedService.json"
    
  6. Uruchom następujące polecenie, aby wdrożyć połączoną usługę Spark na żądanie:

    Set-AzDataFactoryV2LinkedService -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name "MyOnDemandSparkLinkedService" -File "MyOnDemandSparkLinkedService.json"
    
  7. Uruchom następujące polecenie, aby wdrożyć potok:

    Set-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -Name $pipelineName -File "MySparkOnDemandPipeline.json"
    

Uruchamianie i monitorowanie działania potoku

  1. Uruchom potok. Umożliwia to również przechwycenie identyfikatora uruchomienia potoku w celu monitorowania w przyszłości.

    $runId = Invoke-AzDataFactoryV2Pipeline -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineName $pipelineName  
    
  2. Uruchom następujący skrypt, aby stale sprawdzać stan uruchomienia potoku do momentu zakończenia jego działania.

    while ($True) {
        $result = Get-AzDataFactoryV2ActivityRun -DataFactoryName $dataFactoryName -ResourceGroupName $resourceGroupName -PipelineRunId $runId -RunStartedAfter (Get-Date).AddMinutes(-30) -RunStartedBefore (Get-Date).AddMinutes(30)
    
        if(!$result) {
            Write-Host "Waiting for pipeline to start..." -foregroundcolor "Yellow"
        }
        elseif (($result | Where-Object { $_.Status -eq "InProgress" } | Measure-Object).count -ne 0) {
            Write-Host "Pipeline run status: In Progress" -foregroundcolor "Yellow"
        }
        else {
            Write-Host "Pipeline '"$pipelineName"' run finished. Result:" -foregroundcolor "Yellow"
            $result
            break
        }
        ($result | Format-List | Out-String)
        Start-Sleep -Seconds 15
    }
    
    Write-Host "Activity `Output` section:" -foregroundcolor "Yellow"
    $result.Output -join "`r`n"
    
    Write-Host "Activity `Error` section:" -foregroundcolor "Yellow"
    $result.Error -join "`r`n" 
    
  3. Oto dane wyjściowe przykładowego przebiegu:

    Pipeline run status: In Progress
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : 
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : 
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 
    DurationInMs      : 
    Status            : InProgress
    Error             :
    …
    
    Pipeline ' MySparkOnDemandPipeline' run finished. Result:
    ResourceGroupName : ADFTutorialResourceGroup
    DataFactoryName   : MyDataFactory09102017
    ActivityName      : MySparkActivity
    PipelineRunId     : 94e71d08-a6fa-4191-b7d1-cf8c71cb4794
    PipelineName      : MySparkOnDemandPipeline
    Input             : {rootPath, entryFilePath, getDebugInfo, sparkJobLinkedService}
    Output            : {clusterInUse, jobId, ExecutionProgress, effectiveIntegrationRuntime}
    LinkedServiceName : 
    ActivityRunStart  : 9/20/2017 6:33:47 AM
    ActivityRunEnd    : 9/20/2017 6:46:30 AM
    DurationInMs      : 763466
    Status            : Succeeded
    Error             : {errorCode, message, failureType, target}
    
    Activity Output section:
    "clusterInUse": "https://ADFSparkSamplexxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx.azurehdinsight.net/"
    "jobId": "0"
    "ExecutionProgress": "Succeeded"
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US)"
    Activity Error section:
    "errorCode": ""
    "message": ""
    "failureType": ""
    "target": "MySparkActivity"
    
  4. Potwierdź, że folder o nazwie outputfiles jest tworzony w folderze spark kontenera adftutorial zawierającym dane wyjściowe z programu platformy Spark.

Potok w tym przykładzie kopiuje dane z jednej lokalizacji do innej lokalizacji w usłudze Azure Blob Storage. W tym samouczku omówiono:

  • Tworzenie fabryki danych.
  • Tworzenie i wdrażanie połączonych usług
  • Redagowanie i wdrażanie potoku.
  • Uruchom potok.
  • Monitorowanie uruchomienia potoku.

Przejdź do następnego samouczka, aby dowiedzieć się, jak przekształcać dane, uruchamiając skrypt programu Hive w klastrze usługi Azure HDInsight, który znajduje się w sieci wirtualnej.