Usare C# con lo streaming di MapReduce su Hadoop in HDInsight

Informazioni su come usare C# per creare una soluzione di MapReduce su HDInsight.

Importante

Linux è l'unico sistema operativo usato in HDInsight versione 3.4 o successiva. Per altre informazioni, vedere Componenti e versioni di Hadoop disponibili in HDInsight.

Hadoop streaming è un'utilità che consente di eseguire processi MapReduce tramite uno script o eseguibile. In questo esempio, .NET è usato per implementare il mapper e il reducer per una soluzione di conteggio parole.

.NET su HDInsight

I cluster HDInsight basati su Linux usano Mono (https://mono-project.com) per eseguire le applicazioni .NET. La versione Mono 4.2.1 è inclusa nella versione 3.5 di HDInsight. Per altre informazioni sulla versione Mono compresa in HDInsight, vedere Componenti e versioni di Hadoop disponibili in HDInsight. Per usare una versione specifica di Mono, vedere il documento Install or update Mono (Installare o aggiornare Mono).

Per altre informazioni sulla compatibilità Mono con le versioni di .NET Framework, vedere il documento relativo alla compatibilità Mono.

Come funziona lo streaming di Hadoop

Il processo di base usato per il flusso in questo documento è il seguente:

  1. Hadoop passa i dati al mapper (mapper.exe in questo esempio) su STDIN.
  2. Il mapper elabora i dati ed emette una coppia chiave/valore delimitata da tabulazione su STDOUT.
  3. L'output viene letto da Hadoop e quindi passato al riduttore (reducer.exe in questo esempio) su STDIN.
  4. Il riduttore legge le coppie chiave/valore delimitate da tabulazioni, elabora i dati e quindi genera il risultato come coppie chiave/valore delimitate da tabulazione su STDOUT.
  5. L'output viene letto da Hadoop e scritto nella directory di output.

Per altre informazioni sui flussi, vedere il documento Hadoop Streaming (https://hadoop.apache.org/docs/r2.7.1/hadoop-streaming/HadoopStreaming.html).

Prerequisiti

  • Una familiarità nello scrivere e nel compilare il codice C# destinato a .NET Framework 4.5. Nella procedura di questo documento viene usato Visual Studio 2017.

  • Un modo per caricare i file .exe sul cluster. La procedura in questo documento usa gli strumenti Data Lake per Visual Studio per caricare i file nell'archiviazione primaria per il cluster.

  • Azure PowerShell o un client SSH.

  • Un cluster Hadoop in HDInsight. Per altre informazioni sulla creazione di un cluster, vedere Creare cluster Hadoop in HDInsight.

Creare il mapper

In Visual Studio creare una nuova applicazione console denominata mapper. Usare il codice seguente per l'applicazione:

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

Dopo aver creato l'applicazione, compilarla per produrre il file /bin/Debug/mapper.exe nella directory del progetto.

Creare il reducer

In Visual Studio creare una nuova applicazione console denominata reducer. Usare il codice seguente per l'applicazione:

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

Dopo aver creato l'applicazione, compilarla per produrre il file /bin/Debug/reducer.exe nella directory del progetto.

Caricare nella risorsa di archiviazione

  1. In Visual Studio aprire Esplora server.

  2. Espandere Azure e quindi HDInsight.

  3. Se richiesto, immettere le credenziali della sottoscrizione di Azure, quindi fare clic su Accedi.

  4. Espandere il cluster HDInsight in cui si desidera distribuire l'applicazione. Viene elencata una voce con il testo (Account di archiviazione predefinito).

    Esplora server con account di archiviazione per il cluster

    • Se è possibile espandere questa voce, si usa un Account di archiviazione di Azure come risorsa di archiviazione predefinita per il cluster. Per visualizzare i file nel percorso di archiviazione predefinito per il cluster, espandere la voce e quindi fare doppio clic su (Contenitore predefinito).

    • Se non è possibile espandere questa voce, si usa un Azure Data Lake Store come risorsa di archiviazione predefinita per il cluster. Per visualizzare i file nel percorso di archiviazione predefinito per il cluster, fare doppio clic sulla voce (Account di archiviazione predefinito).

  5. Per caricare i file con estensione .exe, usare uno dei metodi seguenti:

    • Se si usa un Account di Archiviazione di Azure, fare clic sull'icona per il caricamento, quindi passare alla cartella bin\debug per il progetto mapper. Selezionare infine il file mapper.exe e fare clic su Ok.

      icona relativa al caricamento

    • Se si usa Azure Data Lake Store, fare doppio clic su un'area vuota nell'elenco di file e quindi selezionare Carica. Selezionare infine il file mapper.exe e fare clic su Apri.

      Una volta terminato il caricamento mapper.exe, ripetere il processo di caricamento per il file reducer.exe.

Eseguire un processo: uso di una sessione SSH

  1. Connettersi al cluster HDInsight usando SSH. Per altre informazioni, vedere Usare SSH con HDInsight.

  2. Usare uno dei seguenti comandi per avviare il processo MapReduce:

    • Se si usa Azure Data Lake Store come risorsa di archiviazione predefinita:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files adl:///mapper.exe,adl:///reducer.exe -mapper mapper.exe -reducer reducer.exe -input /example/data/gutenberg/davinci.txt -output /example/wordcountout
      
    • Se si usa Archiviazione di Azure come risorsa di archiviazione predefinita:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files wasb:///mapper.exe,wasb:///reducer.exe -mapper mapper.exe -reducer reducer.exe -input /example/data/gutenberg/davinci.txt -output /example/wordcountout
      

      L'elenco seguente descrive le operazioni eseguite da ogni parametro:

    • hadoop-streaming.jar: il file con estensione jar che contiene la funzionalità di streaming MapReduce.

    • -files: aggiunge i file mapper.exe e reducer.exe a questo processo. adl:/// o wasb:/// prima di ogni file rappresenta il percorso della radice di archiviazione predefinita per il cluster.
    • -mapper: specifica il file che implementa il mapper.
    • -reducer: specifica il file che implementa il reducer.
    • -input: dati di input.
    • -output: directory di output.
  3. Dopo il completamento del processo di MapReduce, usare il comando seguente per visualizzare i risultati:

    hdfs dfs -text /example/wordcountout/part-00000
    

    L'elenco seguente è un esempio dei dati restituiti da questo comando:

     you     1128
     young   38
     younger 1
     youngest        1
     your    338
     yours   4
     yourself        34
     yourselves      3
     youth   17
    

Esecuzione di un processo: Uso di PowerShell

Usare il seguente script di PowerShell per eseguire un processo MapReduce e scaricare i risultati.

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzureRmSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Add-AzureRmAccount
}

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzureRmHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzureRmHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzureRmHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzureRmHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzureRmDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Azure Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzureRmStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzureStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzureStorageBlobContent `
        -Blob 'example/data/WordCountOutput/part-r-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

Questo script richiede l'account di accesso del cluster e la password, insieme al nome del cluster HDInsight. Al termine del processo, l'output è scaricato nel file output.txt nella directory da cui viene eseguito lo script. Il testo seguente è un esempio dei dati nel file output.txt:

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

Passaggi successivi

Per altre informazioni sull'uso di MapReduce con HDInsight, vedere l'articolo Usare MapReduce in Hadoop su HDInsight.

Per informazioni sull'uso di C# con Hive e Pig, vedere Usare le funzioni definite dall'utente C# con lo streaming Hive e Pig in Hadoop in HDInsight.

Per informazioni sull'uso di C# con Storm in HDInsight tramite, vedere Sviluppare topologie C# per Apache Storm in HDInsight tramite gli strumenti Hadoop per Visual Studio.