Use C# with MapReduce streaming on Hadoop in HDInsight

Learn how to use C# to create a MapReduce solution on HDInsight.

Important

Linux is the only operating system used on HDInsight version 3.4 or greater. For more information, see HDInsight component versioning.

Hadoop streaming is a utility that allows you to run MapReduce jobs using a script or executable. In this example, .NET is used to implement the mapper and reducer for a word count solution.

.NET on HDInsight

Linux-based HDInsight clusters use Mono (https://mono-project.com) to run .NET applications. Mono version 4.2.1 is included with HDInsight version 3.5. For more information on the version of Mono included with HDInsight, see HDInsight component versions. To use a specific version of Mono, see the Install or update Mono document.

For more information on Mono compatibility with .NET Framework versions, see Mono compatibility.
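
If you need to confirm which Mono version is actually installed on a cluster, you can check from an SSH session on the cluster head node (an optional sanity check, not a required step):

    mono --version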

How Hadoop streaming works

The basic process used for streaming in this document is as follows:

  1. Hadoop passes data to the mapper (mapper.exe in this example) on STDIN.
  2. The mapper processes the data, and emits tab-delimited key/value pairs to STDOUT.
  3. The output is read by Hadoop, and then passed to the reducer (reducer.exe in this example) on STDIN.
  4. The reducer reads the tab-delimited key/value pairs, processes the data, and then emits the result as tab-delimited key/value pairs on STDOUT.
  5. The output is read by Hadoop and written to the output directory.

For more information on streaming, see Hadoop Streaming (https://hadoop.apache.org/docs/r2.7.1/hadoop-streaming/HadoopStreaming.html).
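
As a concrete illustration of this flow (using a made-up input line, not part of the sample data), the word count solution in this document moves data through the steps like this:

    Mapper STDIN:   the quick fox saw the dog

    Mapper STDOUT:  the     1
                    quick   1
                    fox     1
                    saw     1
                    the     1
                    dog     1

    Reducer STDOUT: dog     1
                    fox     1
                    quick   1
                    saw     1
                    the     2

Between the two steps, Hadoop sorts the mapper output by key, so all pairs for the same word arrive at the reducer together.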

Prerequisites

  • Familiarity with writing and building C# code that targets .NET Framework 4.5. The steps in this document use Visual Studio 2017.

  • A way to upload .exe files to the cluster. The steps in this document use the Data Lake Tools for Visual Studio to upload the files to primary storage for the cluster.

  • Azure PowerShell or an SSH client.

  • A Hadoop on HDInsight cluster. For more information on creating a cluster, see Create an HDInsight cluster.

Create the mapper

In Visual Studio, create a new Console application named mapper. Use the following code for the application:

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the matched words
                foreach (Match word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1", word.Value);
                }
            }
        }
    }
}

After creating the application, build it to produce the /bin/Debug/mapper.exe file in the project directory.
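
Because the mapper is an ordinary console application that reads STDIN and writes STDOUT, you can smoke-test it locally before uploading it. For example, from a Windows command prompt in the project directory (an optional check; the sample sentence is made up):

    echo the quick fox saw the dog | bin\Debug\mapper.exe

This command should print one tab-delimited pair per word, each with a count of 1.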

Create the reducer

In Visual Studio, create a new Console application named reducer. Use the following code for the application:

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if (words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                }
                else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and its total count.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

After creating the application, build it to produce the /bin/Debug/reducer.exe file in the project directory.
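
You can also approximate the full Hadoop pipeline locally by chaining the two executables, with a sort standing in for Hadoop's shuffle phase. The following command is a sketch; the input file sometext.txt is hypothetical, so adjust the paths and file name to match your environment:

    type sometext.txt | mapper\bin\Debug\mapper.exe | sort | reducer\bin\Debug\reducer.exe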

Upload to storage

  1. In Visual Studio, open Server Explorer.

  2. Expand Azure, and then expand HDInsight.

  3. If prompted, enter your Azure subscription credentials, and then click Sign In.

  4. Expand the HDInsight cluster that you wish to deploy this application to. An entry with the text (Default Storage Account) is listed.

    (Image: Server Explorer showing the storage account for the cluster)

    • If this entry can be expanded, you are using an Azure Storage Account as default storage for the cluster. To view the files on the default storage for the cluster, expand the entry and then double-click the (Default Container).

    • If this entry cannot be expanded, you are using Azure Data Lake Store as the default storage for the cluster. To view the files on the default storage for the cluster, double-click the (Default Storage Account) entry.

  5. To upload the .exe files, use one of the following methods:

    • If using an Azure Storage Account, click the upload icon, and then browse to the bin\Debug folder for the mapper project. Finally, select the mapper.exe file and click OK.

      (Image: upload icon)

    • If using Azure Data Lake Store, right-click an empty area in the file listing, and then select Upload. Finally, select the mapper.exe file and click Open.

      Once the mapper.exe upload has finished, repeat the upload process for the reducer.exe file.
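
As an alternative to Visual Studio, you can copy the files to the cluster head node with scp and then place them in the root of default storage with the hdfs command. In the following sketch, sshuser and CLUSTERNAME are placeholders for your cluster's SSH account and cluster name:

    scp mapper/bin/Debug/mapper.exe reducer/bin/Debug/reducer.exe sshuser@CLUSTERNAME-ssh.azurehdinsight.net:

Then, from an SSH session on the cluster:

    hdfs dfs -put mapper.exe /mapper.exe
    hdfs dfs -put reducer.exe /reducer.exe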

Run a job: Using an SSH session

  1. Use SSH to connect to the HDInsight cluster. For more information, see Use SSH with HDInsight.

  2. Use one of the following commands to start the MapReduce job:

    • If using Data Lake Store as default storage:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files adl:///mapper.exe,adl:///reducer.exe -mapper mapper.exe -reducer reducer.exe -input /example/data/gutenberg/davinci.txt -output /example/wordcountout
      
    • If using Azure Storage as default storage:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar -files wasb:///mapper.exe,wasb:///reducer.exe -mapper mapper.exe -reducer reducer.exe -input /example/data/gutenberg/davinci.txt -output /example/wordcountout
      

      The following list describes what each parameter does:

    • hadoop-streaming.jar: The jar file that contains the streaming MapReduce functionality.

    • -files: Adds the mapper.exe and reducer.exe files to this job. The adl:/// or wasb:/// before each file is the path to the root of default storage for the cluster.
    • -mapper: Specifies which file implements the mapper.
    • -reducer: Specifies which file implements the reducer.
    • -input: The input data.
    • -output: The output directory.
  3. Once the MapReduce job completes, use the following to view the results:

    hdfs dfs -text /example/wordcountout/part-00000
    

    The following text is an example of the data returned by this command:

     you     1128
     young   38
     younger 1
     youngest        1
     your    338
     yours   4
     yourself        34
     yourselves      3
     youth   17
    
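MapReduce fails if the output directory already exists, so if you want to run the job again, first remove the previous output:

    hdfs dfs -rm -r /example/wordcountout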

Run a job: Using PowerShell

Use the following PowerShell script to run a MapReduce job and download the results.

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzureRmSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Add-AzureRmAccount
}

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzureRmHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzureRmHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzureRmHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzureRmHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # File path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzureRmDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Azure Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      -ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzureRmStorageAccountKey `
        -Name $storageAccountName `
        -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzureStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzureStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

This script prompts you for the cluster login account name and password, along with the HDInsight cluster name. Once the job completes, the output is downloaded to a file named output.txt. The following text is an example of the data in the output.txt file:

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17
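
To page through the complete results from the same PowerShell session, you can use standard cmdlets, for example:

    Get-Content .\output.txt | more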

Next steps

For more information on using MapReduce with HDInsight, see Use MapReduce with HDInsight.

For information on using C# with Hive and Pig, see Use a C# user-defined function with Hive and Pig.

For information on using C# with Storm on HDInsight, see Develop C# topologies for Storm on HDInsight.