Учебник. Выполнение пакетной обработки с помощью .NET для Apache SparkTutorial: Do batch processing with .NET for Apache Spark

В этом учебнике вы узнаете, как выполнять пакетную обработку с помощью .NET для Apache Spark.In this tutorial, you learn how to do batch processing using .NET for Apache Spark. Пакетная обработка — это преобразование неактивных данных. Это означает, что исходные данные уже загружены в хранилище данных.Batch processing is the transformation of data at rest, meaning that the source data has already been loaded into data storage.

Обычно выполняется пакетная обработка больших неструктурированных наборов данных, которые необходимо подготовить для дальнейшего анализа.Batch processing is generally performed over large, flat datasets that need to be prepared for further analysis. Обработка журналов и хранение данных — это типичные сценарии пакетной обработки.Log processing and data warehousing are common batch processing scenarios. В таких сценариях вы анализируете сведения о проектах GitHub, например количество вилок различных проектов или сведения об обновлении проектов.In this scenario, you analyze information about GitHub projects, such as the number of time different projects have been forked or how recently projects have been updated.

В этом руководстве вы узнаете, как:In this tutorial, you learn how to:

  • создать и выполнить приложение .NET для Apache Spark;Create and run a .NET for Apache Spark application
  • считывать данные в DataFrame и подготовить их для анализа;Read data into a DataFrame and prepare it for analysis
  • обрабатывать данные с помощью Spark SQL.Process the data using Spark SQL

Предварительные требованияPrerequisites

Если вы впервые используете .NET для Apache Spark, ознакомьтесь с учебником Учебник. Начало работы с .NET для Apache Spark, чтобы узнать, как подготовить среду и запустить первое приложение .NET для Apache Spark.If this is your first time using .NET for Apache Spark, check out the Get started with .NET for Apache Spark tutorial to learn how to prepare your environment and run your first .NET for Apache Spark application.

Скачивание примера данныхDownload the sample data

GHTorrent отслеживает все открытые события GitHub, например информацию о проектах, фиксации и наблюдателях, а также сохраняет события и их структуру в базах данных.GHTorrent monitors all public GitHub events, such as info about projects, commits, and watchers, and stores the events and their structure in databases. Данные, собранные за разные временные периоды, доступны в виде загружаемых архивов.Data collected over different time periods is available as downloadable archives. Так как файлы дампа очень большие, в этом учебнике используется их сокращенная версия, которую можно скачать на сайте GitHub.Because the dump files are very large, this guide uses a truncated version of the dump file that can be downloaded from GitHub.

Примечание

Набор данных GHTorrent распространяется по схеме двойного лицензирования (Creative Commons +).The GHTorrent dataset is distributed under a dual licensing scheme (Creative Commons +). Для некоммерческих целей (в том числе для образовательных, исследовательских и личных) набор данных распространяется по лицензии CC-BY-SA.For non-commercial uses (including, but not limited to, educational, research or personal uses), the dataset is distributed under the CC-BY-SA license.

Создание консольного приложенияCreate a console application

  1. В командной строке выполните следующие команды, чтобы создать новое консольное приложение:In your command prompt, run the following commands to create a new console application:

    dotnet new console -o mySparkBatchApp
    cd mySparkBatchApp
    

    Команда dotnet создаст для вас приложение new типа console.The dotnet command creates a new application of type console for you. Параметр -o создаст каталог с именем mySparkBatchApp, в котором хранится приложение и используемые им файлы.The -o parameter creates a directory named mySparkBatchApp where your app is stored and populates it with the required files. Команда cd mySparkBatchApp изменит каталог на только что созданный каталог приложения.The cd mySparkBatchApp command changes the directory to the app directory you just created.

  2. Чтобы использовать .NET для Apache Spark в приложении, установите пакет Microsoft.Spark.To use .NET for Apache Spark in an app, install the Microsoft.Spark package. В консоли выполните следующую команду:In your console, run the following command:

    dotnet add package Microsoft.Spark
    

Создание SparkSessionCreate a SparkSession

  1. Добавьте следующие дополнительные инструкции using в начало файла Program.cs приложения mySparkBatchApp.Add the following additional using statements to the top of the Program.cs file in mySparkBatchApp.

    using System;
    using Microsoft.Spark.Sql;
    using static Microsoft.Spark.Sql.Functions;
    
  2. Добавьте приведенный ниже код к пространству имен проекта.Add the following code to your project namespace. s_referenceData будет использоваться позже в программе для фильтрации на основе даты.s_referenceData is used later in the program to filter based on date.

    static readonly DateTime s_referenceDate = new DateTime(2015, 10, 20);
    
  3. Добавьте следующий код в метод Main, чтобы установить новый сеанс SparkSession.Add the following code inside your Main method to establish a new SparkSession. SparkSession является точкой входа для программирования Spark через API DataSet и DataFrame.The SparkSession is the entry point to programming Spark with the Dataset and DataFrame API. Вызов объекта spark позволяет получить доступ к функциям Spark и DataFrame в любом месте вашей программы.By calling the spark object, you can access Spark and DataFrame functionality throughout your program.

    SparkSession spark = SparkSession
         .Builder()
         .AppName("GitHub and Spark Batch")
         .GetOrCreate();
    

