Oktatóanyag: Strukturált streamelés az Apache Sparkhoz készült .NET-tel
Ez az oktatóanyag bemutatja, hogyan hívhatja meg a Spark Structured Streaminget az Apache Sparkhoz készült .NET használatával. A Spark Structured Streaming az Apache Spark támogatása a valós idejű adatfolyamok feldolgozásához. A streamfeldolgozás élő adatok elemzését jelenti az előállításuk során.
Eben az oktatóanyagban az alábbiakkal fog megismerkedni:
- .NET létrehozása és futtatása Apache Spark-alkalmazáshoz
- Adatfolyam létrehozása a netcat használatával
- Felhasználó által definiált függvények és SparkSQL használata streamelési adatok elemzéséhez
Előfeltételek
Ha ez az első Apache Spark-alkalmazás .NET-alkalmazása, kezdje az Első lépések oktatóanyaggal, hogy megismerkedjen az alapokkal.
Konzolalkalmazás létrehozása
Új konzolalkalmazás létrehozásához futtassa a következő parancsokat a parancssorban:
dotnet new console -o mySparkStreamingApp cd mySparkStreamingApp
A
dotnet
parancs létrehoz egynew
típusúconsole
alkalmazást. A-o
paraméter létrehoz egy mySparkStreamingApp nevű könyvtárat, ahol az alkalmazás található, és feltölti a szükséges fájlokkal. Acd mySparkStreamingApp
parancs az imént létrehozott alkalmazáskönyvtárra módosítja a könyvtárat.Ha a .NET-et az Apache Sparkhoz szeretné használni egy alkalmazásban, telepítse a Microsoft.Spark csomagot. Futtassa a következő parancsot a konzolon:
dotnet add package Microsoft.Spark
Adatfolyam létrehozása és csatlakoztatása
A streamfeldolgozás tesztelésének egyik népszerű módja a netcat. A netcat (más néven nc) lehetővé teszi a hálózati kapcsolatokból való olvasást és írást. Hálózati kapcsolatot létesíthet a netcattel egy terminálablakon keresztül.
Adatfolyam létrehozása a netcat használatával
Töltse le a netcatet. Ezután bontsa ki a fájlt a zip-letöltésből, és fűzze hozzá a kibontott könyvtárat a "PATH" környezeti változóhoz.
Új kapcsolat indításához nyisson meg egy új konzolt, és futtassa a következő parancsot, amely a 9999-es porton csatlakozik a localhosthoz.
Windows rendszeren:
nc -vvv -l -p 9999
Linuxon:
nc -lk 9999
A Spark-program figyeli a parancssorba beírt bemenetet.
SparkSession létrehozása
Adja hozzá a következő további
using
utasításokat a Program.cs fájl elejéhez a mySparkStreamingApp alkalmazásban:using System; using Microsoft.Spark.Sql; using Microsoft.Spark.Sql.Streaming; using static Microsoft.Spark.Sql.Functions;
Új kód létrehozásához
SparkSession
adja hozzá a következő kódot aMain
metódushoz. A Spark-munkamenet a Spark adatkészlettel és DataFrame API-val való programozásának belépési pontja.SparkSession spark = SparkSession .Builder() .AppName("Streaming example with a UDF") .GetOrCreate();
A fent létrehozott Spark-objektum meghívásával elérheti a Spark és a DataFrame funkcióit a teljes programban.
Csatlakozás streambe a ReadStream() használatával
A ReadStream()
metódus egy DataStreamReader
olyan értéket ad vissza, amely a streamelési adatok DataFrame
beolvasására használható. Adja meg a gazdagép és a port adatait, hogy a Spark-alkalmazás honnan várja a streamelési adatokat.
DataFrame lines = spark
.ReadStream()
.Format("socket")
.Option("host", hostname)
.Option("port", port)
.Load();
Felhasználó által definiált függvény regisztrálása
A Spark-alkalmazásokban felhasználói függvényekkel és felhasználói függvényekkel számításokat és elemzéseket végezhet az adatokon.
Adja hozzá a következő kódot a Main
metódushoz egy úgynevezett udfArray
UDF regisztrálásához.
Func<Column, Column> udfArray =
Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });
Ez az UDF feldolgozza a netcat termináltól kapott összes sztringet, és létrehoz egy tömböt, amely tartalmazza az eredeti sztringet ( str-ben), majd az eredeti sztringet összefűzve az eredeti sztring hosszával.
Ha például beírja a Hello world kifejezést a netcat terminálba, egy tömböt hoz létre, ahol:
- array[0] = Hello world
- array[1] = Hello world 11
A SparkSQL használata
A SparkSQL használatával különféle funkciókat hajthat végre a DataFrame-ben tárolt adatokon. Gyakori, hogy az UDF-eket és a SparkSQL-t kombinálva egy UDF-et alkalmaz egy DataFrame minden sorára.
DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));
Ez a kódrészlet udfArray-t alkalmaz a DataFrame minden értékére, amely a netcat terminálból beolvasott sztringeket jelöli. Alkalmazza a SparkSQL-metódust Explode , hogy a tömb minden bejegyzését a saját sorába helyezze. Végül helyezze Select el a létrehozott oszlopokat az új DataFrame arrayDF-ben.
A stream megjelenítése
A kimenet jellemzőinek meghatározására szolgál WriteStream() , például az eredmények konzolra való nyomtatására és csak a legújabb kimenet megjelenítésére.
StreamingQuery query = arrayDf
.WriteStream()
.Format("console")
.Start();
A kód futtatása
A Spark strukturált streamelése kis kötegeken keresztül dolgozza fel az adatokat. A program futtatásakor a netcat-kapcsolat létrehozására szolgáló parancssor lehetővé teszi a gépelés megkezdését. Minden alkalommal, amikor az Enter billentyűt lenyomja, miután beírta az adatokat a parancssorba, a Spark kötegnek tekinti a bejegyzést, és futtatja az UDF-et.
A spark-submit használata az alkalmazás futtatásához
Egy új netcat-munkamenet elindítása után nyisson meg egy új terminált, és futtassa a spark-submit
parancsot az alábbi parancshoz hasonlóan:
spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<spark_majorversion-spark_minorversion>_<scala_majorversion.scala_minorversion>-<spark_dotnet_version>.jar Microsoft.Spark.CSharp.Examples.exe Sql.Streaming.StructuredNetworkCharacterCount localhost 9999
Megjegyzés
Mindenképpen frissítse a fenti parancsot a Microsoft Spark JAR-fájl tényleges elérési útjával. A fenti parancs azt is feltételezi, hogy a netcat-kiszolgáló a 9999-es localhost porton fut.
A kód letöltése
Ez az oktatóanyag a StructuredNetworkCharacterCount.cs példát használja, de a GitHub három további teljes streamfeldolgozási példát is bemutat:
- StructuredNetworkWordCount.cs: a bármely forrásból streamelt adatok szószáma
- StructuredNetworkWordCountWindowed.cs: a szavak száma az adatokon ablakos logikával
- StructuredKafkaWordCount.cs: a Kafkából streamelt adatok szószáma
Következő lépések
Folytassa a következő cikkel, amelyből megtudhatja, hogyan helyezheti üzembe az Apache Spark-alkalmazáshoz készült .NET-et a Databricksben.