Azure Databricks ile bir akış işleme işlem hattı oluşturmaCreate a stream processing pipeline with Azure Databricks

Bu başvuru mimarisi bir uçtan uca gösterir akış işleme işlem hattı.This reference architecture shows an end-to-end stream processing pipeline. Bu tür bir işlem hattı dört aşamadan oluşur: içe alma, işlem, depolama, analiz ve raporlama.This type of pipeline has four stages: ingest, process, store, and analysis and reporting. Bu başvuru mimarisi için işlem hattı iki farklı kaynaktan gelen verileri alır, her akışından ilgili kayıtlar üzerinde birleştirme gerçekleştirir, sonuç zenginleştirir ve gerçek zamanlı ortalamasını hesaplar.For this reference architecture, the pipeline ingests data from two sources, performs a join on related records from each stream, enriches the result, and calculates an average in real time. Daha fazla analiz için sonuçlar depolanır.The results are stored for further analysis. Bu çözümü dağıtın.Deploy this solution.

Başvuru mimarisi için akış Azure Databricks ile işleme

Senaryo: Taksi şirket her taksi seyahat ilgili verileri toplar.Scenario: A taxi company collects data about each taxi trip. Bu senaryo için veri gönderen iki ayrı cihaz varsayıyoruz.For this scenario, we assume there are two separate devices sending data. Taksi her kıl hakkında bilgi gönderen bir ölçüm olan — süresi, uzaklığı ve toplama ve dropoff konumları.The taxi has a meter that sends information about each ride — the duration, distance, and pickup and dropoff locations. Ayrı bir cihaz, Müşteri ödemelerini kabul eder ve fares ilgili verileri gönderir.A separate device accepts payments from customers and sends data about fares. Spot ridership eğilimleri, gerçek zamanlı olarak her Komşuları temelli Mil başına ortalama ipucu hesaplamak için taksi şirket istediği için.To spot ridership trends, the taxi company wants to calculate the average tip per mile driven, in real time, for each neighborhood.

MimariArchitecture

Mimari aşağıdaki bileşenlerden oluşur.The architecture consists of the following components.

Veri kaynakları.Data sources. Bu mimaride, gerçek zamanlı olarak veri akışları oluşturan iki veri kaynağı vardır.In this architecture, there are two data sources that generate data streams in real time. İlk stream kılma bilgilerini içeren ve ikinci taksi bilgileri içerir.The first stream contains ride information, and the second contains fare information. Başvuru mimarisi, statik dosyalar kümesinden okur ve verileri Event Hubs'a gönderir, bir sanal veri oluşturucuyu içerir.The reference architecture includes a simulated data generator that reads from a set of static files and pushes the data to Event Hubs. Veri kaynakları, gerçek bir uygulamada taxi cab dosyaları yüklü cihazlar olacaktır.The data sources in a real application would be devices installed in the taxi cabs.

Azure Event Hubs.Azure Event Hubs. Olay hub'ları bir olay alma hizmetidir.Event Hubs is an event ingestion service. Bu mimaride, iki olay hub'ı örnekleri, her veri kaynağı için kullanılır.This architecture uses two event hub instances, one for each data source. Her veri kaynağı bir veri akışı, ilişkili olay hub'ına gönderir.Each data source sends a stream of data to the associated event hub.

Azure Databricks.Azure Databricks. Databricks , Microsoft Azure bulut hizmetleri platformu için iyileştirilen bir Apache Spark temelli analiz platformudur.Databricks is an Apache Spark-based analytics platform optimized for the Microsoft Azure cloud services platform. Databricks taksi kılma ilişkilendirin ve veri masrafları ve Databricks dosya sisteminde depolanan Komşuları verilerle ilişkili verileri zenginleştirmek için kullanılır.Databricks is used to correlate of the taxi ride and fare data, and also to enrich the correlated data with neighborhood data stored in the Databricks file system.

