Trasformare dati usando l'attività di streaming di Hadoop in Azure Data Factory

È possibile usare l'attività HDInsightStreamingActivity per richiamare un processo di Hadoop Streaming da una pipeline di Data factory di Azure. Il frammento JSON seguente illustra la sintassi per l'uso di HDInsightStreamingActivity in un file JSON della pipeline.

L'attività HDInsight Streaming Activity in una pipeline di Data Factory esegue i programmi di Hadoop Streaming nei cluster HDInsight personalizzati o su richiesta basati su Windows/Linux. Questo articolo si basa sull'articolo relativo alle attività di trasformazione dei dati che presenta una panoramica generale della trasformazione dei dati e le attività di trasformazione supportate.

Nota

Se non si ha familiarità con Azure Data Factory, leggere l'Introduzione ad Azure Data Factory ed eseguire l'esercitazione: Creare la prima pipeline di dati prima di leggere questo articolo.

Esempio JSON

Il cluster HDInsight viene popolato automaticamente con programmi di esempio (wc.exe e cat.exe) e con i dati (davinci.txt). Per impostazione predefinita, il nome del contenitore che viene utilizzato dal cluster HDInsight è il nome del cluster stesso. Ad esempio, se il nome del cluster è myhdicluster, il nome del contenitore BLOB associato sarebbe myhdicluster.

{
    "name": "HadoopStreamingPipeline",
    "properties": {
        "description": "Hadoop Streaming Demo",
        "activities": [
            {
                "type": "HDInsightStreaming",
                "typeProperties": {
                    "mapper": "cat.exe",
                    "reducer": "wc.exe",
                    "input": "wasb://<nameofthecluster>@spestore.blob.core.windows.net/example/data/gutenberg/davinci.txt",
                    "output": "wasb://<nameofthecluster>@spestore.blob.core.windows.net/example/data/StreamingOutput/wc.txt",
                    "filePaths": [
                        "<nameofthecluster>/example/apps/wc.exe",
                        "<nameofthecluster>/example/apps/cat.exe"
                    ],
                    "fileLinkedService": "AzureStorageLinkedService",
                    "getDebugInfo": "Failure"
                },
                "outputs": [
                    {
                        "name": "StreamingOutputDataset"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "RunHadoopStreamingJob",
                "description": "Run a Hadoop streaming job",
                "linkedServiceName": "HDInsightLinkedService"
            }
        ],
        "start": "2014-01-04T00:00:00Z",
        "end": "2014-01-05T00:00:00Z"
    }
}

Tenere presente quanto segue:

  1. Impostare linkedServiceName sul nome del servizio collegato che punta al cluster HDInsight in cui viene eseguito il processo di streaming mapreduce.
  2. Impostare il tipo di attività su HDInsightStreaming.
  3. Per la proprietà mapper specificare il nome dell'eseguibile del mapper. Nell'esempio cat.exe è l'eseguibile del mapper.
  4. Per la proprietà reducer specificare il nome dell'eseguibile del reducer. Nell'esempio wc.exe è l'eseguibile del mapper.
  5. Per la proprietà di tipo input specificare il file di input (incluso il percorso) per il mapper. Nell'esempio "wasb://adfsample@.blob.core.windows.net/example/data/gutenberg/davinci.txt" adfsample è il contenitore BLOB, example/data/Gutenberg è la cartella e davinci.txt è il BLOB.
  6. Per la proprietà di tipo output specificare il file di output (incluso il percorso) per il reducer. L'output del processo di Hadoop Streaming viene scritto nel percorso specificato per questa proprietà.
  7. Nella sezione filePaths specificare i percorsi dei file eseguibili del mapper e del reducer. Nell'esempio: "adfsample/example/apps/wc.exe", adfsample è il contenitore BLOB, example/apps è la cartella e wc.exe è l'eseguibile.
  8. Per la proprietà fileLinkedService specificare il servizio collegato Archiviazione di Azure che rappresenta l'archivio di Azure contenente i file specificati nella sezione filePaths.
  9. Per la proprietà arguments specificare gli argomenti per il processo di streaming.
  10. La proprietà getDebugInfo è un elemento facoltativo. Quando viene impostata su Failure, i log vengono scaricati solo in caso di errore. Quando viene impostata su Always, i log vengono sempre scaricati indipendentemente dallo stato dell'esecuzione.
Nota

Come illustrato nell'esempio, è necessario specificare un set di dati di output per l'attività di Hadoop Streaming per la proprietà output . Questo è solo un set di dati fittizio necessario per la pianificazione della pipeline. Non è necessario specificare alcun set di dati di input per l'attività per la proprietà input .

Esempio

La pipeline in questa procedura dettagliata esegue il programma di mapping e riduzione dello streaming del conteggio delle parole sul cluster HDInsight di Azure.

Servizi collegati

Servizio collegato Archiviazione di Azure

In primo luogo, si crea un servizio collegato per collegare l'archiviazione di Azure utilizzata dal cluster HDInsight di Azure per la factory di dati di Azure. Se si copia e incolla il codice seguente, non dimenticare di sostituire il nome account e la chiave account con il nome e la chiave di archiviazione di Azure.

{
    "name": "StorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<account name>;AccountKey=<account key>"
        }
    }
}

