Menjalankan pekerjaan MapReduce menggunakan HDInsight .NET SDK

Pelajari cara mengirimkan pekerjaan MapReduce menggunakan HDInsight .NET SDK. Kluster HDInsight dilengkapi dengan file jar dengan beberapa sampel MapReduce. File jar adalah /example/jars/hadoop-mapreduce-examples.jar. Salah satu sampelnya adalah wordcount. Anda mengembangkan aplikasi konsol C# untuk mengirimkan pekerjaan wordcount. Pekerjaan membaca file /example/data/gutenberg/davinci.txt, dan menghasilkan hasilnya untuk /example/data/davinciwordcount. Jika Anda ingin menjalankan ulang aplikasi, Anda harus membersihkan folder output.

Catatan

Langkah-langkah dalam artikel ini harus dilakukan dari klien Windows. Untuk mengetahui informasi tentang menggunakan klien Linux, OS X, atau Unix untuk bekerja dengan Hive, gunakan pemilih tab yang ditampilkan di bagian atas artikel.

Prasyarat

Kirimkan pekerjaan MapReduce menggunakan HDInsight .NET SDK

HDInsight .NET SDK menyediakan perpustakaan klien .NET, yang membuatnya lebih mudah untuk bekerja dengan kluster HDInsight dari .NET.

  1. Mulai Visual Studio dan buat aplikasi konsol C#.

  2. Buka Tools>NuGet Package ManagerPackage Manager>Console dan masukkan perintah berikut:

    Install-Package Microsoft.Azure.Management.HDInsight.Job
    
  3. Salin kode di bawah ini ke program.cs. Kemudian edit kode dengan mengatur nilai untuk: existingClusterName, existingClusterPassword, defaultStorageAccountName, defaultStorageAccountKey, dan defaultStorageContainerName.

    using System.Collections.Generic;
    using System.IO;
    using System.Text;
    using System.Threading;
    using Microsoft.Azure.Management.HDInsight.Job;
    using Microsoft.Azure.Management.HDInsight.Job.Models;
    using Hyak.Common;
    using Microsoft.WindowsAzure.Storage;
    using Microsoft.WindowsAzure.Storage.Blob;
    
    namespace SubmitHDInsightJobDotNet
    {
        class Program
        {
            private static HDInsightJobManagementClient _hdiJobManagementClient;
    
            private const string existingClusterName = "<Your HDInsight Cluster Name>";
            private const string existingClusterPassword = "<Cluster User Password>";
            private const string defaultStorageAccountName = "<Default Storage Account Name>"; 
            private const string defaultStorageAccountKey = "<Default Storage Account Key>";
            private const string defaultStorageContainerName = "<Default Blob Container Name>";
    
            private const string existingClusterUsername = "admin";
            private const string existingClusterUri = existingClusterName + ".azurehdinsight.net";
            private const string sourceFile = "/example/data/gutenberg/davinci.txt";
            private const string outputFolder = "/example/data/davinciwordcount";
    
            static void Main(string[] args)
            {
                System.Console.WriteLine("The application is running ...");
    
                var clusterCredentials = new BasicAuthenticationCloudCredentials { Username = existingClusterUsername, Password = existingClusterPassword };
                _hdiJobManagementClient = new HDInsightJobManagementClient(existingClusterUri, clusterCredentials);
    
                SubmitMRJob();
    
                System.Console.WriteLine("Press ENTER to continue ...");
                System.Console.ReadLine();
            }
    
            private static void SubmitMRJob()
            {
                List<string> args = new List<string> { { "/example/data/gutenberg/davinci.txt" }, { "/example/data/davinciwordcount" } };
    
                var paras = new MapReduceJobSubmissionParameters
                {
                    JarFile = @"/example/jars/hadoop-mapreduce-examples.jar",
                    JarClass = "wordcount",
                    Arguments = args
                };
    
                System.Console.WriteLine("Submitting the MR job to the cluster...");
                var jobResponse = _hdiJobManagementClient.JobManagement.SubmitMapReduceJob(paras);
                var jobId = jobResponse.JobSubmissionJsonResponse.Id;
                System.Console.WriteLine("Response status code is " + jobResponse.StatusCode);
                System.Console.WriteLine("JobId is " + jobId);
    
                System.Console.WriteLine("Waiting for the job completion ...");
    
                // Wait for job completion
                var jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                while (!jobDetail.Status.JobComplete)
                {
                    Thread.Sleep(1000);
                    jobDetail = _hdiJobManagementClient.JobManagement.GetJob(jobId).JobDetail;
                }
    
                // Get job output
                System.Console.WriteLine("Job output is: ");
                var storageAccess = new AzureStorageAccess(defaultStorageAccountName, defaultStorageAccountKey,
                    defaultStorageContainerName);
    
                if (jobDetail.ExitValue == 0)
                {
                    // Create the storage account object
                    CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=" +
                        defaultStorageAccountName +
                        ";AccountKey=" + defaultStorageAccountKey);
    
                    // Create the blob client.
                    CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
    
                    // Retrieve reference to a previously created container.
                    CloudBlobContainer container = blobClient.GetContainerReference(defaultStorageContainerName);
    
                    CloudBlockBlob blockBlob = container.GetBlockBlobReference(outputFolder.Substring(1) + "/part-r-00000");
    
                    using (var stream = blockBlob.OpenRead())
                    {
                        using (StreamReader reader = new StreamReader(stream))
                        {
                            while (!reader.EndOfStream)
                            {
                                System.Console.WriteLine(reader.ReadLine());
                            }
                        }
                    }
                }
                else
                {
                    // fetch stderr output in case of failure
                    var output = _hdiJobManagementClient.JobManagement.GetJobErrorLogs(jobId, storageAccess);
    
                    using (var reader = new StreamReader(output, Encoding.UTF8))
                    {
                        string value = reader.ReadToEnd();
                        System.Console.WriteLine(value);
                    }
    
                }
            }
        }
    }
    
    
  4. Klik F5 untuk menjalankan aplikasi.

Untuk menjalankan pekerjaan lagi, Anda harus mengubah nama folder output pekerjaan, dalam sampel itu /example/data/davinciwordcount.

Ketika pekerjaan berhasil diselesaikan, aplikasi mencetak konten file output part-r-00000.

Langkah berikutnya

Dalam artikel ini, Anda telah mempelajari beberapa cara untuk membuat kluster Microsoft Azure HDInsight. Untuk mempelajari selengkapnya, lihat dokumen berikut ini: