HDInsight 上の Apache Hadoop で C# と MapReduce ストリーミングを使用するUse C# with MapReduce streaming on Apache Hadoop in HDInsight

HDInsight で C# を使用して MapReduce ソリューションを作成する方法について説明します。Learn how to use C# to create a MapReduce solution on HDInsight.

Apache Hadoop ストリーミングでは、スクリプトまたは実行可能ファイルを使用して MapReduce ジョブを実行できます。Apache Hadoop streaming allows you to run MapReduce jobs using a script or executable. ここでは、.NET を使用してマッパーとレジューサのワード カウント ソリューションを実装します。Here, .NET is used to implement the mapper and reducer for a word count solution.

HDInsight の .NET.NET on HDInsight

HDInsight クラスターでは、Mono (https://mono-project.com) を使用して .NET アプリケーションを実行します。HDInsight clusters use Mono (https://mono-project.com) to run .NET applications. Mono バージョン 4.2.1 は HDInsight バージョン 3.6 に付属しています。Mono version 4.2.1 is included with HDInsight version 3.6. HDInsight に含まれる Mono のバージョンについて詳しくは、「HDInsight の各バージョンで使用できる Apache Hadoop コンポーネント」を参照してください。For more information on the version of Mono included with HDInsight, see Apache Hadoop components available with different HDInsight versions.

.NET Framework のバージョンと Mono の互換性の詳細については、「Mono compatibility」 (Mono の互換性) を参照してください。For more information on Mono compatibility with .NET Framework versions, see Mono compatibility.

Hadoop ストリーミングのしくみHow Hadoop streaming works

このドキュメントでストリーミングに使用する基本的なプロセスは、次のとおりです。The basic process used for streaming in this document is as follows:

  1. Hadoop は、STDIN のマッパー (この例では mapper.exe ) にデータを渡します。Hadoop passes data to the mapper ( mapper.exe in this example) on STDIN.
  2. マッパーはデータを処理し、タブ区切りのキー/値ペアを STDOUT に出力します。The mapper processes the data, and emits tab-delimited key/value pairs to STDOUT.
  3. 出力は Hadoop によって読み取られ、STDIN のレジューサ (この例では reducer.exe ) に渡されます。The output is read by Hadoop, and then passed to the reducer ( reducer.exe in this example) on STDIN.
  4. レジューサは、タブ区切りのキー/値ペアを読み取り、データを処理した後、タブ区切りのキー/値ペアとして結果を STDOUT に出力します。The reducer reads the tab-delimited key/value pairs, processes the data, and then emits the result as tab-delimited key/value pairs on STDOUT.
  5. 出力は Hadoop によって読み取られ、出力ディレクトリに書き込まれます。The output is read by Hadoop and written to the output directory.

ストリーミングの詳細については、「Hadoop Streaming」(Hadoop ストリーミング) を参照してください。For more information on streaming, see Hadoop Streaming.

前提条件Prerequisites

  • 見ることができます。Visual Studio.

  • .NET Framework 4.5 を対象にした C# コードの記述と構築に精通していること。A familiarity with writing and building C# code that targets .NET Framework 4.5.

  • クラスターに .exe ファイルをアップロードする方法。A way to upload .exe files to the cluster. このドキュメントの手順では、Visual Studio の Data Lake ツールを使用して、クラスターのプライマリ ストレージにファイルをアップロードします。The steps in this document use the Data Lake Tools for Visual Studio to upload the files to primary storage for the cluster.

  • PowerShell を使用している場合は、AZ モジュールが必要になります。If using PowerShell, you'll need the Az Module.

  • HDInsight の Apache Hadoop クラスター。An Apache Hadoop cluster on HDInsight. Linux での HDInsight の概要に関するページを参照してください。See Get Started with HDInsight on Linux.

  • クラスターのプライマリ ストレージの URI スキーム。The URI scheme for your clusters primary storage. このスキームは、Azure Storage では wasb://、Azure Data Lake Storage Gen2 では abfs://、Azure Data Lake Storage Gen1 では adl:// です。This scheme would be wasb:// for Azure Storage, abfs:// for Azure Data Lake Storage Gen2 or adl:// for Azure Data Lake Storage Gen1. Azure Storage または Data Lake Storage Gen2 で安全な転送が有効になっている場合、URI はそれぞれ wasbs:// または abfss:// になります。If secure transfer is enabled for Azure Storage or Data Lake Storage Gen2, the URI would be wasbs:// or abfss://, respectively.

マッパーの作成Create the mapper

Visual Studio で、" マッパー " と呼ばれる新しい .NET Framework コンソール アプリケーションを作成します。In Visual Studio, create a new .NET Framework console application named mapper . アプリケーションには次のコードを使用します。Use the following code for the application:

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

アプリケーションを作成した後、それをビルドしてプロジェクト ディレクトリに /bin/Debug/mapper.exe ファイルを生成します。After you create the application, build it to produce the /bin/Debug/mapper.exe file in the project directory.

レジューサの作成Create the reducer

Visual Studio で、" レジューサ " と呼ばれる新しい .NET Framework コンソール アプリケーションを作成します。In Visual Studio, create a new .NET Framework console application named reducer . アプリケーションには次のコードを使用します。Use the following code for the application:

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

アプリケーションを作成した後、それをビルドしてプロジェクト ディレクトリに /bin/Debug/reducer.exe ファイルを生成します。After you create the application, build it to produce the /bin/Debug/reducer.exe file in the project directory.

ストレージにアップロードするUpload to storage

次に、" マッパー " アプリケーションと " レジューサ " アプリケーションを HDInsight ストレージにアップロードする必要があります。Next, you need to upload the mapper and reducer applications to HDInsight storage.

  1. Visual Studio で、 [表示] > [サーバー エクスプローラー] を選択します。In Visual Studio, select View > Server Explorer .

  2. [Azure] を右クリックし、 [Microsoft Azure サブスクリプションへの接続] を選択して、サインイン処理を完了します。Right-click Azure , select Connect to Microsoft Azure Subscription... , and complete the sign-in process.

  3. このアプリケーションをデプロイする HDInsight クラスターを展開します。Expand the HDInsight cluster that you wish to deploy this application to. エントリとテキスト (既定のストレージ アカウント) が一覧表示されます。An entry with the text (Default Storage Account) is listed.

    ストレージ アカウント、HDInsight クラスター、サーバー エクスプローラー、Visual Studio

    • [(既定のストレージ アカウント)] エントリを展開できる場合は、クラスターの既定のストレージとして Azure ストレージ アカウント を使用しています。If the (Default Storage Account) entry can be expanded, you're using an Azure Storage Account as default storage for the cluster. クラスターの既定のストレージのファイルを表示するには、エントリを展開し、 [(既定のコンテナー)] をダブルクリックします。To view the files on the default storage for the cluster, expand the entry and then double-click (Default Container) .

    • [(既定のストレージ アカウント)] エントリを展開できない場合は、クラスターの既定のストレージとして Azure Data Lake Storage を使用しています。If the (Default Storage Account) entry can't be expanded, you're using Azure Data Lake Storage as the default storage for the cluster. クラスターの既定のストレージにファイルを表示するには、 (既定のストレージ アカウント) エントリをダブルクリックします。To view the files on the default storage for the cluster, double-click the (Default Storage Account) entry.

  4. .exe ファイルをアップロードするには、次のいずれかの方法を使用します。To upload the .exe files, use one of the following methods:

    • Azure ストレージ アカウント を使用している場合は、 [BLOB のアップロード] アイコンを選択します。If you're using an Azure Storage Account , select the Upload Blob icon.

      マッパーの HDInsight アップロード アイコン、Visual Studio

      [新しいファイルのアップロード] ダイアログ ボックスの [ファイル名] で、 [参照] を選択します。In the Upload New File dialog box, under File name , select Browse . [BLOB のアップロード] ダイアログ ボックスで、" マッパー " プロジェクトの bin\debug フォルダーに移動し、 mapper.exe ファイルを選択します。In the Upload Blob dialog box, go to the bin\debug folder for the mapper project, and then choose the mapper.exe file. 最後に、 [開く] を選択し、 [OK] を選択してアップロードを完了します。Finally, select Open and then OK to complete the upload.

    • Azure Data Lake Storage の場合は、ファイルの一覧の空の領域を右クリックし、 [アップロード] を選択します。For Azure Data Lake Storage , right-click an empty area in the file listing, and then select Upload . 最後に、 mapper.exe ファイルを選択し、 [開く] を選択します。Finally, select the mapper.exe file and then select Open .

    mapper.exe のアップロードが完了したら、 reducer.exe ファイルのアップロード プロセスを繰り返します。Once the mapper.exe upload has finished, repeat the upload process for the reducer.exe file.

ジョブの実行: SSH セッションを使用Run a job: Using an SSH session

次の手順では、SSH セッションを使用して MapReduce ジョブを実行する方法について説明します。The following procedure describes how to run a MapReduce job using an SSH session:

  1. ssh コマンドを使用してクラスターに接続します。Use ssh command to connect to your cluster. 次のコマンドを編集して CLUSTERNAME をクラスターの名前に置き換えてから、そのコマンドを入力します。Edit the command below by replacing CLUSTERNAME with the name of your cluster, and then enter the command:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. MapReduce ジョブを開始するには、次のいずれかのコマンドを使用します。Use one of the following commands to start the MapReduce job:

    • 既定のストレージが Azure Storage の場合:If the default storage is Azure Storage :

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • 既定のストレージが Data Lake Storage Gen1 の場合:If the default storage is Data Lake Storage Gen1 :

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files adl:///mapper.exe,adl:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • 既定のストレージが Data Lake Storage Gen2 の場合:If the default storage is Data Lake Storage Gen2 :

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    次の一覧では、各パラメーターおよびオプションによって表されているものについて説明します。The following list describes what each parameter and option represents:

    パラメーターParameter 説明Description
    hadoop-streaming.jarhadoop-streaming.jar ストリーミング MapReduce 機能が含まれる jar ファイルを指定します。Specifies the jar file that contains the streaming MapReduce functionality.
    -files-files このジョブに対する mapper.exe ファイルと reducer.exe ファイルを指定します。Specifies the mapper.exe and reducer.exe files for this job. 各ファイルの前の wasbs:///adl:///、または abfs:/// のプロトコル宣言は、クラスターの既定の記憶域のルートへのパスです。The wasbs:///, adl:///, or abfs:/// protocol declaration before each file is the path to the root of default storage for the cluster.
    -mapper-mapper マッパーが実装されているファイルを指定します。Specifies the file that implements the mapper.
    -reducer-reducer レジューサが実装されているファイルを指定します。Specifies the file that implements the reducer.
    -input-input 入力データを指定します。Specifies the input data.
    -output-output 出力ディレクトリを指定します。Specifies the output directory.
  3. MapReduce ジョブが完了したら、次のコマンドを使用して結果を表示します。Once the MapReduce job completes, use the following command to view the results:

    hdfs dfs -text /example/wordcountout/part-00000
    

    次のテキストは、このコマンドによって返されるデータの例です。The following text is an example of the data returned by this command:

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

ジョブの実行: PowerShell の使用Run a job: Using PowerShell

次の PowerShell スクリプトを使用して、MapReduce ジョブを実行し、結果をダウンロードします。Use the following PowerShell script to run a MapReduce job and download the results.

# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null) 
{
    Connect-AzAccount
}
$context

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Az.Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

このスクリプトには、クラスター ログインのアカウント名とパスワードに加え、HDInsight のクラスター名が求められます。This script prompts you for the cluster login account name and password, along with the HDInsight cluster name. ジョブが完了すると、出力が output.txt というファイルにダウンロードされます。Once the job completes, the output is downloaded to a file named output.txt . 次のテキストは、output.txtファイル内のデータの例です。The following text is an example of the data in the output.txt file:

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

次のステップNext steps