HDInsight 上の Apache Hadoop で C# と MapReduce ストリーミングを使用する

HDInsight で C# を使用して MapReduce ソリューションを作成する方法について説明します。

Apache Hadoop ストリーミングでは、スクリプトまたは実行可能ファイルを使用して MapReduce ジョブを実行できます。 ここでは、.NET を使用してマッパーとレジューサのワード カウント ソリューションを実装します。

HDInsight の .NET

HDInsight クラスターでは、Mono (https://mono-project.com) を使用して .NET アプリケーションを実行します。 Mono バージョン 4.2.1 は HDInsight バージョン 3.6 に付属しています。 HDInsight に含まれる Mono のバージョンについて詳しくは、「HDInsight の各バージョンで使用できる Apache Hadoop コンポーネント」を参照してください。

.NET Framework のバージョンと Mono の互換性の詳細については、「Mono compatibility」 (Mono の互換性) を参照してください。

Hadoop ストリーミングのしくみ

このドキュメントでストリーミングに使用する基本的なプロセスは、次のとおりです。

  1. Hadoop は、STDIN のマッパー (この例では mapper.exe) にデータを渡します。
  2. マッパーはデータを処理し、タブ区切りのキー/値ペアを STDOUT に出力します。
  3. 出力は Hadoop によって読み取られ、STDIN のレジューサ (この例では reducer.exe) に渡されます。
  4. レジューサは、タブ区切りのキー/値ペアを読み取り、データを処理した後、タブ区切りのキー/値ペアとして結果を STDOUT に出力します。
  5. 出力は Hadoop によって読み取られ、出力ディレクトリに書き込まれます。

ストリーミングの詳細については、「Hadoop Streaming」(Hadoop ストリーミング) を参照してください。

前提条件

  • 見ることができます。

  • .NET Framework 4.5 を対象にした C# コードの記述と構築に精通していること。

  • クラスターに .exe ファイルをアップロードする方法。 このドキュメントの手順では、Visual Studio の Data Lake ツールを使用して、クラスターのプライマリ ストレージにファイルをアップロードします。

  • PowerShell を使用している場合は、AZ モジュールが必要になります。

  • HDInsight の Apache Hadoop クラスター。 Linux での HDInsight の概要に関するページを参照してください。

  • クラスターのプライマリ ストレージの URI スキーム。 このスキームは、Azure Storage では wasb://、Azure Data Lake Storage Gen2 では abfs://、Azure Data Lake Storage Gen1 では adl:// です。 Azure Storage または Data Lake Storage Gen2 で安全な転送が有効になっている場合、URI はそれぞれ wasbs:// または abfss:// になります。

マッパーの作成

Visual Studio で、"マッパー" と呼ばれる新しい .NET Framework コンソール アプリケーションを作成します。 アプリケーションには次のコードを使用します。

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

アプリケーションを作成した後、それをビルドしてプロジェクト ディレクトリに /bin/Debug/mapper.exe ファイルを生成します。

レジューサの作成

Visual Studio で、"レジューサ" と呼ばれる新しい .NET Framework コンソール アプリケーションを作成します。 アプリケーションには次のコードを使用します。

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

アプリケーションを作成した後、それをビルドしてプロジェクト ディレクトリに /bin/Debug/reducer.exe ファイルを生成します。

ストレージにアップロードする

次に、"マッパー" アプリケーションと "レジューサ" アプリケーションを HDInsight ストレージにアップロードする必要があります。

  1. Visual Studio で、 [表示]>[サーバー エクスプローラー] を選択します。

  2. [Azure] を右クリックし、 [Microsoft Azure サブスクリプションへの接続] を選択して、サインイン処理を完了します。

  3. このアプリケーションをデプロイする HDInsight クラスターを展開します。 エントリとテキスト (既定のストレージ アカウント) が一覧表示されます。

    Storage account, HDInsight cluster, Server Explorer, Visual Studio.

    • [(既定のストレージ アカウント)] エントリを展開できる場合は、クラスターの既定のストレージとして Azure ストレージ アカウントを使用しています。 クラスターの既定のストレージのファイルを表示するには、エントリを展開し、 [(既定のコンテナー)] をダブルクリックします。

    • [(既定のストレージ アカウント)] エントリを展開できない場合は、クラスターの既定のストレージとして Azure Data Lake Storage を使用しています。 クラスターの既定のストレージにファイルを表示するには、 (既定のストレージ アカウント) エントリをダブルクリックします。

  4. .exe ファイルをアップロードするには、次のいずれかの方法を使用します。

    • Azure ストレージ アカウントを使用している場合は、 [BLOB のアップロード] アイコンを選択します。

      HDInsight upload icon for mapper, Visual Studio.

      [新しいファイルのアップロード] ダイアログ ボックスの [ファイル名] で、 [参照] を選択します。 [BLOB のアップロード] ダイアログ ボックスで、"マッパー" プロジェクトの bin\debug フォルダーに移動し、mapper.exe ファイルを選択します。 最後に、 [開く] を選択し、 [OK] を選択してアップロードを完了します。

    • Azure Data Lake Storage の場合は、ファイルの一覧の空の領域を右クリックし、 [アップロード] を選択します。 最後に、mapper.exe ファイルを選択し、 [開く] を選択します。

    mapper.exe のアップロードが完了したら、 reducer.exe ファイルのアップロード プロセスを繰り返します。

ジョブの実行: SSH セッションを使用

次の手順では、SSH セッションを使用して MapReduce ジョブを実行する方法について説明します。

  1. ssh コマンドを使用してクラスターに接続します。 次のコマンドを編集して CLUSTERNAME をクラスターの名前に置き換えてから、そのコマンドを入力します。

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. MapReduce ジョブを開始するには、次のいずれかのコマンドを使用します。

    • 既定のストレージが Azure Storage の場合:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • 既定のストレージが Data Lake Storage Gen1 の場合:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files adl:///mapper.exe,adl:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • 既定のストレージが Data Lake Storage Gen2 の場合:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    次の一覧では、各パラメーターおよびオプションによって表されているものについて説明します。

    パラメーター 説明
    hadoop-streaming.jar ストリーミング MapReduce 機能が含まれる jar ファイルを指定します。
    -files このジョブに対する mapper.exe ファイルと reducer.exe ファイルを指定します。 各ファイルの前の wasbs:///adl:///、または abfs:/// のプロトコル宣言は、クラスターの既定の記憶域のルートへのパスです。
    -mapper マッパーが実装されているファイルを指定します。
    -reducer レジューサが実装されているファイルを指定します。
    -input 入力データを指定します。
    -output 出力ディレクトリを指定します。
  3. MapReduce ジョブが完了したら、次のコマンドを使用して結果を表示します。

    hdfs dfs -text /example/wordcountout/part-00000
    

    次のテキストは、このコマンドによって返されるデータの例です。

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

ジョブの実行: PowerShell の使用

次の PowerShell スクリプトを使用して、MapReduce ジョブを実行し、結果をダウンロードします。

# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null) 
{
    Connect-AzAccount
}
$context

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Az.Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

このスクリプトには、クラスター ログインのアカウント名とパスワードに加え、HDInsight のクラスター名が求められます。 ジョブが完了すると、出力が output.txt というファイルにダウンロードされます。 次のテキストは、output.txtファイル内のデータの例です。

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

次のステップ