チュートリアル: .NET for Apache Spark を使用した構造化ストリーミングTutorial: Structured Streaming with .NET for Apache Spark

このチュートリアルでは、.NET for Apache Spark を使用して Spark 構造化ストリーミングを呼び出す方法について説明します。This tutorial teaches you how to invoke Spark Structured Streaming using .NET for Apache Spark. Spark 構造化ストリーミングは、リアルタイム データ ストリームを処理するための Apache Spark のサポートです。Spark Structured Streaming is Apache Spark's support for processing real-time data streams. ストリーム処理とは、ライブ データの生成中にそれを分析することを意味します。Stream processing means analyzing live data as it's being produced.

このチュートリアルでは、次の作業を行う方法について説明します。In this tutorial, you learn how to:

  • .NET for Apache Spark アプリケーションを作成して実行するCreate and run a .NET for Apache Spark application
  • netcat を使用してデータ ストリームを作成するUse netcat to create a data stream
  • ユーザー定義関数と SparkSQL を使用してストリーミング データを分析するUse user-defined functions and SparkSQL to analyze streaming data

前提条件Prerequisites

これが初めての .NET for Apache Spark アプリケーションである場合は、基本について理解を深めるために、概要のチュートリアルに関する記事から始めてください。If this is your first .NET for Apache Spark application, start with the Getting Started tutorial to become familiar with the basics.

コンソール アプリケーションを作成するCreate a console application

  1. コマンド プロンプトで、次のコマンドを実行して、新しいコンソール アプリケーションを作成します。In your command prompt, run the following commands to create a new console application:

    dotnet new console -o mySparkStreamingApp
    cd mySparkStreamingApp
    

    dotnet コマンドで、種類が consolenew アプリケーションを作成します。The dotnet command creates a new application of type console for you. -o パラメーターによって、mySparkStreamingApp という名前のディレクトリが作成され、そこにアプリに必要なファイルが追加されます。The -o parameter creates a directory named mySparkStreamingApp where your app is stored and populates it with the required files. cd mySparkStreamingApp コマンドで、ディレクトリを、先ほど作成したアプリ ディレクトリに変更します。The cd mySparkStreamingApp command changes the directory to the app directory you just created.

  2. アプリで .NET for Apache Spark を使用するには、Microsoft.Spark パッケージをインストールします。To use .NET for Apache Spark in an app, install the Microsoft.Spark package. コンソールで、次のコマンドを実行します。In your console, run the following command:

    dotnet add package Microsoft.Spark
    

データ ストリームを確立して接続するEstablish and connect to a data stream

ストリーム処理をテストする一般的な方法の 1 つとして、netcat を使用する方法があります。One popular way to test stream processing is through netcat. netcat (nc とも呼ばれます) を使用すると、ネットワーク接続に対する読み取りと書き込みを行うことができます。netcat (also known as nc) allows you to read from and write to network connections. netcat とのネットワーク接続を確立するには、ターミナル ウィンドウを使用します。You establish a network connection with netcat through a terminal window.

netcat を使用してデータ ストリームを作成するCreate a data stream with netcat

  1. netcat をダウンロードしますDownload netcat. 次に、zip ダウンロードからファイルを抽出し、抽出したディレクトリを "PATH" 環境変数に追加します。Then, extract the file from the zip download and append the directory you extracted to your "PATH" environment variable.

  2. 新しい接続を開始するには、新しいコンソールを開き、localhost にポート 9999 で接続する次のコマンドを実行します。To start a new connection, open a new console and run the following command which connects to localhost on port 9999.

    Windows の場合:On Windows:

    nc -vvv -l -p 9999
    

    Linux の場合:On Linux:

    nc -lk 9999
    

    Spark プログラムでは、このコマンド プロンプトに入力する入力がリッスンされます。Your Spark program listens for the input you type into this command prompt.

SparkSession を作成するCreate a SparkSession

  1. 次の追加の using ステートメントを、mySparkStreamingAppProgram.cs ファイルの先頭に追加します。Add the following additional using statements to the top of the Program.cs file in mySparkStreamingApp:

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Streaming;
    using static Microsoft.Spark.Sql.Functions;
    
  2. 新しい SparkSession を作成するために、次のコードを Main メソッドに追加します。Add the following code to your Main method to create a new SparkSession. Spark セッションは、Dataset API と DataFrame API を使用して Spark をプログラミングするためのエントリ ポイントです。The Spark Session is the entry point to programming Spark with the Dataset and DataFrame API.

    SparkSession spark = SparkSession
         .Builder()
         .AppName("Streaming example with a UDF")
         .GetOrCreate();
    

    上で作成した spark オブジェクトを呼び出すことで、プログラムの至るところで Spark と DataFrame の機能にアクセスできます。Calling the spark object created above allows you to access Spark and DataFrame functionality throughout your program.

ReadStream () を使用してストリームに接続するConnect to a stream with ReadStream()