Подготовка данныхPrepare the data

  1. Считайте входной файл в DataFrame, который представляет собой распределенную коллекцию данных в виде именованных столбцов.Read the input file into a DataFrame, which is a distributed collection of data organized into named columns. Столбцы для данных можно задать с помощью Schema.You can set the columns for your data through Schema. Для отображения данных в DataFrame можно применять метод Show.Use the Show method to display the data in your DataFrame. Не забудьте изменить путь к CSV-файлу на расположение скачанных данных GitHub.Be sure to update the CSV file path to the location of the GitHub data you downloaded.

    DataFrame projectsDf = spark
        .Read()
        .Schema("id INT, url STRING, owner_id INT, " +
        "name STRING, descriptor STRING, language STRING, " +
        "created_at STRING, forked_from INT, deleted STRING" +
        "updated_at STRING")
        .Csv("filepath");
    
    projectsDf.Show();
    
  2. Используйте метод Na, чтобы удалить строки со значениями NA (NULL), и метод Drop, чтобы удалить определенные столбцы из данных.Use the Na method to drop rows with NA (null) values, and the Drop method to remove certain columns from your data. Это позволяет избежать ошибок при попытке анализа данных со значением NULL или столбцов, не относящихся к конечному анализу.This helps prevent errors if you try to analyze null data or columns that are not relevant to your final analysis.

    // Drop any rows with NA values
    DataFrameNaFunctions dropEmptyProjects = projectsDf.Na();
    DataFrame cleanedProjects = dropEmptyProjects.Drop("any");
    
    // Remove unnecessary columns
    cleanedProjects = cleanedProjects.Drop("id", "url", "owner_id");
    cleanedProjects.Show();
    

Анализ данныхAnalyze the data

Spark SQL позволяет выполнять SQL-вызовы данных.Spark SQL allows you to make SQL calls on your data. Общепринятой практикой является объединение пользовательских функций и Spark SQL для применения пользовательской функции ко всем строкам в DataFrame.It's common to combine user-defined functions and Spark SQL to apply a user-defined function to all rows of your DataFrame.

Вы можете специально вызвать spark.Sql, чтобы сымитировать стандартные вызовы SQL, которые встречаются в приложениях других типов.You can specifically call spark.Sql to mimic standard SQL calls seen in other types of apps. Кроме того, можно вызвать метод GroupBy и Agg, чтобы объединить и фильтровать данные, а также выполнить вычисления с ними.You can also call methods like GroupBy and Agg to specifically combine, filter, and perform calculations on your data.

Цель этого приложения — получить определенные сведения о данных проектов GitHub.The goal of this app is to gain some insights about the GitHub projects data. Добавьте следующие фрагменты кода в программу, чтобы проанализировать данные.Add the following code snippets to your program to analyze the data.

  1. Добавьте следующий блок кода, чтобы определить количество вилок для каждого языка.Add the following block of code finds the number of times each language has been forked. Сначала данные группируются по языку.First, the data is grouped by language. Затем из каждого языка берется среднее число вилок.Then, the average number of forks from each language is taken.

    // Average number of times each language has been forked
    DataFrame groupedDF = cleanedProjects
        .GroupBy("language")
        .Agg(Avg(cleanedProjects["forked_from"]);
    
  2. Добавьте следующий блок кода, чтобы упорядочить среднее число вилок по убыванию. Так вы узнаете, какие языки наиболее разветвленные.Add the following block of code to order the average number of forks in descending order to see which languages are the most forked. То есть сначала будет отображаться наибольшее число вилок.That is, the largest number of forks will appear first.

    // Sort by most forked languages first
    groupedDF.OrderBy(Desc("avg(forked_from)")).Show(); 
    
  3. В указанном ниже блоке кода показано, как давно были обновлены проекты.The next block of code shows you how recently projects have been updated. Вы регистрируете новую пользовательскую функцию MyUDF и сравниваете ее с датой (s_referenceDate), которая была объявлена в начале этого учебника.You register a new user-defined function called MyUDF and compare it with a date, s_referenceDate, which was declared at the beginning of the tutorial. Дата каждого проекта сравнивается с датой ссылки.The date for each project is compared against the reference date. Затем с помощью Spark SQL выполняется вызов определяемой пользователем функции в каждой строке данных, чтобы проанализировать каждый проект в наборе данных.Then, Spark SQL is used to call the UDF on each row of the data to analyze each project in the data set.

    spark.Udf().Register<string, bool>(
        "MyUDF",
        (date) => DateTime.TryParse(date, out DateTime convertedDate) &&
            (convertedDate > s_referenceDate);   
    cleanedProjects.CreateOrReplaceTempView("dateView"); 
    
    DataFrame dateDf = spark.Sql(
        "SELECT *, MyUDF(dateView.updated_at) AS datebefore FROM dateView");
    dateDf.Show();
    
  4. Вызовите spark.Stop(), чтобы завершить SparkSession.Call spark.Stop() to end the SparkSession.

Использование команды spark-submit для выполнения приложенияUse spark-submit to run your app

  1. Выполните следующую команду, чтобы создать приложение .NET:Use the following command to build your .NET app:

    dotnet build
    
  2. Запустите приложение с помощью команды spark-submit.Run your app with spark-submit. Не забудьте указать в приведенной ниже команде фактический путь к JAR-файлу Microsoft Spark.Be sure to update the following command with the actual paths to your Microsoft Spark jar file.

    spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local /<path>/to/microsoft-spark-<version>.jar dotnet /<path>/to/netcoreapp<version>/GitHubProjects.dll
    

Получите кодGet the code

Полное решение можно просмотреть на сайте GitHub.You can see the full solution on GitHub.

Следующие шагиNext steps

В следующем учебнике описано, как обрабатывать потоковую передачу данных с помощью .NET для Apache Spark.Advance to the next article to learn how to process streaming data with .NET for Apache Spark.