Cosmos DB.Cosmos DB. Azure Databricks işi çıkışı için yazılmış kayıtlar dizisidir Cosmos DB Cassandra API'yi kullanarak.The output from Azure Databricks job is a series of records, which are written to Cosmos DB using the Cassandra API. Zaman serisi verilerini modelleme desteklediğinden, Cassandra API'si kullanılır.The Cassandra API is used because it supports time series data modeling.

Azure Log Analytics.Azure Log Analytics. Uygulama günlüğü veri tarafından toplanan Azure İzleyici depolanan bir Log Analytics çalışma alanı.Application log data collected by Azure Monitor is stored in a Log Analytics workspace. Log Analytics sorguları analiz ölçümleri görselleştirmenize ve uygulamadaki sorunları belirleyebilmeniz için günlük iletilerini incelemek için kullanılabilir.Log Analytics queries can be used to analyze and visualize metrics and inspect log messages to identify issues within the application.

Veri alımıData ingestion

Bir veri kaynağı benzetimini yapmak için bu başvuru mimarisi kullanan New York taksi verilerini veri kümesi[1].To simulate a data source, this reference architecture uses the New York City Taxi Data dataset[1]. Bu veri kümesi bir dört yıllık döneme taksi gelişlerin da oturan hakkında veri içerir (2010 – 2013).This dataset contains data about taxi trips in New York City over a four-year period (2010 – 2013). İki tür kayıt içerir: Bulutumuzda verileri ve veri masrafları.It contains two types of record: Ride data and fare data. Seyahat süresi, seyahat uzaklık ve toplama ve dropoff konumu kılma veriler içerir.Ride data includes trip duration, trip distance, and pickup and dropoff location. Taksi verilerini içeren taksi, vergi ve ipucu tutarlarıdır.Fare data includes fare, tax, and tip amounts. Her iki kayıt türü ortak alanları medallion sayısı, hack lisans ve satıcı kimliği içerir.Common fields in both record types include medallion number, hack license, and vendor ID. Birlikte bir taksi artı bir sürücü bu üç alanı benzersiz şekilde tanımlar.Together these three fields uniquely identify a taxi plus a driver. Veriler, CSV biçiminde depolanır.The data is stored in CSV format.

[1] Donovan, Brian; Çalışma sayfasını (2016): New York City taksi seyahat verilerini (2010-2013).[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Üniversite, Illinois Urbana Champaign'deki.University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Veri oluşturucuyu kayıtları okur ve bunları Azure Event Hubs'a gönderen bir .NET Core uygulamasıdır.The data generator is a .NET Core application that reads the records and sends them to Azure Event Hubs. Oluşturucu, JSON biçimi ve taksi verilerini CSV biçiminde kılma verileri gönderir.The generator sends ride data in JSON format and fare data in CSV format.

Event Hubs kullanan bölümleri veri segmentlere ayırmak için.Event Hubs uses partitions to segment the data. Bölümler bir tüketici her bölüm paralel okuma izin verin.Partitions allow a consumer to read each partition in parallel. Verileri Event Hubs'a gönderme, bölüm anahtarı açıkça belirtebilirsiniz.When you send data to Event Hubs, you can specify the partition key explicitly. Aksi takdirde, kayıtlar, bölüm ettirirsiniz olarak atanır.Otherwise, records are assigned to partitions in round-robin fashion.

Bu senaryoda, veri bulutumuzda ve taksi verileri için belirli taxi cab aynı bölüm kimlikli görmeniz gerekir.In this scenario, ride data and fare data should end up with the same partition ID for a given taxi cab. Bu iki akışları bağıntılandırır, paralellik derecesini uygulamak Databricks sağlar.This enables Databricks to apply a degree of parallelism when it correlates the two streams. Bölüm bir kayıtta n kılma veri bölümündeki bir kaydı eşleşecektir n taksi verileri.A record in partition n of the ride data will match a record in partition n of the fare data.

Azure Databricks ile Event Hubs işleme Akış Diyagramı

Verileri oluşturma Aracı'nda, her iki kayıt türleri için ortak veri modeli olan bir PartitionKey birleşimi olan özellik Medallion, HackLicense, ve VendorId.In the data generator, the common data model for both record types has a PartitionKey property that is the concatenation of Medallion, HackLicense, and VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Bu özellik, Event Hubs'a gönderirken bir açık bölüm anahtarı sağlamak için kullanılır:This property is used to provide an explicit partition key when sending to Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event HubsEvent Hubs

Event hubs işleme kapasitesi ölçülür üretilen iş birimleri.The throughput capacity of Event Hubs is measured in throughput units. Otomatik ölçeklendirme bir olay hub'ı etkinleştirerek yapabilecekleriniz otomatik şişme, hangi otomatik olarak ölçeklenen trafiği, yapılandırılmış en temel üretilen iş birimleri.You can autoscale an event hub by enabling auto-inflate, which automatically scales the throughput units based on traffic, up to a configured maximum.

Akış işlemeStream processing

Azure Databricks'te veri işleme işi tarafından gerçekleştirilir.In Azure Databricks, data processing is performed by a job. İş öğesine atanan ve bir kümede çalışır.The job is assigned to and runs on a cluster. İşin olabilir Java veya Spark ile yazılan özel kod not defteri.The job can either be custom code written in Java, or a Spark notebook.

Bu başvuru mimarisinde, iş, hem Java hem de Scala içinde yazılmış sınıflarıyla bir Java arşividir.In this reference architecture, the job is a Java archive with classes written in both Java and Scala. Bir Databricks işi için Java arşiv belirtirken, sınıf yürütme için Databricks küme tarafından belirtilir.When specifying the Java archive for a Databricks job, the class is specified for execution by the Databricks cluster. Burada, ana yöntemi com.microsoft.pnp.TaxiCabReader sınıfı, veri işleme mantığı içerir.Here, the main method of the com.microsoft.pnp.TaxiCabReader class contains the data processing logic.

İki olay hub'ı örneklerden akışı okumaReading the stream from the two event hub instances

Veri işleme mantığı kullanır Spark yapılandırılmış akış iki Azure olay hub'ı örneklerinden okumak için:The data processing logic uses Spark structured streaming to read from the two Azure event hub instances:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Veri Komşuları bilgilerle zenginleştirmeEnriching the data with the neighborhood information

Kılma verileri oluşturan çekme enlem ve boylam koordinatlarını içerir ve konumları devre dışı bırakın.The ride data includes the latitude and longitude coordinates of the pick up and drop off locations. Bu koordinatları kullanışlı olsa da, bunlar kolayca analiz için tüketilen değil.While these coordinates are useful, they are not easily consumed for analysis. Bu nedenle, bu verileri okuma Komşuları verilerle zenginleştirilmiş bir şekil dosyası.Therefore, this data is enriched with neighborhood data that is read from a shapefile.

Şekil dosyası biçimi ikili ve değil kolayca ayrıştırılmış, ancak GeoTools Kitaplığı Şekil dosyası biçimini kullanan Jeo-uzamsal veri için araçlar sağlar.The shapefile format is binary and not easily parsed, but the GeoTools library provides tools for geospatial data that use the shapefile format. Bu kitaplık kullanılan com.microsoft.pnp.GeoFinder sınıfı oluşturan çekme üzerinde temel Komşuları adını belirleyin ve koordinatları devre dışı bırakın.This library is used in the com.microsoft.pnp.GeoFinder class to determine the neighborhood name based on the pick up and drop off coordinates.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Kılma ve taksi verileri birleştirmeJoining the ride and fare data

İlk turu ve taksi verileri dönüştürülür:First the ride and fare data is transformed:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Ve ardından kılma veri ile taksi verileri birleştirilir:And then the ride data is joined with the fare data:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Veri işleme ve Cosmos DB'ye eklemeProcessing the data and inserting into Cosmos DB

Belirtilen zaman aralığı için her Komşuları ortalama ücreti tutarındaki hesaplanır:The average fare amount for each neighborhood is calculated for a given time interval:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount")

Ardından, hangi Cosmos DB'ye eklenir:Which is then inserted into Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Güvenlikle ilgili dikkat edilmesi gerekenlerSecurity considerations

Azure veritabanı çalışma alanına erişim kullanılarak denetlenir Yönetici Konsolu.Access to the Azure Database workspace is controlled using the administrator console. Yönetici Konsolu kullanıcıları eklemek için kullanıcı izinlerini yönetme ve çoklu oturum açmayı ayarlama işlevselliğini içerir.The administrator console includes functionality to add users, manage user permissions, and set up single sign-on. Çalışma alanları, kümeler, işler ve tablolar için erişim denetimi, ayrıca Yönetici Konsolu aracılığıyla ayarlanabilir.Access control for workspaces, clusters, jobs, and tables can also be set through the administrator console.

Gizli anahtarları yönetmeManaging secrets

Azure Databricks içeren bir gizli dizi deposu bağlantı dizeleri, erişim anahtarları, kullanıcı adları ve parolalar gibi gizli depolamak için kullanılır.Azure Databricks includes a secret store that is used to store secrets, including connection strings, access keys, user names, and passwords. Azure Databricks gizli dizi deposu içindeki gizli dizileri bölümlere ayrılmış kapsamları:Secrets within the Azure Databricks secret store are partitioned by scopes:

databricks secrets create-scope --scope "azure-databricks-job"

Gizli dizileri kapsam düzeyinde eklenir:Secrets are added at the scope level:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Not

Azure Key Vault tarafından desteklenen bir kapsam, yerel Azure Databricks kapsamı yerine kullanılabilir.An Azure Key Vault-backed scope can be used instead of the native Azure Databricks scope. Daha fazla bilgi için bkz. Azure anahtar kasası tarafından desteklenen kapsamların.To learn more, see Azure Key Vault-backed scopes.

Kodda, gizli dizileri Azure Databricks ile erişilen gizli dizileri yardımcı programları.In code, secrets are accessed via the Azure Databricks secrets utilities.

İzleme konularıMonitoring considerations

Azure Databricks üzerinde Apache Spark tabanlı ve ikisi de log4j günlüğe kaydetme için standart kitaplık olarak.Azure Databricks is based on Apache Spark, and both use log4j as the standard library for logging. Apache Spark tarafından sağlanan varsayılan günlüğü ek olarak, bu başvuru mimarisi, günlükleri ve ölçümler için gönderir. Azure Log Analytics.In addition to the default logging provided by Apache Spark, this reference architecture sends logs and metrics to Azure Log Analytics.

Com.microsoft.pnp.TaxiCabReader sınıfı yapılandırır, günlükleri değerleri kullanarak Azure Log analytics'e göndermek için Apache Spark günlük sisteminin log4j.properties dosya.The com.microsoft.pnp.TaxiCabReader class configures the Apache Spark logging system to send its logs to Azure Log Analytics using the values in the log4j.properties file. Apache Spark Günlükçü iletileri dizeleri olsa da, Azure Log Analytics günlük iletilerini JSON olarak biçimlendirilecek gerektirir.While the Apache Spark logger messages are strings, Azure Log Analytics requires log messages to be formatted as JSON. Com.microsoft.pnp.log4j.LogAnalyticsAppender sınıfı bu iletileri JSON dönüştürür:The com.microsoft.pnp.log4j.LogAnalyticsAppender class transforms these messages to JSON:


    @Override
    protected void append(LoggingEvent loggingEvent) {
        if (this.layout == null) {
            this.setLayout(new JSONLayout());
        }

        String json = this.getLayout().format(loggingEvent);
        try {
            this.client.send(json, this.logType);
        } catch(IOException ioe) {
            LogLog.warn("Error sending LoggingEvent to Log Analytics", ioe);
        }
    }

Olarak com.microsoft.pnp.TaxiCabReader sınıfı kılma ve taksi iletileri işler mümkündür, aşağıdakilerden biri olabilir hatalı biçimlendirilmiş ve bu nedenle geçerli değil.As the com.microsoft.pnp.TaxiCabReader class processes ride and fare messages, it's possible that either one may be malformed and therefore not valid. Bir üretim ortamında, veri kaybını önlemek için hızla düzeltilebilir için veri kaynakları ile ilgili bir sorun tanımlamak için bu hatalı biçimlendirilmiş iletiler çözümlemek önemlidir.In a production environment, it's important to analyze these malformed messages to identify a problem with the data sources so it can be fixed quickly to prevent data loss. Com.microsoft.pnp.TaxiCabReader sınıfı hatalı biçimlendirilmiş taksi ve kılma kayıt sayısını ve bir Apache Spark Accumulator kaydeder:The com.microsoft.pnp.TaxiCabReader class registers an Apache Spark Accumulator that keeps track of the number of malformed fare and ride records:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark, ölçümleri göndermek için Dropwizard kitaplık kullanır ve yerel Dropwizard ölçümleri alanlardan bazılarını Azure Log Analytics ile uyumlu değildir.Apache Spark uses the Dropwizard library to send metrics, and some of the native Dropwizard metrics fields are incompatible with Azure Log Analytics. Bu nedenle, bu başvuru mimarisi, özel Dropwizard havuz ve muhabir içerir.Therefore, this reference architecture includes a custom Dropwizard sink and reporter. Azure Log Analytics tarafından beklenen biçimde ölçümleri biçimlendirir.It formats the metrics in the format expected by Azure Log Analytics. Apache Spark ölçümleri bildirdiğinde, hatalı biçimlendirilmiş kılma ve taksi verileri için özel ölçümleri de gönderilir.When Apache Spark reports metrics, the custom metrics for the malformed ride and fare data are also sent.

Toplu ilerleme Spark yapılandırılmış akış işi ilerleme durumunu Azure Log Analytics çalışma alanına günlüğe kaydedilecek son unsurdur.The last metric to be logged to the Azure Log Analytics workspace is the cumulative progress of the Spark Structured Streaming job progress. Bu yapılır özel StreamingQuery kullanarak, dinleyici uygulanan com.microsoft.pnp.StreamingMetricsListener sınıfı.This is done using a custom StreamingQuery listener implemented in the com.microsoft.pnp.StreamingMetricsListener class. İş çalıştırıldığında bu sınıf Apache Spark oturumuna kayıtlı:This class is registered to the Apache Spark Session when the job runs:

spark.streams.addListener(new StreamingMetricsListener())

Yapılandırılmış akış bir olay oluştuğunda StreamingMetricsListener yöntemlere Apache Spark çalışma zamanı tarafından gönderme olarak adlandırılan günlük iletileri ve ölçümler Azure Log Analytics çalışma alanına.The methods in the StreamingMetricsListener are called by the Apache Spark runtime whenever a structured steaming event occurs, sending log messages and metrics to the Azure Log Analytics workspace. Aşağıdaki sorgularda, çalışma alanınızda uygulamayı izlemek için kullanabilirsiniz:You can use the following queries in your workspace to monitor the application:

Gecikme süresi ve sorguları akış işlemeLatency and throughput for streaming queries

taxijob_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| project  mdc_inputRowsPerSecond_d, mdc_durationms_triggerExecution_d
| render timechart

Akış sorgu yürütme işlemi sırasında günlüğe kaydedilen özel durumlarıExceptions logged during stream query execution

taxijob_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| where Level contains "Error"

Hatalı biçimlendirilmiş taksi ve kılma veri birikmesiAccumulation of malformed fare and ride data

SparkMetric_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| render timechart
| where name_s contains "metrics.malformedrides"

SparkMetric_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| render timechart
| where name_s contains "metrics.malformedfares"

İzleme dayanıklılık için iş yürütmeJob execution to trace resiliency

SparkMetric_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| render timechart
| where name_s contains "driver.DAGScheduler.job.allJobs"

Çözümü dağıtmaDeploy the solution

Dağıtma ve çalıştırma başvuru uygulaması için adımları GitHub Benioku.To the deploy and run the reference implementation, follow the steps in the GitHub readme.