Öğretici: Apache Spark için .NET ile yapılandırılmış akış

Bu öğreticide, Apache Spark için .NET kullanarak Spark yapılandırılmış akış çağırma öğretilir. Spark yapısal akışı, gerçek zamanlı veri akışlarını işleme Apache Spark. Akış işleme, canlı verileri üretildiğinde analiz anlamına gelir.

Bu öğreticide şunların nasıl yapıldığını öğreneceksiniz:

  • Apache Spark uygulaması için .NET oluşturma ve çalıştırma
  • Netcat kullanarak bir veri akışı oluşturun
  • Akış verilerini çözümlemek için Kullanıcı tanımlı işlevler ve parlak SQL kullanma

Önkoşullar

Apache Spark uygulama için ilk .NET ise, temel bilgileri öğrenecek başlangıç öğreticisiyle başlayın.

Konsol uygulaması oluşturma

  1. Komut istemindeki yeni bir konsol uygulaması oluşturmak için aşağıdaki komutları çalıştırın:

    dotnet new console -o mySparkStreamingApp
    cd mySparkStreamingApp
    

    dotnetKomut new sizin için türünde bir uygulama oluşturur console . -oParametresi, uygulamanızın depolandığı Mymini Streamingapp adlı bir dizin oluşturur ve gerekli dosyalarla doldurur. cd mySparkStreamingAppKomutu, dizini yeni oluşturduğunuz uygulama dizini olarak değiştirir.

  2. .NET uygulamasını bir uygulamada Apache Spark için kullanmak üzere Microsoft. Spark paketini yüklemek için. Konsolunuza aşağıdaki komutu çalıştırın:

    dotnet add package Microsoft.Spark
    

Bir veri akışı oluşturun ve bu akışa bağlanın

Akış işlemeyi test etmenin popüler bir yolu netcat kullanmaktır. netcat ( NC olarak da bilinir) ağ bağlantılarından okuyup yazmanızı sağlar. Bir Terminal penceresi aracılığıyla netcat ile bir ağ bağlantısı kurarsınız.

Netcat ile veri akışı oluşturma

  1. Netcat 'ı indirin. Sonra, dosyayı ZIP indirsitesinden ayıklayın ve "PATH" ortam değişkeninizden ayıkladığınız dizini ekleyin.

  2. Yeni bir bağlantı başlatmak için yeni bir konsol açın ve 9999 numaralı bağlantı noktasında localhost 'a bağlanan aşağıdaki komutu çalıştırın.

    Windows'da:

    nc -vvv -l -p 9999
    

    Linux 'ta:

    nc -lk 9999
    

    Spark programınız, bu komut istemine yazdığınız girişi dinler.

Mini oturum oluşturma

  1. Aşağıdaki ek using deyimlerini Mymini Streamingapp içindeki program. cs dosyasının en üstüne ekleyin:

    using System;
    using Microsoft.Spark.Sql;
    using Microsoft.Spark.Sql.Streaming;
    using static Microsoft.Spark.Sql.Functions;
    
  2. MainYeni bir oluşturma yöntemine aşağıdaki kodu ekleyin SparkSession . Spark oturumu, veri kümesi ve DataFrame API 'SI ile Spark programlamanın giriş noktasıdır.

    SparkSession spark = SparkSession
         .Builder()
         .AppName("Streaming example with a UDF")
         .GetOrCreate();
    

    Yukarıda oluşturulan Spark nesnesini çağırmak, programınızın tamamında Spark ve dataframe işlevlerine erişmenize olanak tanır.

readstream () ile bir akışa Bağlan

ReadStream()Yöntemi, DataStreamReader ' de akış verilerini okumak için kullanılabilecek bir döndürür DataFrame . Spark uygulamanıza akış verilerinin nerede beklendiğini bildirmek için konak ve bağlantı noktası bilgilerini ekleyin.

DataFrame lines = spark
    .ReadStream()
    .Format("socket")
    .Option("host", hostname)
    .Option("port", port)
    .Load();

Kullanıcı tanımlı bir işlevi kaydetme

Verilerinizde hesaplamalar ve analiz yapmak için Spark uygulamalarında Kullanıcı tanımlı UDF 'ler, Kullanıcı tanımlı işlevler' i kullanabilirsiniz.

MainAdlı BIR UDF 'yi kaydetmek için aşağıdaki kodu yöntemine ekleyin udfArray .

Func<Column, Column> udfArray =
    Udf<string, string[]>((str) => new string[] { str, $"{str} {str.Length}" });

Bu UDF, özgün dizeyi ( Str içinde bulunur) içeren bir dizi oluşturmak için netcat terminalinden aldığı her dizeyi işler ve özgün dizenin uzunluğu ile birleştirilmiş özgün dizenin önüne gelir.

Örneğin, Netcat terminalinde Hello World girilmesi, şu durumlarda bir dizi üretir:

  • dizi [ 0] = Merhaba Dünya
  • dizi [ 1] = Merhaba Dünya 11

Mini kullanılan SQL kullan

Veri Çerçeverinizdeki depolanan veriler üzerinde çeşitli işlevler gerçekleştirmek için, parlak SQL kullanın. Veri çerçevesinin her satırına bir UDF uygulamak için UDF 'Leri ve parlak SQL 'i birleştirmek yaygındır.

DataFrame arrayDF = lines.Select(Explode(udfArray(lines["value"])));

Bu kod parçacığı, Netcat terminalinizden okunan her dizeyi temsil eden veri Çerçevenizle her bir değere Udfarray uygular. ExplodeDizinizin her girdisini kendi satırına yerleştirmek için, Mini SQL yöntemini uygulayın. Son olarak, Select Yeni DataFrame arraydf 'da üretilebilen sütunları yerleştirmek için kullanın.

Akışınızı görüntüleme

WriteStream()Sonuçları konsola yazdırma ve yalnızca en son çıktıyı görüntüleme gibi, çıktlarınızın özelliklerini oluşturmak için kullanın.

StreamingQuery query = arrayDf
    .WriteStream()
    .Format("console")
    .Start();

Kodunuzu çalıştırın

Spark 'ta yapılandırılmış akış, bir dizi küçük toplu iş aracılığıyla verileri işler. Programınızı çalıştırdığınızda, Netcat bağlantısını oluşturduğunuz komut istemi yazmaya başlayabilmeniz için izin verir. Bu komut istemine veri yazdıktan sonra ENTER tuşuna her bastığınızda, Spark girişi bir toplu iş olarak değerlendirir ve UDF 'yi çalıştırır.

Uygulamanızı çalıştırmak için Spark-Gönder kullanın

Yeni bir netcat oturumu başlattıktan sonra, yeni bir Terminal açın ve spark-submit aşağıdaki komuta benzer şekilde komutunu çalıştırın:

spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /path/to/microsoft-spark-<spark_majorversion-spark_minorversion>_<scala_majorversion.scala_minorversion>-<spark_dotnet_version>.jar Microsoft.Spark.CSharp.Examples.exe Sql.Streaming.StructuredNetworkCharacterCount localhost 9999

Not

Yukarıdaki komutu, Microsoft Spark jar dosyanızın gerçek yoluyla güncelleştirdiğinizden emin olun. Yukarıdaki komut Ayrıca, Netcat sunucunuzun localhost bağlantı noktası 9999 üzerinde çalıştığını varsayar.

Kodu alma

Bu öğretici, Structurednetworkkaraktercount. cs örneğini kullanır, ancak GitHub diğer üç tam akış işleme örneği vardır:

Sonraki adımlar

.NET Apache Spark uygulamanızı Databricks 'e dağıtmayı öğrenmek için sonraki makaleye ilerleyin.