ReadStream() メソッドによって、ストリーミング データを DataFrame として読み取るために使用できる DataStreamReader が返されます。The ReadStream() method returns a DataStreamReader that can be used to read streaming data in as a DataFrame. ストリーミング データを予期する場所を Spark アプリに通知する、ホストとポートの情報が含まれています。Include the host and port information to tell your Spark app where to expect its streaming data.

DataFrame lines = spark
    .ReadStream()
    .Format("socket")
    .Option("host", hostname)
    .Option("port", port)
    .Load();

ユーザー定義関数を登録するRegister a user-defined function

Spark アプリケーションで UDF ("ユーザー定義関数") を使用して、データの計算と分析を実行できます。You can use UDFs, user-defined functions, in Spark applications to perform calculations and analysis on your data.

udfArray という名前の UDF を登録するために、次のコードを Main メソッドに追加します。Add the following code to your Main method to register a UDF called udfArray.

Func<Column, Column> udfArray =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });

この UDF によって、netcat ターミナルから受け取った各文字列を処理して、元の文字列 (str に含まれています) と、その後ろに元の文字列の長さに連結された元の文字列が続く配列が生成されます。This UDF processes each string it receives from the netcat terminal to produce an array that includes the original string (contained in str), followed by the original string concatenated with the length of the original string.

たとえば、netcat ターミナルに Hello world と入力すると、次のような配列が生成されます。For example, entering Hello world in the netcat terminal produces an array where:

  • array[0] = Hello worldarray[0] = Hello world
  • array[1] = Hello world 11array[1] = Hello world 11

SparkSQL を使用するUse SparkSQL

SparkSQL を使用して、DataFrame に格納されているデータに対してさまざまな関数を実行します。Use SparkSQL to perform various functions on the data stored in your DataFrame. DataFrame の各行に対して 1 つの UDF を適用するために、複数の UDF と SparkSQL を組み合わせるのが一般的です。It's common to combine UDFs and SparkSQL to apply a UDF to each row of a DataFrame.

DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));

このコード スニペットでは、udfArray を DataFrame 内の各値に適用します。値は、netcat ターミナルから読み取られた各文字列を表します。This code snippet applies udfArray to each value in your DataFrame, which represents each string read from your netcat terminal. SparkSQL メソッド Explode を適用して、配列の各エントリを各行に配置します。Apply the SparkSQL method Explode to put each entry of your array in its own row. 最後に、Select を使用して、生成された列を新しい DataFrame arrayDF に配置します。Finally, use Select to place the columns you've produced in the new DataFrame arrayDF.

ストリームを表示するDisplay your stream

WriteStream() を使用して、出力の特性を確立します (コンソールへの結果の出力や、最新の出力のみの表示など)。Use WriteStream() to establish characteristics of your output, such as printing results to the console and displaying only the most recent output.

StreamingQuery query = arrayDf
    .WriteStream()
    .Format("console")
    .Start();

コードを実行するRun your code

Spark の構造化ストリーミングでは、一連の小さな バッチ を通してデータが処理されます。Structured streaming in Spark processes data through a series of small batches. プログラムを実行すると、netcat 接続を確立するコマンド プロンプトで入力を開始できます。When you run your program, the command prompt where you establish the netcat connection allows you to start typing. そのコマンド プロンプトでデータを入力して Enter キーを押すたびに、Spark によって入力がバッチとみなされ、UDF が実行されます。Each time you press the Enter key after typing data in that command prompt, Spark considers your entry a batch and runs the UDF.

spark-submit を使用してアプリを実行するUse spark-submit to run your app

新しい netcat セッションを開始した後、新しいターミナルを開き、次のコマンドのように spark-submit コマンドを実行します。After starting a new netcat session, open a new terminal and run your spark-submit command, similar to the following command:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<spark_majorversion-spark_minorversion>_<scala_majorversion.scala_minorversion>-<spark_dotnet_version>.jar Microsoft.Spark.CSharp.Examples.exe Sql.Streaming.StructuredNetworkCharacterCount localhost 9999

注意

必ず上記のコマンドを実際の Microsoft Spark jar ファイルへのパスに更新してください。Be sure to update the above command with the actual path to your Microsoft Spark jar file. 上記のコマンドでは、netcat サーバーが localhost ポート 9999 で実行されていることも前提としています。The above command also assumes your netcat server is running on localhost port 9999.

コードの入手Get the code

このチュートリアルでは StructuredNetworkCharacterCount.cs の例を使用していますが、GitHub には、他に次の 3 つの完全なストリームの処理例があります。This tutorial uses the StructuredNetworkCharacterCount.cs example, but there are three other full stream processing examples on GitHub:

次のステップNext steps

次の記事に進んで、.NET for Apache Spark アプリケーションを Databricks にデプロイする方法を確認してください。Advance to the next article to learn how to deploy your .NET for Apache Spark application to Databricks.