Учебник. Структурированная потоковая передача с помощью .NET для Apache SparkTutorial: Structured Streaming with .NET for Apache Spark

В этом руководстве показано, как запустить структурированную потоковую передачу Spark с помощью .NET для Apache Spark.This tutorial teaches you how to invoke Spark Structured Streaming using .NET for Apache Spark. Структурированная потоковая передача в Apache Spark предназначена для обработки потоков данных в режиме реального времени.Spark Structured Streaming is Apache Spark's support for processing real-time data streams. Потоковая обработка означает анализ актуальных данных по мере их создания.Stream processing means analyzing live data as it's being produced.

В этом руководстве вы узнаете, как:In this tutorial, you learn how to:

  • создать и выполнить приложение .NET для Apache Spark;Create and run a .NET for Apache Spark application
  • создать поток данных с помощью netcat;Use netcat to create a data stream
  • применить пользовательские функции и SparkSQL для анализа потоковых данных.Use user-defined functions and SparkSQL to analyze streaming data

Предварительные требованияPrerequisites

Если это ваше первое приложение .NET для Apache Spark, начните с руководства по началу работы, чтобы ознакомиться с основами.If this is your first .NET for Apache Spark application, start with the Getting Started tutorial to become familiar with the basics.

Создание консольного приложенияCreate a console application

  1. В командной строке выполните следующие команды, чтобы создать новое консольное приложение:In your command prompt, run the following commands to create a new console application:

    dotnet new console -o mySparkStreamingApp
    cd mySparkStreamingApp
    

    Команда dotnet создаст для вас приложение new типа console.The dotnet command creates a new application of type console for you. Параметр -o создаст каталог с именем mySparkStreamingApp, в котором хранится приложение и требуемые файлы.The -o parameter creates a directory named mySparkStreamingApp where your app is stored and populates it with the required files. Команда cd mySparkStreamingApp изменит каталог на только что созданный каталог приложения.The cd mySparkStreamingApp command changes the directory to the app directory you just created.

  2. Чтобы использовать .NET для Apache Spark в приложении, установите пакет Microsoft.Spark.To use .NET for Apache Spark in an app, install the Microsoft.Spark package. В консоли выполните следующую команду:In your console, run the following command:

    dotnet add package Microsoft.Spark
    

Создание потока данных и подключение к немуEstablish and connect to a data stream

Одним из популярных методов проверки потоковой обработки является netcat.One popular way to test stream processing is through netcat. Средство netcat (или nc) позволяет считывать данные из сетевых подключений и записывать эти данные в них.netcat (also known as nc) allows you to read from and write to network connections. Сетевое подключение в netcat устанавливается через окно терминала.You establish a network connection with netcat through a terminal window.

Создание потока данных с помощью netcatCreate a data stream with netcat

  1. Скачайте netcat.Download netcat. Затем извлеките файл из скачанного ZIP-файла и добавьте каталог, в который вы его извлекли, в переменную среды PATH.Then, extract the file from the zip download and append the directory you extracted to your "PATH" environment variable.

  2. Чтобы создать подключение, откройте новую консоль и выполните следующую команду, которая подключается к localhost через порт 9999.To start a new connection, open a new console and run the following command which connects to localhost on port 9999.

    Windows:On Windows:

    nc -vvv -l -p 9999
    

    В LinuxOn Linux:

    nc -lk 9999
    

    Программа Spark прослушивает входные данные, которые вы вводите в командную строку.Your Spark program listens for the input you type into this command prompt.

Создание SparkSessionCreate a SparkSession

  1. Добавьте следующие дополнительные инструкции using в начало файла Program.cs приложения mySparkStreamingApp:Add the following additional using statements to the top of the Program.cs file in mySparkStreamingApp:

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Streaming;
    using static Microsoft.Spark.Sql.Functions;
    
  2. Добавьте приведенный ниже код в метод Main, чтобы создать новый экземпляр SparkSession.Add the following code to your Main method to create a new SparkSession. Сеанс Spark является точкой входа для программирования Spark через API DataSet и DataFrame.The Spark Session is the entry point to programming Spark with the Dataset and DataFrame API.

    SparkSession spark = SparkSession
         .Builder()
         .AppName("Streaming example with a UDF")
         .GetOrCreate();
    

    Вызов созданного выше объекта spark позволяет получить доступ к функциям Spark и DataFrame в любом месте вашей программы.Calling the spark object created above allows you to access Spark and DataFrame functionality throughout your program.

Подключение к потоку с помощью ReadStream()Connect to a stream with ReadStream()

