HDInsight의 Apache Hadoop에서 MapReduce 스트리밍으로 C# 사용

HDInsight에서 C#을 사용하여 MapReduce 솔루션을 만드는 방법에 대해 알아보세요.

Apache Hadoop 스트리밍을 사용하면 스크립트 또는 실행 파일을 사용하여 MapReduce 작업을 실행할 수 있습니다. 여기서 .NET은 단어 수 솔루션을 위한 매퍼 및 리듀서를 구현하는데 사용됩니다.

HDInsight에서.NET

HDInsight 클러스터는 Mono(https://mono-project.com)를 사용하여 .NET 애플리케이션을 실행합니다. Mono 버전 4.2.1은 HDInsight 버전 3.6에 포함되어 있습니다. HDInsight에 포함된 Mono 버전에 대한 자세한 내용은 HDInsight 버전에서 사용할 수 있는 Apache Hadoop 구성 요소를 참조하세요.

.NET 프레임워크 버전과 Mono의 호환성에 대한 자세한 내용은 Mono 호환성을 참조하세요.

Hadoop 스트리밍 작동 방식

이 문서의 스트리밍에 사용된 기본 프로세스는 다음과 같습니다.

  1. Hadoop은 STDIN의 매퍼(이 예제에서는 mapper.exe)에 데이터를 전달합니다.
  2. 매퍼가 데이터를 처리하고 탭으로 구분된 키/값 쌍을 STDOUT으로 내보냅니다.
  3. Hadoop에서 출력을 읽은 다음 STDIN 리듀서(이 예제에서는 reducer.exe)로 전달합니다.
  4. 리듀서는 탭으로 구분된 키/값 쌍을 읽고 데이터를 처리한 다음 STDOUT에서 탭으로 구분된 키/값 쌍의 결과를 내보냅니다.
  5. Hadoop에서 이 출력을 읽습니다. 그런 다음 출력 디렉터리에 기록됩니다.

스트리밍에 대한 자세한 내용은 Hadoop 스트리밍을 참조하세요.

필수 조건

  • Visual Studio.

  • .NET Framework 4.5를 대상으로 하는 C# 코드 작성 및 빌드에 대해 잘 알고 있어야 합니다.

  • 클러스터로 .exe 파일을 업로드하는 방법. 이 문서의 단계는 Data Lake Tools for Visual Studio를 사용하여 클러스터의 기본 스토리지로 파일을 업로드합니다.

  • PowerShell을 사용하는 경우 Az Module이 필요합니다.

  • HDInsight의 Apache Hadoop 클러스터. Linux에서 HDInsight 시작을 참조하세요.

  • 클러스터 기본 스토리지에 대한 URI 체계입니다. 이 체계는 wasb://Azure Storage, abfs://Azure Data Lake Storage Gen2 또는 adl://Azure Data Lake Storage Gen1에 해당됩니다. Azure Storage 또는 Data Lake Storage Gen2에 보안 전송을 사용하는 경우 URI는 각각 wasbs:// 또는 abfss://가 됩니다.

매퍼 만들기

Visual Studio에서 매퍼라는 새 .NET 프레임워크 콘솔 애플리케이션을 만듭니다. 애플리케이션에 대해 다음 코드를 사용합니다.

using System;
using System.Text.RegularExpressions;

namespace mapper
{
    class Program
    {
        static void Main(string[] args)
        {
            string line;
            //Hadoop passes data to the mapper on STDIN
            while((line = Console.ReadLine()) != null)
            {
                // We only want words, so strip out punctuation, numbers, etc.
                var onlyText = Regex.Replace(line, @"\.|;|:|,|[0-9]|'", "");
                // Split at whitespace.
                var words = Regex.Matches(onlyText, @"[\w]+");
                // Loop over the words
                foreach(var word in words)
                {
                    //Emit tab-delimited key/value pairs.
                    //In this case, a word and a count of 1.
                    Console.WriteLine("{0}\t1",word);
                }
            }
        }
    }
}

애플리케이션을 만든 후 빌드하여 프로젝트 디렉터리에 /bin/Debug/mapper.exe 파일을 생성합니다.

리듀서 만들기

Visual Studio에서 리듀서라는 새 .NET 프레임워크 콘솔 애플리케이션을 만듭니다. 애플리케이션에 대해 다음 코드를 사용합니다.

using System;
using System.Collections.Generic;

namespace reducer
{
    class Program
    {
        static void Main(string[] args)
        {
            //Dictionary for holding a count of words
            Dictionary<string, int> words = new Dictionary<string, int>();

            string line;
            //Read from STDIN
            while ((line = Console.ReadLine()) != null)
            {
                // Data from Hadoop is tab-delimited key/value pairs
                var sArr = line.Split('\t');
                // Get the word
                string word = sArr[0];
                // Get the count
                int count = Convert.ToInt32(sArr[1]);

                //Do we already have a count for the word?
                if(words.ContainsKey(word))
                {
                    //If so, increment the count
                    words[word] += count;
                } else
                {
                    //Add the key to the collection
                    words.Add(word, count);
                }
            }
            //Finally, emit each word and count
            foreach (var word in words)
            {
                //Emit tab-delimited key/value pairs.
                //In this case, a word and a count of 1.
                Console.WriteLine("{0}\t{1}", word.Key, word.Value);
            }
        }
    }
}

애플리케이션을 만든 후 빌드하여 프로젝트 디렉터리에 /bin/Debug/reducer.exe 파일을 생성합니다.

스토리지에 업로드

다음으로, 매퍼리듀서 애플리케이션을 HDInsight 스토리지에 업로드해야 합니다.

  1. Visual Studio에서 >서버 탐색기보기를 선택합니다.

  2. 마우스 오른쪽 단추로 Azure를 클릭하고 Microsoft Azure 구독에 연결...을 선택한 다음 로그인 프로세스를 완료합니다.

  3. 이 애플리케이션을 배포하려는 HDInsight 클러스터를 확장합니다. 텍스트가 포함된 항목(기본 Storage 계정)이 목록에 표시됩니다.

    Storage account, HDInsight cluster, Server Explorer, Visual Studio.

    • (기본 Storage 계정) 항목을 확장할 수 있는 경우 클러스터용 기본 스토리지로 Azure Storage 계정을 사용하는 것입니다. 클러스터의 기본 스토리지에 있는 파일을 보려면 항목을 확장한 다음 (기본 컨테이너)를 두 번 클릭합니다.

    • (기본 Storage 계정) 항목을 확장할 수 없는 경우 클러스터용 기본 스토리지로 Azure Data Lake Storage를 사용하는 것입니다. 클러스터의 기본 스토리지에 있는 파일을 보려면 항목을 확장한 다음 (기본 Storage 계정)을 두 번 클릭합니다.

  4. .exe 파일을 업로드하려면 다음 방법 중 하나를 사용합니다.

    • Azure Storage 계정을 사용하는 경우 Blob 업로드 아이콘을 선택합니다.

      HDInsight upload icon for mapper, Visual Studio.

      새 파일 업로드 대화 상자에서 파일 이름 아래의 찾아보기를 선택합니다. Blob 업로드 대화 상자에서 매퍼 프로젝트의 bin\debug 폴더로 이동하여 mapper.exe 파일을 선택합니다. 마지막으로 열기를 선택하고 확인을 선택하여 업로드를 완료합니다.

    • Azure Data Lake Storage의 경우 파일 목록에서 빈 영역을 마우스 오른쪽 단추로 클릭한 다음 업로드를 선택합니다. 마지막으로 mapper.exe 파일을 선택하고 열기를 선택합니다.

    mapper.exe 업로드가 완료되면 reducer.exe 파일의 업로드 프로세스를 반복합니다.

작업 실행: SSH 세션 사용

다음 절차에서는 SSH 세션을 사용하여 MapReduce 작업을 실행하는 방법에 대해 설명합니다.

  1. ssh command 명령을 사용하여 클러스터에 연결합니다. CLUSTERNAME을 클러스터 이름으로 바꿔서 아래 명령을 편집하고, 다음 명령을 입력합니다.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. 다음 명령 중 하나를 사용하여 MapReduce 작업을 시작합니다.

    • 기본 스토리지가 Azure Storage인 경우:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files wasbs:///mapper.exe,wasbs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • 기본 스토리지가 Data Lake Storage Gen1인 경우:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files adl:///mapper.exe,adl:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      
    • 기본 스토리지가 Data Lake Storage Gen2인 경우:

      yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
          -files abfs:///mapper.exe,abfs:///reducer.exe \
          -mapper mapper.exe \
          -reducer reducer.exe \
          -input /example/data/gutenberg/davinci.txt \
          -output /example/wordcountout
      

    다음 목록에서는 각 매개 변수 및 옵션이 나타내는 대상에 대해 설명합니다.

    매개 변수 설명
    hadoop-streaming.jar 스트리밍 MapReduce 기능이 포함된 jar 파일을 지정합니다.
    -파일 이 작업에 대한 mapper.exereducer.exe 파일을 지정합니다. 각 파일 앞의 wasbs:///, adl:///, 또는 abfs:/// 프로토콜 선언은 클러스터의 기본 스토리지 루트에 대한 경로입니다.
    -매퍼 매퍼를 구현하는 파일을 지정합니다.
    -리듀서 리듀서를 구현하는 파일을 지정합니다.
    -입력 입력 데이터를 지정합니다.
    -출력 출력 디렉터리를 지정합니다.
  3. MapReduce 작업이 완료되면 다음 명령을 사용하여 결과를 확인합니다.

    hdfs dfs -text /example/wordcountout/part-00000
    

    다음 텍스트는 이 명령에서 반환된 데이터의 예입니다.

    you     1128
    young   38
    younger 1
    youngest        1
    your    338
    yours   4
    yourself        34
    yourselves      3
    youth   17
    

작업 실행: PowerShell 사용

다음 PowerShell 스크립트를 사용하여 MapReduce 작업을 실행하고 결과를 다운로드합니다.

# Login to your Azure subscription
$context = Get-AzContext
if ($context -eq $null) 
{
    Connect-AzAccount
}
$context

# Get HDInsight info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# Path for job output
$outputPath="/example/wordcountoutput"

# Progress indicator
$activity="C# MapReduce example"
Write-Progress -Activity $activity -Status "Getting cluster information..."
#Get HDInsight info so we can get the resource group, storage, etc.
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageActArr=$clusterInfo.DefaultStorageAccount.split('.')
$storageAccountName=$storageActArr[0]
$storageType=$storageActArr[1]

# Progress indicator
#Define the MapReduce job
# Note: using "/mapper.exe" and "/reducer.exe" looks in the root
#       of default storage.
$jobDef=New-AzHDInsightStreamingMapReduceJobDefinition `
    -Files "/mapper.exe","/reducer.exe" `
    -Mapper "mapper.exe" `
    -Reducer "reducer.exe" `
    -InputPath "/example/data/gutenberg/davinci.txt" `
    -OutputPath $outputPath

# Start the job
Write-Progress -Activity $activity -Status "Starting MapReduce job..."
$job=Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDef `
    -HttpCredential $creds

#Wait for the job to complete
Write-Progress -Activity $activity -Status "Waiting for the job to complete..."
Wait-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Write-Progress -Activity $activity -Completed

# Download the output 
if($storageType -eq 'azuredatalakestore') {
    # Azure Data Lake Store
    # Fie path is the root of the HDInsight storage + $outputPath
    $filePath=$clusterInfo.DefaultStorageRootPath + $outputPath + "/part-00000"
    Export-AzDataLakeStoreItem `
        -Account $storageAccountName `
        -Path $filePath `
        -Destination output.txt
} else {
    # Az.Storage account
    # Get the container
    $container=$clusterInfo.DefaultStorageContainer
    #NOTE: This assumes that the storage account is in the same resource
    #      group as HDInsight. If it is not, change the
    #      --ResourceGroupName parameter to the group that contains storage.
    $storageAccountKey=(Get-AzStorageAccountKey `
        -Name $storageAccountName `
    -ResourceGroupName $resourceGroup)[0].Value

    #Create a storage context
    $context = New-AzStorageContext `
        -StorageAccountName $storageAccountName `
        -StorageAccountKey $storageAccountKey
    # Download the file
    Get-AzStorageBlobContent `
        -Blob 'example/wordcountoutput/part-00000' `
        -Container $container `
        -Destination output.txt `
        -Context $context
}

이 스크립트는 클러스터 로그인 계정 이름과 암호와 HDInsight 클러스터 이름을 묻습니다. 작업이 완료되면 출력이 output.txt라는 파일로 다운로드됩니다. 다음 텍스트는 output.txt 파일의 데이터 예제입니다.

you     1128
young   38
younger 1
youngest        1
your    338
yours   4
yourself        34
yourselves      3
youth   17

다음 단계