Servizio collegato Azure HDInsight

Successivamente, si crea un servizio collegato per collegare il cluster HDInsight di Azure alla factory di dati di Azure. Se si copia e incolla il codice seguente, sostituire il nome del cluster HDInsight con il nome del cluster HDInsight e modificare i valori nome utente e password.

{
    "name": "HDInsightLinkedService",
    "properties": {
        "type": "HDInsight",
        "typeProperties": {
            "clusterUri": "https://<HDInsight cluster name>.azurehdinsight.net",
            "userName": "admin",
            "password": "**********",
            "linkedServiceName": "StorageLinkedService"
        }
    }
}

Set di dati

Set di dati di output

La pipeline in questo esempio non accetta alcun input. È necessario specificare un set di dati di output per l'attività Streaming di HDInsight. Questo è solo un set di dati fittizio necessario per la pianificazione della pipeline.

{
    "name": "StreamingOutputDataset",
    "properties": {
        "published": false,
        "type": "AzureBlob",
        "linkedServiceName": "StorageLinkedService",
        "typeProperties": {
            "folderPath": "adftutorial/streamingdata/",
            "format": {
                "type": "TextFormat",
                "columnDelimiter": ","
            },
        },
        "availability": {
            "frequency": "Day",
            "interval": 1
        }
    }
}

Pipeline

La pipeline in questo esempio contiene una sola attività che è di tipo: HDInsightStreaming.

Il cluster HDInsight viene popolato automaticamente con programmi di esempio (wc.exe e cat.exe) e con i dati (davinci.txt). Per impostazione predefinita, il nome del contenitore che viene utilizzato dal cluster HDInsight è il nome del cluster stesso. Ad esempio, se il nome del cluster è myhdicluster, il nome del contenitore BLOB associato sarebbe myhdicluster.

{
    "name": "HadoopStreamingPipeline",
    "properties": {
        "description": "Hadoop Streaming Demo",
        "activities": [
            {
                "type": "HDInsightStreaming",
                "typeProperties": {
                    "mapper": "cat.exe",
                    "reducer": "wc.exe",
                    "input": "wasb://<blobcontainer>@spestore.blob.core.windows.net/example/data/gutenberg/davinci.txt",
                    "output": "wasb://<blobcontainer>@spestore.blob.core.windows.net/example/data/StreamingOutput/wc.txt",
                    "filePaths": [
                        "<blobcontainer>/example/apps/wc.exe",
                        "<blobcontainer>/example/apps/cat.exe"
                    ],
                    "fileLinkedService": "StorageLinkedService"
                },
                "outputs": [
                    {
                        "name": "StreamingOutputDataset"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1,
                    "executionPriorityOrder": "NewestFirst",
                    "retry": 1
                },
                "scheduler": {
                    "frequency": "Day",
                    "interval": 1
                },
                "name": "RunHadoopStreamingJob",
                "description": "Run a Hadoop streaming job",
                "linkedServiceName": "HDInsightLinkedService"
            }
        ],
        "start": "2017-01-03T00:00:00Z",
        "end": "2017-01-04T00:00:00Z"
    }
}

Vedere anche