Usare C# con lo streaming di MapReduce su Apache Hadoop in HDInsight

Informazioni su come usare C# per creare una soluzione di MapReduce su HDInsight.

Lo streaming Apache Hadoop consente di eseguire processi MapReduce usando uno script o un eseguibile. In questo caso, .NET viene usato per implementare il mapper e il riduttore per una soluzione di conteggio delle parole.

.NET su HDInsight

I cluster HDInsight usano Mono (https://mono-project.com) per eseguire applicazioni .NET. La versione Mono 4.2.1 è inclusa nella versione 3.6 di HDInsight. Per altre informazioni sulla versione di Mono inclusa in HDInsight, vedere Componenti apache Hadoop disponibili con le versioni di HDInsight.

Per altre informazioni sulla compatibilità Mono con le versioni di .NET Framework, vedere il documento relativo alla compatibilità Mono.

Come funziona lo streaming di Hadoop

Il processo di base usato per il flusso in questo documento è il seguente:

  1. Hadoop passa i dati al mapper (mapper.exe in questo esempio) su STDIN.
  2. Il mapper elabora i dati ed emette una coppia chiave/valore delimitata da tabulazione su STDOUT.
  3. L'output viene letto da Hadoop e quindi passato al riduttore (reducer.exe in questo esempio) su STDIN.
  4. Il riduttore legge le coppie chiave/valore delimitate da tabulazioni, elabora i dati e quindi genera il risultato come coppie chiave/valore delimitate da tabulazione su STDOUT.
  5. L'output viene letto da Hadoop e scritto nella directory di output.

Per altre informazioni sullo streaming, vedere Hadoop Streaming.

Prerequisiti

  • Visual Studio.

  • Una familiarità nello scrivere e nel compilare il codice C# destinato a .NET Framework 4.5.

  • Un modo per caricare i file .exe sul cluster. La procedura in questo documento usa gli strumenti Data Lake per Visual Studio per caricare i file nell'archiviazione primaria per il cluster.

  • Se si usa PowerShell, è necessario il modulo Az.

  • Un cluster Apache Hadoop in HDInsight. Vedere Guida introduttiva: Introduzione ad Apache Hadoop e Apache Hive in Azure HDInsight usando il modello di Resource Manager.

  • Lo schema URI per l'archiviazione primaria dei cluster. Questo schema è wasb:// per Archiviazione di Azure, abfs:// per Azure Data Lake Archiviazione Gen2 o adl:// per Azure Data Lake Archiviazione Gen1. Se il trasferimento sicuro è abilitato per Archiviazione di Azure o Data Lake Archiviazione Gen2, l'URI sarà wasbs:// rispettivamente o abfss://.

Creare il mapper

In Visual Studio creare una nuova applicazione console .NET Framework denominata mapper. Usare il codice seguente per l'applicazione:

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

Dopo aver creato l'applicazione, compilarla per produrre il file /bin/Debug/mapper.exe nella directory del progetto.

Creare il reducer

In Visual Studio creare una nuova applicazione console .NET Framework denominata reducer. Usare il codice seguente per l'applicazione:

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

Dopo aver creato l'applicazione, compilarla per produrre il file /bin/Debug/reducer.exe nella directory del progetto.

Caricare nella risorsa di archiviazione

Successivamente, è necessario caricare le applicazioni mapper e reducer nell'archiviazione HDInsight.

  1. In Visual Studio selezionare Visualizza>Esplora server.

  2. Fare clic con il pulsante destro del mouse su Azure, selezionare Connessione alla sottoscrizione di Microsoft Azure e completare il processo di accesso.

  3. Espandere il cluster HDInsight in cui si desidera distribuire l'applicazione. Viene elencata una voce con il testo (Account di archiviazione predefinito).

    Storage account, HDInsight cluster, Server Explorer, Visual Studio.

    • Se la voce (Account Archiviazione predefinito) può essere espansa, si usa un account Archiviazione di Azure come risorsa di archiviazione predefinita per il cluster. Per visualizzare i file nella risorsa di archiviazione predefinita per il cluster, espandere la voce e quindi fare doppio clic su (Contenitore predefinito).

    • Se la voce (Account Archiviazione predefinito) non può essere espansa, si usa Azure Data Lake Archiviazione come risorsa di archiviazione predefinita per il cluster. Per visualizzare i file nel percorso di archiviazione predefinito per il cluster, fare doppio clic sulla voce (Account di archiviazione predefinito).

  4. Per caricare i file con estensione .exe, usare uno dei metodi seguenti:

    • Se si usa un account Archiviazione di Azure, selezionare l'icona Carica BLOB.

      HDInsight upload icon for mapper, Visual Studio.

      Nella finestra di dialogo Carica nuovo file, in Nome file selezionare Sfoglia. Nella finestra di dialogo Carica BLOB passare alla cartella bin\debug per il progetto mapper e quindi scegliere il file mapper.exe. Infine, selezionare Apri e quindi OK per completare il caricamento.

    • Per Azure Data Lake Archiviazione, fare clic con il pulsante destro del mouse su un'area vuota nell'elenco di file e quindi scegliere Carica. Infine, selezionare il file mapper.exe e quindi selezionare Apri.

    Una volta terminato il caricamento mapper.exe, ripetere il processo di caricamento per il file reducer.exe.

Eseguire un processo: uso di una sessione SSH

La procedura seguente descrive come eseguire un processo MapReduce usando una sessione SSH:

  1. Usare il comando ssh per connettersi al cluster. Modificare il comando seguente sostituendo CLUSTERNAME con il nome del cluster in uso e quindi immettere il comando:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Usare uno dei comandi seguenti per avviare il processo MapReduce:

    • Se l'archiviazione predefinita è Archiviazione di Azure:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • Se l'archiviazione predefinita è Data Lake Archiviazione Gen1:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files adl:///mapper.exe,adl:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • Se l'archiviazione predefinita è Data Lake Archiviazione Gen2:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    L'elenco seguente descrive cosa rappresenta ogni parametro e opzione:

    Parametro Descrizione
    hadoop-streaming.jar Specifica il file JAR che contiene la funzionalità MapReduce di streaming.
    -File Specifica i file mapper.exe e reducer.exe per questo processo. La wasbs:///dichiarazione del protocollo , adl:///o abfs:/// prima di ogni file è il percorso della radice dell'archiviazione predefinita per il cluster.
    -Mapper Specifica il file che implementa il mapper.
    -Riduttore Specifica il file che implementa il riduttore.
    -Input Specifica i dati di input.
    -Output Specifica la directory di output.
  3. Al termine del processo MapReduce, usare il comando seguente per visualizzare i risultati:

    hdfs dfs -text /example/wordcountout/part-00000
    

    L'elenco seguente è un esempio dei dati restituiti da questo comando:

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

Esecuzione di un processo: Uso di PowerShell

Usare il seguente script di PowerShell per eseguire un processo MapReduce e scaricare i risultati.

# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null) 
{
    Connect-AzAccount
}
$context

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Az.Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

Questo script richiede l'account di accesso del cluster e la password, insieme al nome del cluster HDInsight. Al termine del processo, l'output viene scaricato in un file denominato output.txt. Il testo seguente è un esempio dei dati nel file output.txt:

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

Passaggi successivi