Eseguire processi MapReduce con HDInsight .NET SDK

Informazioni su come inviare processi MapReduce con HDInsight .NET SDK. I cluster HDInsight includono un file JAR con alcuni esempi di MapReduce. Il file JAR è /example/jars/hadoop-mapreduce-examples.jar. Uno degli esempi è wordcount. Per inviare un processo wordcount, è necessario sviluppare un'applicazione console C#. Il processo legge il file /example/data/gutenberg/davinci.txt e restituisce i risultati in /example/data/davinciwordcount. Se si vuole eseguire di nuovo l'applicazione, è necessario pulire la cartella di output.

Nota

I passaggi descritti in questo articolo devono essere eseguiti da un client Windows. Per informazioni sull'uso di un client Linux, OS X o Unix con Hive, usare il selettore di schede visualizzato all'inizio dell'articolo.

Prerequisiti

Per eseguire le procedure descritte nell'articolo sono necessari gli elementi seguenti:

Inviare processi MapReduce mediante HDInsight .NET SDK

HDInsight .NET SDK fornisce librerie client .NET che semplificano l'uso dei cluster HDInsight da .NET.

Inviare i processi

  1. Creare un'applicazione console C# in Visual Studio.
  2. Dalla console di Gestione pacchetti NuGet eseguire il comando seguente:

     Install-Package Microsoft.Azure.Management.HDInsight.Job
    
  3. Usare il codice seguente:

     using System.Collections.Generic;
     using System.IO;
     using System.Text;
     using System.Threading;
     using Microsoft.Azure.Management.HDInsight.Job;
     using Microsoft.Azure.Management.HDInsight.Job.Models;
     using Hyak.Common;
     using Microsoft.WindowsAzure.Storage;
     using Microsoft.WindowsAzure.Storage.Blob;
    
     namespace SubmitHDInsightJobDotNet
     {
         class Program
         {
             private static HDInsightJobManagementClient _hdiJobManagementClient;
    
             private const string existingClusterName = "<Your HDInsight Cluster Name>";
             private const string existingClusterUri = existingClusterName + ".azurehdinsight.net";
             private const string existingClusterUsername = "<Cluster Username>";
             private const string existingClusterPassword = "<Cluster User Password>";
    
             private const string defaultStorageAccountName = "<Default Storage Account Name>"; //<StorageAccountName>.blob.core.windows.net
             private const string defaultStorageAccountKey = "<Default Storage Account Key>";
             private const string defaultStorageContainerName = "<Default Blob Container Name>";
    
             private const string sourceFile = "/example/data/gutenberg/davinci.txt";  
             private const string outputFolder = "/example/data/davinciwordcount";
    
             static void Main(string[] args)
             {
                 System.Console.WriteLine("The application is running ...");
    
                 var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword };
                 _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials);
    
                 SubmitMRJob();
    
                 System.Console.WriteLine("Press ENTER to continue ...");
                 System.Console.ReadLine();
             }
    
             private static void SubmitMRJob()
             {
                 List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } };
    
                 var paras = new MapReduceJobSubmissionParameters
                 {
                     JarFile = @"/example/jars/hadoop-mapreduce-examples.jar",
                     JarClass = "wordcount",
                     Arguments = args
                 };
    
                 System.Console.WriteLine("Submitting the MR job to the cluster...");
                 var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras);
                 var jobId = jobResponse.JobSubmissionJsonResponse.Id;
                 System.Console.WriteLine("Response status code is " + jobResponse.StatusCode);
                 System.Console.WriteLine("JobId is " + jobId);
    
                 System.Console.WriteLine("Waiting for the job completion ...");
    
                 // Wait for job completion
                 var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                 while (!jobDetail.Status.JobComplete)
                 {
                     Thread.Sleep(1000);
                     jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                 }
    
                 // Get job output
                 System.Console.WriteLine("Job output is: ");
                 var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey,
                     defaultStorageContainerName);
    
                 if (jobDetail.ExitValue == 0)
                 {
                     // Create the storage account object
                     CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" + 
                         defaultStorageAccountName + 
                         ";AccountKey=" + defaultStorageAccountKey);
    
                     // Create the blob client.
                     CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    
                     // Retrieve reference to a previously created container.
                     CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);
    
                     CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000");
    
                     using (var stream = blockBlob.OpenRead())
                     {
                         using (StreamReader reader = new StreamReader(stream))
                         {
                             while (!reader.EndOfStream)
                             {
                                 System.Console.WriteLine(reader.ReadLine());
                             }
                         }
                     }
                 }
                 else
                 {
                     // fetch stderr output in case of failure
                     var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess); 
    
                     using (var reader = new StreamReader(output, Encoding.UTF8))
                     {
                         string value = reader.ReadToEnd();
                         System.Console.WriteLine(value);
                     }
    
                 }
             }
         }
     }
    
  4. Premere F5 per eseguire l'applicazione.

Per eseguire nuovamente il processo, è necessario cambiare il nome della cartella di output del processo, che nell'esempio è: "/example/data/davinciwordcount".

Al completamento del processo, l'applicazione stampa il contenuto del file di output "part-r-00000".

Passaggi successivi

Questo articolo ha spiegato vari modi per creare un cluster HDInsight. Per altre informazioni, vedere gli articoli seguenti: