Tutorial: Structured Streaming with .NET for Apache Spark

This tutorial teaches you how to invoke Spark Structured Streaming using .NET for Apache Spark. Spark Structured Streaming is Apache Spark's support for processing real-time data streams. Stream processing means analyzing live data as it's being produced.

In this tutorial, you learn how to:

  • Create and run a .NET for Apache Spark application
  • Use netcat to create a data stream
  • Use user-defined functions and SparkSQL to analyze streaming data

Prerequisites

If this is your first .NET for Apache Spark application, start with the Getting Started tutorial to become familiar with the basics.

Create a console application

  1. In your command prompt, run the following commands to create a new console application:

    dotnet new console -o mySparkStreamingApp
    cd mySparkStreamingApp
    

    The dotnet command creates a new application of type console for you. The -o parameter creates a directory named mySparkStreamingApp where your app is stored and populates it with the required files. The cd mySparkStreamingApp command changes the directory to the app directory you just created.

  2. To use .NET for Apache Spark in an app, install the Microsoft.Spark package. In your console, run the following command:

    dotnet add package Microsoft.Spark
    

Establish and connect to a data stream

One popular way to test stream processing is through netcat. netcat (also known as nc) allows you to read from and write to network connections. You establish a network connection with netcat through a terminal window.

Create a data stream with netcat

  1. Download netcat. Then, extract the file from the zip download and append the directory you extracted to your "PATH" environment variable.

  2. To start a new connection, open a new console and run the following command which connects to localhost on port 9999.

    On Windows:

    nc -vvv -l -p 9999
    

    On Linux:

    nc -lk 9999
    

    Your Spark program listens for the input you type into this command prompt.

Create a SparkSession

  1. Add the following additional using statements to the top of the Program.cs file in mySparkStreamingApp:

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Streaming;
    using static Microsoft.Spark.Sql.Functions;
    
  2. Add the following code to your Main method to create a new SparkSession. The Spark Session is the entry point to programming Spark with the Dataset and DataFrame API.

    SparkSession spark = SparkSession
         .Builder()
         .AppName("Streaming example with a UDF")
         .GetOrCreate();
    

    Calling the spark object created above allows you to access Spark and DataFrame functionality throughout your program.

Connect to a stream with ReadStream()

The ReadStream() method returns a DataStreamReader that can be used to read streaming data in as a DataFrame. Include the host and port information to tell your Spark app where to expect its streaming data.

DataFrame lines = spark
    .ReadStream()
    .Format("socket")
    .Option("host", hostname)
    .Option("port", port)
    .Load();

Register a user-defined function

You can use UDFs, user-defined functions, in Spark applications to perform calculations and analysis on your data.

Add the following code to your Main method to register a UDF called udfArray.

Func<Column, Column> udfArray =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });

This UDF processes each string it receives from the netcat terminal to produce an array that includes the original string (contained in str), followed by the original string concatenated with the length of the original string.

For example, entering Hello world in the netcat terminal produces an array where:

  • array[0] = Hello world
  • array[1] = Hello world 11

Use SparkSQL

Use SparkSQL to perform various functions on the data stored in your DataFrame. It's common to combine UDFs and SparkSQL to apply a UDF to each row of a DataFrame.

DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));

This code snippet applies udfArray to each value in your DataFrame, which represents each string read from your netcat terminal. Apply the SparkSQL method Explode to put each entry of your array in its own row. Finally, use Select to place the columns you've produced in the new DataFrame arrayDF.

Display your stream

Use WriteStream() to establish characteristics of your output, such as printing results to the console and displaying only the most recent output.

StreamingQuery query = arrayDf
    .WriteStream()
    .Format("console")
    .Start();

Run your code

Structured streaming in Spark processes data through a series of small batches. When you run your program, the command prompt where you establish the netcat connection allows you to start typing. Each time you press the Enter key after typing data in that command prompt, Spark considers your entry a batch and runs the UDF.

Use spark-submit to run your app

After starting a new netcat session, open a new terminal and run your spark-submit command, similar to the following command:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<version>.jar Microsoft.Spark.CSharp.Examples.exe Sql.Streaming.StructuredNetworkCharacterCount localhost 9999

Note

Be sure to update the above command with the actual path to your Microsoft Spark jar file. The above command also assumes your netcat server is running on localhost port 9999.

Get the code

This tutorial uses the StructuredNetworkCharacterCount.cs example, but there are three other full stream processing examples on GitHub:

Next steps

Advance to the next article to learn how to deploy your .NET for Apache Spark application to Databricks.