Oktatóanyag: Strukturált streamelés az Apache Sparkhoz készült .NET-tel

Ez az oktatóanyag bemutatja, hogyan hívhatja meg a Spark Structured Streaminget az Apache Sparkhoz készült .NET használatával. A Spark Structured Streaming az Apache Spark támogatása a valós idejű adatfolyamok feldolgozásához. A streamfeldolgozás élő adatok elemzését jelenti az előállításuk során.

Eben az oktatóanyagban az alábbiakkal fog megismerkedni:

  • .NET létrehozása és futtatása Apache Spark-alkalmazáshoz
  • Adatfolyam létrehozása a netcat használatával
  • Felhasználó által definiált függvények és SparkSQL használata streamelési adatok elemzéséhez

Előfeltételek

Ha ez az első Apache Spark-alkalmazás .NET-alkalmazása, kezdje az Első lépések oktatóanyaggal, hogy megismerkedjen az alapokkal.

Konzolalkalmazás létrehozása

  1. Új konzolalkalmazás létrehozásához futtassa a következő parancsokat a parancssorban:

    dotnet new console -o mySparkStreamingApp
    cd mySparkStreamingApp
    

    A dotnet parancs létrehoz egy new típusú console alkalmazást. A -o paraméter létrehoz egy mySparkStreamingApp nevű könyvtárat, ahol az alkalmazás található, és feltölti a szükséges fájlokkal. A cd mySparkStreamingApp parancs az imént létrehozott alkalmazáskönyvtárra módosítja a könyvtárat.

  2. Ha a .NET-et az Apache Sparkhoz szeretné használni egy alkalmazásban, telepítse a Microsoft.Spark csomagot. Futtassa a következő parancsot a konzolon:

    dotnet add package Microsoft.Spark
    

Adatfolyam létrehozása és csatlakoztatása

A streamfeldolgozás tesztelésének egyik népszerű módja a netcat. A netcat (más néven nc) lehetővé teszi a hálózati kapcsolatokból való olvasást és írást. Hálózati kapcsolatot létesíthet a netcattel egy terminálablakon keresztül.

Adatfolyam létrehozása a netcat használatával

  1. Töltse le a netcatet. Ezután bontsa ki a fájlt a zip-letöltésből, és fűzze hozzá a kibontott könyvtárat a "PATH" környezeti változóhoz.

  2. Új kapcsolat indításához nyisson meg egy új konzolt, és futtassa a következő parancsot, amely a 9999-es porton csatlakozik a localhosthoz.

    Windows rendszeren:

    nc -vvv -l -p 9999
    

    Linuxon:

    nc -lk 9999
    

    A Spark-program figyeli a parancssorba beírt bemenetet.

SparkSession létrehozása

  1. Adja hozzá a következő további using utasításokat a Program.cs fájl elejéhez a mySparkStreamingApp alkalmazásban:

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Streaming;
    using static Microsoft.Spark.Sql.Functions;
    
  2. Új kód létrehozásához SparkSessionadja hozzá a következő kódot a Main metódushoz. A Spark-munkamenet a Spark adatkészlettel és DataFrame API-val való programozásának belépési pontja.

    SparkSession spark = SparkSession
         .Builder()
         .AppName("Streaming example with a UDF")
         .GetOrCreate();
    

    A fent létrehozott Spark-objektum meghívásával elérheti a Spark és a DataFrame funkcióit a teljes programban.

Csatlakozás streambe a ReadStream() használatával

A ReadStream() metódus egy DataStreamReader olyan értéket ad vissza, amely a streamelési adatok DataFramebeolvasására használható. Adja meg a gazdagép és a port adatait, hogy a Spark-alkalmazás honnan várja a streamelési adatokat.

DataFrame lines = spark
    .ReadStream()
    .Format("socket")
    .Option("host", hostname)
    .Option("port", port)
    .Load();

Felhasználó által definiált függvény regisztrálása

A Spark-alkalmazásokban felhasználói függvényekkel és felhasználói függvényekkel számításokat és elemzéseket végezhet az adatokon.

Adja hozzá a következő kódot a Main metódushoz egy úgynevezett udfArrayUDF regisztrálásához.

Func<Column, Column> udfArray =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });

Ez az UDF feldolgozza a netcat termináltól kapott összes sztringet, és létrehoz egy tömböt, amely tartalmazza az eredeti sztringet ( str-ben), majd az eredeti sztringet összefűzve az eredeti sztring hosszával.

Ha például beírja a Hello world kifejezést a netcat terminálba, egy tömböt hoz létre, ahol:

  • array[0] = Hello world
  • array[1] = Hello world 11

A SparkSQL használata

A SparkSQL használatával különféle funkciókat hajthat végre a DataFrame-ben tárolt adatokon. Gyakori, hogy az UDF-eket és a SparkSQL-t kombinálva egy UDF-et alkalmaz egy DataFrame minden sorára.

DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));

Ez a kódrészlet udfArray-t alkalmaz a DataFrame minden értékére, amely a netcat terminálból beolvasott sztringeket jelöli. Alkalmazza a SparkSQL-metódust Explode , hogy a tömb minden bejegyzését a saját sorába helyezze. Végül helyezze Select el a létrehozott oszlopokat az új DataFrame arrayDF-ben.

A stream megjelenítése

A kimenet jellemzőinek meghatározására szolgál WriteStream() , például az eredmények konzolra való nyomtatására és csak a legújabb kimenet megjelenítésére.

StreamingQuery query = arrayDf
    .WriteStream()
    .Format("console")
    .Start();

A kód futtatása

A Spark strukturált streamelése kis kötegeken keresztül dolgozza fel az adatokat. A program futtatásakor a netcat-kapcsolat létrehozására szolgáló parancssor lehetővé teszi a gépelés megkezdését. Minden alkalommal, amikor az Enter billentyűt lenyomja, miután beírta az adatokat a parancssorba, a Spark kötegnek tekinti a bejegyzést, és futtatja az UDF-et.

A spark-submit használata az alkalmazás futtatásához

Egy új netcat-munkamenet elindítása után nyisson meg egy új terminált, és futtassa a spark-submit parancsot az alábbi parancshoz hasonlóan:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<spark_majorversion-spark_minorversion>_<scala_majorversion.scala_minorversion>-<spark_dotnet_version>.jar Microsoft.Spark.CSharp.Examples.exe Sql.Streaming.StructuredNetworkCharacterCount localhost 9999

Megjegyzés

Mindenképpen frissítse a fenti parancsot a Microsoft Spark JAR-fájl tényleges elérési útjával. A fenti parancs azt is feltételezi, hogy a netcat-kiszolgáló a 9999-es localhost porton fut.

A kód letöltése

Ez az oktatóanyag a StructuredNetworkCharacterCount.cs példát használja, de a GitHub három további teljes streamfeldolgozási példát is bemutat:

Következő lépések

Folytassa a következő cikkel, amelyből megtudhatja, hogyan helyezheti üzembe az Apache Spark-alkalmazáshoz készült .NET-et a Databricksben.