Метод ReadStream() возвращает DataStreamReader для чтения потоковых данных в виде DataFrame.The ReadStream() method returns a DataStreamReader that can be used to read streaming data in as a DataFrame. Включите сведения об узле и порте, чтобы приложение Spark знало, где ожидать данные потоковой передачи.Include the host and port information to tell your Spark app where to expect its streaming data.

DataFrame lines = spark
    .ReadStream()
    .Format("socket")
    .Option("host", hostname)
    .Option("port", port)
    .Load();

Регистрация определяемой пользователем функцииRegister a user-defined function

Для выполнения вычислений и анализа данных в приложениях Spark можно использовать определяемые пользователем функции.You can use UDFs, user-defined functions, in Spark applications to perform calculations and analysis on your data.

Добавьте следующий код в метод Main, чтобы зарегистрировать определяемую пользователем функцию с именем udfArray.Add the following code to your Main method to register a UDF called udfArray.

Func<Column, Column> udfArray =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });

Она обрабатывает каждую строку, полученную из терминала netcat, и создает массив, который содержит исходную строку (в str) и объединение исходной строки с длиной этой строки.This UDF processes each string it receives from the netcat terminal to produce an array that includes the original string (contained in str), followed by the original string concatenated with the length of the original string.

Например, если вы введете в терминале netcat строку Hello World, итоговый массив будет выглядеть так:For example, entering Hello world in the netcat terminal produces an array where:

  • array[0] = Hello worldarray[0] = Hello world
  • array[1] = Hello world 11array[1] = Hello world 11

Использование SparkSQLUse SparkSQL

С помощью SparkSQL можно выполнять разные функции с данными, сохраненными в DataFrame.Use SparkSQL to perform various functions on the data stored in your DataFrame. Достаточно распространен вариант совместного применения определяемых пользователем функций и SparkSQL, чтобы эта функция обрабатывала каждую строку в DataFrame.It's common to combine UDFs and SparkSQL to apply a UDF to each row of a DataFrame.

DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));

Следующий фрагмент кода применяет udfArray к каждому значению в DataFrame с представлением строк, считанных из терминала netcat.This code snippet applies udfArray to each value in your DataFrame, which represents each string read from your netcat terminal. Примените метод SparkSQL Explode, чтобы разместить каждую запись массива в отдельной строке.Apply the SparkSQL method Explode to put each entry of your array in its own row. Наконец, используйте Select, чтобы разместить столбцы, созданные в новом экземпляре arrayDF DataFrame.Finally, use Select to place the columns you've produced in the new DataFrame arrayDF.

Отображение потокаDisplay your stream

Используйте WriteStream(), чтобы установить характеристики выходных данных, например вывод результатов на консоль и отображение только самых свежих выходных данных.Use WriteStream() to establish characteristics of your output, such as printing results to the console and displaying only the most recent output.

StreamingQuery query = arrayDf
    .WriteStream()
    .Format("console")
    .Start();

Выполнение кодаRun your code

Структурированная потоковая передача в Spark обрабатывает данные в виде нескольких небольших пакетов.Structured streaming in Spark processes data through a series of small batches. При выполнении программы для ввода данных можно использовать командную строку, в которой вы установили подключение netcat.When you run your program, the command prompt where you establish the netcat connection allows you to start typing. После каждого нажатия клавиши ВВОД при вводе в командную строку любых данных Spark запускает определяемую пользователем функцию с пакетом данных, содержащим эти введенные данные.Each time you press the Enter key after typing data in that command prompt, Spark considers your entry a batch and runs the UDF.

Использование команды spark-submit для выполнения приложенияUse spark-submit to run your app

Запустив новый сеанс netcat, откройте новое окно терминала и выполните команду spark-submit, как показано ниже.After starting a new netcat session, open a new terminal and run your spark-submit command, similar to the following command:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<version>.jar Microsoft.Spark.CSharp.Examples.exe Sql.Streaming.StructuredNetworkCharacterCount localhost 9999

Примечание

Не забудьте указать в представленной выше команде фактический путь к JAR-файлу Microsoft Spark.Be sure to update the above command with the actual path to your Microsoft Spark jar file. Приведенная выше команда также предполагает, что сервер netcat использует порт localhost 9999.The above command also assumes your netcat server is running on localhost port 9999.

Получите кодGet the code

В этом руководстве используется пример StructuredNetworkCharacterCount.cs, но на GitHub есть еще три полных примера обработки потока:This tutorial uses the StructuredNetworkCharacterCount.cs example, but there are three other full stream processing examples on GitHub:

Следующие шагиNext steps

Переходите к следующему руководстве, чтобы научиться развертывать приложение .NET для Apache Spark в среде Databricks.Advance to the next article to learn how to deploy your .NET for Apache Spark application to Databricks.