Run MapReduce jobs using HDInsight .NET SDK

Learn how to submit MapReduce jobs using the HDInsight .NET SDK. HDInsight clusters include a jar file, /example/jars/hadoop-mapreduce-examples.jar, that contains several MapReduce samples. One of the samples is wordcount. In this article, you develop a C# console application to submit a wordcount job. The job reads the /example/data/gutenberg/davinci.txt file and writes the results to /example/data/davinciwordcount. If you want to rerun the application, you must first clean up the output folder.

Note

The steps in this article must be performed from a Windows client. For information on using a Linux, OS X, or Unix client to work with MapReduce, use the tab selector shown on the top of the article.

Prerequisites

Before you begin this article, you must have the following items:

  * An Apache Hadoop cluster on HDInsight.
  * Visual Studio, which is used to create the C# console application.

Submit MapReduce jobs using HDInsight .NET SDK

The HDInsight .NET SDK provides .NET client libraries that make it easier to work with HDInsight clusters from .NET.

To submit jobs

  1. Create a C# console application in Visual Studio.
  2. From the NuGet Package Manager Console, run the following command:

     Install-Package Microsoft.Azure.Management.HDInsight.Job
    
  3. Use the following code:

     using System.Collections.Generic;
     using System.IO;
     using System.Text;
     using System.Threading;
     using Microsoft.Azure.Management.HDInsight.Job;
     using Microsoft.Azure.Management.HDInsight.Job.Models;
     using Hyak.Common;
     using Microsoft.WindowsAzure.Storage;
     using Microsoft.WindowsAzure.Storage.Blob;
    
     namespace SubmitHDInsightJobDotNet
     {
         class Program
         {
             private static HDInsightJobManagementClient _hdiJobManagementClient;
    
             private const string existingClusterName = "<Your HDInsight Cluster Name>";
             private const string existingClusterUri = existingClusterName + ".azurehdinsight.net";
             private const string existingClusterUsername = "<Cluster Username>";
             private const string existingClusterPassword = "<Cluster User Password>";
    
             private const string defaultStorageAccountName = "<Default Storage Account Name>"; //<StorageAccountName>.blob.core.windows.net
             private const string defaultStorageAccountKey = "<Default Storage Account Key>";
             private const string defaultStorageContainerName = "<Default Blob Container Name>";
    
             private const string sourceFile = "/example/data/gutenberg/davinci.txt";  
             private const string outputFolder = "/example/data/davinciwordcount";
    
             static void Main(string[] args)
             {
                 System.Console.WriteLine("The application is running ...");
    
                 var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword };
                 _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials);
    
                 SubmitMRJob();
    
                 System.Console.WriteLine("Press ENTER to continue ...");
                 System.Console.ReadLine();
             }
    
             private static void SubmitMRJob()
             {
                 List<string> args = new List<string> { sourceFile, outputFolder };
    
                 var paras = new MapReduceJobSubmissionParameters
                 {
                     JarFile = @"/example/jars/hadoop-mapreduce-examples.jar",
                     JarClass = "wordcount",
                     Arguments = args
                 };
    
                 System.Console.WriteLine("Submitting the MR job to the cluster...");
                 var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras);
                 var jobId = jobResponse.JobSubmissionJsonResponse.Id;
                 System.Console.WriteLine("Response status code is " + jobResponse.StatusCode);
                 System.Console.WriteLine("JobId is " + jobId);
    
                 System.Console.WriteLine("Waiting for the job completion ...");
    
                 // Wait for job completion
                 var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                 while (!jobDetail.Status.JobComplete)
                 {
                     Thread.Sleep(1000);
                     jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                 }
    
                 // Get job output
                 System.Console.WriteLine("Job output is: ");
                 var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey,
                     defaultStorageContainerName);
    
                 if (jobDetail.ExitValue == 0)
                 {
                     // Create the storage account object
                     CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + 
                         defaultStorageAccountName + 
                         ";AccountKey=" + defaultStorageAccountKey);
    
                     // Create the blob client.
                     CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    
                     // Retrieve reference to a previously created container.
                     CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);
    
                     // Blob names have no leading slash, so trim it from the output folder path.
                     CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000");
    
                     using (var stream = blockBlob.OpenRead())
                     {
                         using (StreamReader reader = new StreamReader(stream))
                         {
                             while (!reader.EndOfStream)
                             {
                                 System.Console.WriteLine(reader.ReadLine());
                             }
                         }
                     }
                 }
                 else
                 {
                     // fetch stderr output in case of failure
                     var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); 
    
                     using (var reader = new StreamReader(output, Encoding.UTF8))
                     {
                         string value = reader.ReadToEnd();
                         System.Console.WriteLine(value);
                     }
    
                 }
             }
         }
     }
    
  4. Press F5 to run the application.

To run the job again, you must either change the job output folder name (in the sample, it is "/example/data/davinciwordcount") or delete the existing output folder.
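If you prefer to delete the output instead of renaming the folder, one option is to remove the blobs under the output prefix with the same storage client the sample already uses. The following is a minimal, hypothetical helper that is not part of the original sample; it assumes the defaultStorageAccountName, defaultStorageAccountKey, defaultStorageContainerName, and outputFolder constants defined in the Program class above.

     // Hypothetical helper (not part of the original sample): deletes all blobs under
     // the job output prefix so the wordcount job can be resubmitted without renaming
     // the output folder. Reuses the storage constants defined in the Program class.
     private static void CleanUpOutputFolder()
     {
         CloudStorageAccount storageAccount = CloudStorageAccount.Parse(
             "DefaultEndpointsProtocol=https;AccountName=" + defaultStorageAccountName +
             ";AccountKey=" + defaultStorageAccountKey);
         CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
         CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);

         // Blob names have no leading slash, so trim it from the output folder path.
         string prefix = outputFolder.Substring(1) + "/";
         foreach (IListBlobItem item in container.ListBlobs(prefix, useFlatBlobListing: true))
         {
             var blob = item as CloudBlockBlob;
             if (blob != null)
             {
                 blob.Delete();
             }
         }
     }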

When the job completes successfully, the application prints the content of the output file "part-r-00000".

Next steps

In this article, you learned how to use the HDInsight .NET SDK to submit MapReduce jobs to an HDInsight cluster. To learn more, see the following articles: