Öğretici: Apache Spark için .NET ile Batch işleme
Bu öğreticide, Apache Spark için .NET kullanarak toplu işleme yapmayı öğreneceksiniz. Toplu işlem, kaynak verilerin daha önce veri depolamaya yüklenmiş olduğu anlamına gelen verilerin bekleyen dönüşümdedir.
Toplu işleme genellikle daha fazla analiz için hazırlanması gereken büyük ve düz veri kümelerinde gerçekleştirilir. Günlük işleme ve veri ambarı, yaygın toplu işleme senaryolardır. bu senaryoda, farklı proje veya son projelerin güncelleştirildiği zaman sayısı gibi GitHub projelerle ilgili bilgileri analiz edersiniz.
Bu öğreticide şunların nasıl yapıldığını öğreneceksiniz:
- Apache Spark uygulaması için .NET oluşturma ve çalıştırma
- Verileri bir veri çerçevesine okuyun ve analiz için hazırlayın
- Spark SQL kullanarak verileri işleme
Önkoşullar
Apache Spark için .NET 'i ilk kez kullanıyorsanız, ortamınızı nasıl hazırlayacağınızı ve Apache Spark uygulama için ilk .NET uygulamanızı nasıl çalıştıracağınızı öğrenmek için .NET ile çalışmaya başlama hakkında bilgi edinmek için Apache Spark öğreticisine göz atın.
Örnek verileri indirme
ghtorkiralık , projeler, işlemeler ve izleyicileri hakkında bilgi gibi tüm genel GitHub olaylarını izler ve olayları ve bunların yapısını veritabanlarında depolar. Farklı zaman dilimlerinde toplanan veriler indirilebilir Arşivler olarak kullanılabilir. Döküm dosyaları çok büyük olduğundan, bu kılavuz, GitHub 'den indirilebilen, döküm dosyasının kesilmiş bir sürümünü kullanır.
Not
Ghtorıo veri kümesi, bir çift lisanslama şeması (Creative Commons +) altında dağıtılır. Ticari olmayan kullanımlar için (eğitim, araştırma veya kişisel kullanımları dahil, ancak bunlarla sınırlı olmamak üzere), veri kümesi, bilgi-SA lisansıaltında dağıtılır.
Konsol uygulaması oluşturma
Komut istemindeki yeni bir konsol uygulaması oluşturmak için aşağıdaki komutları çalıştırın:
dotnet new console -o mySparkBatchApp cd mySparkBatchAppdotnetKomutnewsizin için türünde bir uygulama oluştururconsole.-oParametresi, uygulamanızın depolandığı Mymini batchapp adlı bir dizin oluşturur ve gerekli dosyalarla doldurur.cd mySparkBatchAppKomutu, dizini yeni oluşturduğunuz uygulama dizini olarak değiştirir..NET uygulamasını bir uygulamada Apache Spark için kullanmak üzere Microsoft. Spark paketini yüklemek için. Konsolunuza aşağıdaki komutu çalıştırın:
dotnet add package Microsoft.Spark
Mini oturum oluşturma
Aşağıdaki ek
usingdeyimlerini Mymini batchapp'teki program. cs dosyasının en üstüne ekleyin.using System; using Microsoft.Spark.Sql; using static Microsoft.Spark.Sql.Functions;Aşağıdaki kodu proje ad alanına ekleyin. s_referenceData , programın daha sonra tarihe göre filtrelemek için kullanılır.
static readonly DateTime s_referenceDate = new DateTime(2015, 10, 20);Yeni bir mini oturum oluşturmak için aşağıdaki kodu Main yönteminizin içine ekleyin. Mini veri kümesi ve DataFrame API 'SI ile Spark programlama için mini oturum giriş noktasıdır.
sparkNesnesini çağırarak, program genelinde Spark ve DataFrame işlevlerine erişebilirsiniz.SparkSession spark = SparkSession .Builder() .AppName("GitHub and Spark Batch") .GetOrCreate();
Verileri hazırlama
Giriş dosyasını
DataFrame, adlandırılmış sütunlara düzenlenmiş, dağıtılmış bir veri koleksiyonu olan öğesine okuyun. Verilerinize ilişkin sütunları aracılığıyla ayarlayabilirsiniz Schema . ShowVeri çerçevinizdeki verileri göstermek için yöntemini kullanın. CSV dosya yolunu indirdiğiniz GitHub verilerinin konumuyla güncelleştirdiğinizden emin olun.DataFrame projectsDf = spark .Read() .Schema("id INT, url STRING, owner_id INT, " + "name STRING, descriptor STRING, language STRING, " + "created_at STRING, forked_from INT, deleted STRING," + "updated_at STRING") .Csv("filepath"); projectsDf.Show();NaVeri (null) değerleriyle satırları bırakmak için yöntemini ve Drop belirli sütunları verilerinden kaldırmak için yöntemini kullanın. Bu, son analiziyle ilgili olmayan null verileri veya sütunları çözümlemeyi denerseniz hataları önlemeye yardımcı olur.
// Drop any rows with NA values DataFrameNaFunctions dropEmptyProjects = projectsDf.Na(); DataFrame cleanedProjects = dropEmptyProjects.Drop("any"); // Remove unnecessary columns cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id"); cleanedProjects.Show();
Verileri analiz etme
Spark SQL, verilerinizde SQL çağrılar yapmanıza olanak sağlar. veri çerçevinizdeki tüm satırlara kullanıcı tanımlı bir işlev uygulamak için kullanıcı tanımlı işlevleri ve Spark SQL birleştirmek yaygındır.
spark.Sqlbaşka türde uygulamalarda görülen standart SQL çağrılarını taklit etmek için özel olarak çağrı yapabilirsiniz. Ayrıca GroupBy Agg , ve verileriniz üzerinde hesaplamalar yapmak, filtre uygulamak ve bunları gerçekleştirmek için gibi yöntemleri de çağırabilirsiniz.
bu uygulamanın amacı GitHub proje verileri hakkında bazı öngörülere sahip olmaktır. Verileri çözümlemek için programınıza aşağıdaki kod parçacıklarını ekleyin.
Aşağıdaki kod bloğunu eklemek, her dilin ne kadar ele geçirilmiş olduğunu bulur. İlk olarak, veriler dile göre gruplandırılır. Ardından, her dilden ortalama çatalların sayısı alınır.
// Average number of times each language has been forked DataFrame groupedDF = cleanedProjects .GroupBy("language") .Agg(Avg(cleanedProjects["forked_from"]));En çok kullanılan dilleri görmek için Ortalama çatalların sayısını azalan sırada sıralamak üzere aşağıdaki kod bloğunu ekleyin. Yani, en fazla çatallar ilk olarak görünür.
// Sort by most forked languages first groupedDF.OrderBy(Desc("avg(forked_from)")).Show();Sonraki kod bloğu, son projelerin güncelleştirilme şeklini gösterir. MyUDF adlı yeni bir Kullanıcı tanımlı işlevi kaydeder ve öğreticinin başlangıcında bildirildiği bir tarih, s_referenceDate ile karşılaştırın. Her projenin tarihi başvuru tarihine göre karşılaştırılır. Daha sonra Spark SQL, veri kümesindeki her bir projeyi analiz etmek için verilerin her bir satırında UDF 'yi çağırmak için kullanılır.
spark.Udf().Register<string, bool>( "MyUDF", (date) => DateTime.TryParse(date, out DateTime convertedDate) && (convertedDate > s_referenceDate)); cleanedProjects.CreateOrReplaceTempView("dateView"); DataFrame dateDf = spark.Sql( "SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView"); dateDf.Show();spark.Stop()Mini oturumu sona erdirmek için çağırın.
Uygulamanızı çalıştırmak için Spark-Gönder kullanın
.NET uygulamanızı derlemek için aşağıdaki komutu kullanın:
dotnet buildUygulamanızı ile çalıştırın
spark-submit. Aşağıdaki komutu Microsoft Spark jar dosyanıza yönelik gerçek yollarla güncelleştirdiğinizden emin olun.spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /<path>/to/microsoft-spark-<spark_majorversion-spark_minorversion>_<scala_majorversion.scala_minorversion>-<spark_dotnet_version>.jar dotnet /<path>/to/netcoreapp<version>/mySparkBatchApp.dll
Kodu alma
GitHub çözümü tam olarak görebilirsiniz.
Sonraki adımlar
Apache Spark için .NET ile akış verilerini nasıl işleyebileceğinizi öğrenmek için sonraki makaleye ilerleyin.