Azure Stream Analytics ile bir akış işleme işlem hattı oluşturmaCreate a stream processing pipeline with Azure Stream Analytics

Bu başvuru mimarisi bir uçtan uca gösterir akış işleme işlem hattı.This reference architecture shows an end-to-end stream processing pipeline. İşlem hattı iki farklı kaynaktan gelen verileri alır, iki akışlarında kayıtları ilişkilendirir ve bir zaman penceresi hareketli ortalamanın hesaplar.The pipeline ingests data from two sources, correlates records in the two streams, and calculates a rolling average across a time window. Daha fazla analiz için sonuçlar depolanır.The results are stored for further analysis.

Bu mimari için bir başvuru uygulaması kullanılabilir GitHub.A reference implementation for this architecture is available on GitHub.

Azure Stream Analytics ile bir akış işleme işlem hattı oluşturmak için başvuru mimarisi

Senaryo: Taksi şirket her taksi seyahat ilgili verileri toplar.Scenario: A taxi company collects data about each taxi trip. Bu senaryo için veri gönderen iki ayrı cihaz varsayıyoruz.For this scenario, we assume there are two separate devices sending data. Taksi her kıl hakkında bilgi gönderen bir ölçüm olan — süresi, uzaklığı ve toplama ve dropoff konumları.The taxi has a meter that sends information about each ride — the duration, distance, and pickup and dropoff locations. Ayrı bir cihaz, Müşteri ödemelerini kabul eder ve fares ilgili verileri gönderir.A separate device accepts payments from customers and sends data about fares. Taksi şirket, eğilimleri belirlemek için sırada gerçek zamanlı temelli Mil başına ortalama ipucu hesaplamak istiyor.The taxi company wants to calculate the average tip per mile driven, in real time, in order to spot trends.

MimariArchitecture

Mimari aşağıdaki bileşenlerden oluşur.The architecture consists of the following components.

Veri kaynakları.Data sources. Bu mimaride, gerçek zamanlı olarak veri akışları oluşturan iki veri kaynağı vardır.In this architecture, there are two data sources that generate data streams in real time. İlk stream kılma bilgilerini içeren ve ikinci taksi bilgileri içerir.The first stream contains ride information, and the second contains fare information. Başvuru mimarisi, statik dosyalar kümesinden okur ve verileri Event Hubs'a gönderir, bir sanal veri oluşturucuyu içerir.The reference architecture includes a simulated data generator that reads from a set of static files and pushes the data to Event Hubs. Gerçek bir uygulamada, veri kaynakları taxi cab dosyaları yüklü cihazlar olacaktır.In a real application, the data sources would be devices installed in the taxi cabs.

Azure Event Hubs.Azure Event Hubs. Olay hub'ları bir olay alma hizmetidir.Event Hubs is an event ingestion service. Bu mimaride, iki olay hub'ı örnekleri, her veri kaynağı için kullanılır.This architecture uses two event hub instances, one for each data source. Her veri kaynağı bir veri akışı, ilişkili olay hub'ına gönderir.Each data source sends a stream of data to the associated event hub.

Azure Stream Analytics.Azure Stream Analytics. Stream Analytics olay işleme motorudur.Stream Analytics is an event-processing engine. Bir Stream Analytics işi, verileri iki event hubs'dan olay akışları ve akış işleme gerçekleştirir okur.A Stream Analytics job reads the data streams from the two event hubs and performs stream processing.

Cosmos DB.Cosmos DB. Stream Analytics iş çıktısını JSON belgeleri olarak bir Cosmos DB belge veritabanına yazılan kayıtlar dizisidir.The output from the Stream Analytics job is a series of records, which are written as JSON documents to a Cosmos DB document database.

Microsoft Power BI.Microsoft Power BI. Power BI, iş öngörüleri için verileri çözümlemek için İş analizi araçları paketidir.Power BI is a suite of business analytics tools to analyze data for business insights. Bu mimaride, Cosmos DB'den verileri yükler.In this architecture, it loads the data from Cosmos DB. Bu, kullanıcıların eksiksiz toplanmış geçmiş verileri çözümlemek sağlar.This allows users to analyze the complete set of historical data that's been collected. Ayrıca, Stream Analytics'ten alınan doğrudan Power BI'da sonuçların verilerin gerçek zamanlı bir görünüm için akış.You could also stream the results directly from Stream Analytics to Power BI for a real-time view of the data. Daha fazla bilgi için Power BI'da gerçek zamanlı akış.For more information, see Real-time streaming in Power BI.

Azure İzleyici.Azure Monitor. Azure İzleyici çözümünde dağıtılmış Azure Hizmetleri ile ilgili performans ölçümleri toplar.Azure Monitor collects performance metrics about the Azure services deployed in the solution. Bu Panoda görselleştirmek için çözümün sistem durumu hakkında Öngörüler elde edebilirsiniz.By visualizing these in a dashboard, you can get insights into the health of the solution.

Veri alımıData ingestion

Bir veri kaynağı benzetimini yapmak için bu başvuru mimarisi kullanan New York taksi verilerini veri kümesi[1].To simulate a data source, this reference architecture uses the New York City Taxi Data dataset[1]. Bu veri kümesi bir 4 yıllık döneme taksi gelişlerin da oturan hakkında veri içerir (2010 – 2013).This dataset contains data about taxi trips in New York City over a 4-year period (2010 – 2013). İki tür kayıt içerir: Bulutumuzda verileri ve veri masrafları.It contains two types of record: Ride data and fare data. Seyahat süresi, seyahat uzaklık ve toplama ve dropoff konumu kılma veriler içerir.Ride data includes trip duration, trip distance, and pickup and dropoff location. Taksi verilerini içeren taksi, vergi ve ipucu tutarlarıdır.Fare data includes fare, tax, and tip amounts. Her iki kayıt türü ortak alanları medallion sayısı, hack lisans ve satıcı kimliği içerir.Common fields in both record types include medallion number, hack license, and vendor ID. Birlikte bir taksi artı bir sürücü bu üç alanı benzersiz şekilde tanımlar.Together these three fields uniquely identify a taxi plus a driver. Veriler, CSV biçiminde depolanır.The data is stored in CSV format.

[1] Donovan, Brian; Çalışma sayfasını (2016): New York City taksi seyahat verilerini (2010-2013).[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Üniversite, Illinois Urbana Champaign'deki.University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Veri oluşturucuyu kayıtları okur ve bunları Azure Event Hubs'a gönderen bir .NET Core uygulamasıdır.The data generator is a .NET Core application that reads the records and sends them to Azure Event Hubs. Oluşturucu, JSON biçimi ve taksi verilerini CSV biçiminde kılma verileri gönderir.The generator sends ride data in JSON format and fare data in CSV format.

Event Hubs kullanan bölümleri veri segmentlere ayırmak için.Event Hubs uses partitions to segment the data. Bölümler bir tüketici her bölüm paralel okuma izin verin.Partitions allow a consumer to read each partition in parallel. Verileri Event Hubs'a gönderme, bölüm anahtarı açıkça belirtebilirsiniz.When you send data to Event Hubs, you can specify the partition key explicitly. Aksi takdirde, kayıtlar, bölüm ettirirsiniz olarak atanır.Otherwise, records are assigned to partitions in round-robin fashion.

Belirli Bu senaryoda, veri bulutumuzda ve taksi verileri için belirli taxi cab aynı bölüm kimlikli görmeniz gerekir.In this particular scenario, ride data and fare data should end up with the same partition ID for a given taxi cab. Bu iki akışları bağıntılandırır, paralellik derecesini uygulamak Stream Analytics sağlar.This enables Stream Analytics to apply a degree of parallelism when it correlates the two streams. Bölüm bir kayıtta n kılma veri bölümündeki bir kaydı eşleşecektir n taksi verileri.A record in partition n of the ride data will match a record in partition n of the fare data.

Azure Stream Analytics ve Event Hubs ile akış diyagramı

Verileri oluşturma Aracı'nda, her iki kayıt türleri için ortak veri modeli olan bir PartitionKey birleştirme özelliğinin, Medallion, HackLicense, ve VendorId.In the data generator, the common data model for both record types has a PartitionKey property which is the concatenation of Medallion, HackLicense, and VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Bu özellik, Event Hubs'a gönderirken bir açık bölüm anahtarı sağlamak için kullanılır:This property is used to provide an explicit partition key when sending to Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Akış işlemeStream processing

Akış işleme işi ayrı adımlarla bir SQL sorgusu kullanılarak tanımlanır.The stream processing job is defined using a SQL query with several distinct steps. İlk iki adımını kayıtları iki giriş akışlarından seçmeniz yeterlidir.The first two steps simply select records from the two input streams.

WITH
Step1 AS (
    SELECT PartitionId,
           TRY_CAST(Medallion AS nvarchar(max)) AS Medallion,
           TRY_CAST(HackLicense AS nvarchar(max)) AS HackLicense,
           VendorId,
           TRY_CAST(PickupTime AS datetime) AS PickupTime,
           TripDistanceInMiles
    FROM [TaxiRide] PARTITION BY PartitionId
),
Step2 AS (
    SELECT PartitionId,
           medallion AS Medallion,
           hack_license AS HackLicense,
           vendor_id AS VendorId,
           TRY_CAST(pickup_datetime AS datetime) AS PickupTime,
           tip_amount AS TipAmount
    FROM [TaxiFare] PARTITION BY PartitionId
),

Sonraki adım her akıştan eşleşen kayıtları seçmek için iki giriş akışları birleştirir.The next step joins the two input streams to select matching records from each stream.

Step3 AS (
  SELECT
         tr.Medallion,
         tr.HackLicense,
         tr.VendorId,
         tr.PickupTime,
         tr.TripDistanceInMiles,
         tf.TipAmount
    FROM [Step1] tr
    PARTITION BY PartitionId
    JOIN [Step2] tf PARTITION BY PartitionId
      ON tr.Medallion = tf.Medallion
     AND tr.HackLicense = tf.HackLicense
     AND tr.VendorId = tf.VendorId
     AND tr.PickupTime = tf.PickupTime
     AND tr.PartitionId = tf.PartitionId
     AND DATEDIFF(minute, tr, tf) BETWEEN 0 AND 15
)

Bu sorgu, eşleşen kayıtları (Medallion, HackLicense, VendorId ve PickupTime) benzersiz şekilde tanımlayan alanların kümesinde kayıtları birleştirir.This query joins records on a set of fields that uniquely identify matching records (Medallion, HackLicense, VendorId, and PickupTime). JOIN Deyimi bölüm kimliğini de içerirThe JOIN statement also includes the partition ID. Belirtildiği gibi bu eşleşen kayıtları Bu senaryoda, her zaman aynı bölüm kimliği olan yararlanır.As mentioned, this takes advantage of the fact that matching records always have the same partition ID in this scenario.

Stream Analytics'te birleştirmeler olan zamana bağlı, belirli bir zaman penceresi içinde anlamı kayıtlar birleştirilir.In Stream Analytics, joins are temporal, meaning records are joined within a particular window of time. Aksi takdirde, işi için bir eşleşme süresiz beklemesi gerekebilir.Otherwise, the job might need to wait indefinitely for a match. DATEDIFF işlevini belirten bir eşleşme için ne kadar iki eşleşen kayıtları sürede ayrılabilir.The DATEDIFF function specifies how far two matching records can be separated in time for a match.

Son adımda iş başına mil, 5 dakikalık bir atlamalı pencere göre gruplanmış ortalama ipucu hesaplar.The last step in the job computes the average tip per mile, grouped by a hopping window of 5 minutes.

SELECT System.Timestamp AS WindowTime,
       SUM(tr.TipAmount) / SUM(tr.TripDistanceInMiles) AS AverageTipPerMile
  INTO [TaxiDrain]
  FROM [Step3] tr
  GROUP BY HoppingWindow(Duration(minute, 5), Hop(minute, 1))

Stream Analytics sağlayan çeşitli Pencereleme işlevleri.Stream Analytics provides several windowing functions. Atlayan bir atlamalı pencere ileri zamanında sabit bir nokta, bu durumda 1 dakika başına atlama taşır.A hopping window moves forward in time by a fixed period, in this case 1 minute per hop. Son 5 dakika boyunca hareketli ortalamanın hesaplanacağı sonucudur.The result is to calculate a moving average over the past 5 minutes.

Burada gösterilen mimaride, Stream Analytics işi sonuçlarını yalnızca Cosmos DB'ye kaydedilir.In the architecture shown here, only the results of the Stream Analytics job are saved to Cosmos DB. Büyük veri senaryo için de kullanmayı Event Hubs yakalama ham olay verileri Azure Blob depolama alanına kaydedin.For a big data scenario, consider also using Event Hubs Capture to save the raw event data into Azure Blob storage. Ham verileri tutma verilerden yeni içgörüler türetmek için daha sonra toplu işlem sorguları geçmiş verileriniz üzerinde çalıştırmanıza izin verir.Keeping the raw data will allow you to run batch queries over your historical data at later time, in order to derive new insights from the data.

Ölçeklenebilirlik konusunda dikkat edilmesi gerekenlerScalability considerations

Event HubsEvent Hubs

Event hubs işleme kapasitesi ölçülür üretilen iş birimleri.The throughput capacity of Event Hubs is measured in throughput units. Otomatik ölçeklendirme bir olay hub'ı etkinleştirerek yapabilecekleriniz otomatik şişme, hangi otomatik olarak ölçeklenen trafiği, yapılandırılmış en temel üretilen iş birimleri.You can autoscale an event hub by enabling auto-inflate, which automatically scales the throughput units based on traffic, up to a configured maximum.

Stream AnalyticsStream Analytics

Stream Analytics için bir işlem için ayrılan işlem kaynakları akış birimleriyle ölçülür.For Stream Analytics, the computing resources allocated to a job are measured in Streaming Units. İş paralelleştirilebilir, Stream Analytics işlerini en iyi şekilde ölçeklendirin.Stream Analytics jobs scale best if the job can be parallelized. Bu şekilde, Stream Analytics işi birden çok işlem düğümleri arasında dağıtabilirsiniz.That way, Stream Analytics can distribute the job across multiple compute nodes.

Event Hubs giriş için PARTITION BY Stream Analytics işi bölümlemek için anahtar sözcüğü.For Event Hubs input, use the PARTITION BY keyword to partition the Stream Analytics job. Verileri Event Hubs bölüme göre alt kümeler halinde bölünür.The data will be divided into subsets based on the Event Hubs partitions.

Pencereleme işlevleri ve zamana bağlı birleştirmeler ek SU gerektirir.Windowing functions and temporal joins require additional SU. Mümkün olduğunda, kullanın PARTITION BY böylece her bölüm ayrı ayrı işlenir.When possible, use PARTITION BY so that each partition is processed separately. Daha fazla bilgi için anlayın ve akış birimi Ayarla.For more information, see Understand and adjust Streaming Units.

Tüm Stream Analytics işi paralel hale getirmek mümkün değilse, işi bir veya daha fazla paralel adımlarla işe koyulmadan birden çok adım bölüneceği deneyin.If it's not possible to parallelize the entire Stream Analytics job, try to break the job into multiple steps, starting with one or more parallel steps. Bu şekilde, ilk adım paralel olarak çalıştırılabilir.That way, the first steps can run in parallel. Örneğin, bu başvuru mimarisinde:For example, in this reference architecture:

  • Adım 1 ve 2 basit SELECT tek bölüm içindeki kayıtları seçme deyimleri.Steps 1 and 2 are simple SELECT statements that select records within a single partition.
  • 3. adım, iki giriş akışları arasında bölümlenmiş bir birleşim gerçekleştirir.Step 3 performs a partitioned join across two input streams. Bu adım, eşleşen kayıtları aynı bölüm anahtarına paylaşın ve bu nedenle aynı bölüm kimliği her giriş akışında sahip olacağı garanti edilir olgu avantajlarından yararlanır.This step takes advantage of the fact that matching records share the same partition key, and so are guaranteed to have the same partition ID in each input stream.
  • Tüm bölümlerin 4. adım toplar.Step 4 aggregates across all of the partitions. Bu adım paralel hale.This step cannot be parallelized.

Stream Analytics'i iş diyagramı kaç bölümleri için işin her adımda atandığını görmek için.Use the Stream Analytics job diagram to see how many partitions are assigned to each step in the job. Bu başvuru mimarisi için iş diyagramında Aşağıdaki diyagramda gösterilmiştir:The following diagram shows the job diagram for this reference architecture:

İş diyagramı

Cosmos DBCosmos DB

Cosmos DB için aktarım hızı kapasitesine ölçülür istek birimi (RU).Throughput capacity for Cosmos DB is measured in Request Units (RU). Bir Cosmos DB kapsayıcısında 10.000 geçmiş ölçeklendirmek için RU, belirtmelisiniz bir bölüm anahtarı oluşturduğunuzda, kapsayıcı ve her belgede bölüm anahtarı içerir.In order to scale a Cosmos DB container past 10,000 RU, you must specify a partition key when you create the container, and include the partition key in every document.

Bu başvuru mimarisinde, yeni belgeler yalnızca bir kez başına oluşturulan aktarım hızı gereksinimleri oldukça düşük olacak şekilde dakika (atlamalı pencere aralığı).In this reference architecture, new documents are created only once per minute (the hopping window interval), so the throughput requirements are quite low. Bu nedenle, bu senaryoda bir bölüm anahtarı atamak için gerek yoktur.For that reason, there's no need to assign a partition key in this scenario.

İzleme konularıMonitoring considerations

Çözüm işleme herhangi bir akışı ile performans ve sistem durumunu izlemek önemlidir.With any stream processing solution, it's important to monitor the performance and health of the system. Azure İzleyici mimarisinde kullanılan Azure Hizmetleri için ölçümleri ve tanılama günlüklerini toplar.Azure Monitor collects metrics and diagnostics logs for the Azure services used in the architecture. Azure İzleyici, Azure platformundaki yerleşik olarak bulunur ve uygulamanızdaki herhangi bir ek kod gerektirmez.Azure Monitor is built into the Azure platform and does not require any additional code in your application.

Aşağıdaki uyarı sinyaller birini, ilgili Azure kaynağı kullanıma ölçeklendirilmesi gereken belirtin:Any of the following warning signals indicate that you should scale out the relevant Azure resource:

  • Olay hub'ları istekleri kısıtlar veya günlük iletisi kotası yakın.Event Hubs throttles requests or is close to the daily message quota.
  • Stream Analytics işi, tutarlı bir şekilde daha % 80'den, ayrılmış akış birimleri (SU) kullanır.The Stream Analytics job consistently uses more than 80% of allocated Streaming Units (SU).
  • Cosmos DB istek kısıtlama başlar.Cosmos DB begins to throttle requests.

Başvuru mimarisi, Azure portalında dağıtılan özel bir Pano içerir.The reference architecture includes a custom dashboard, which is deployed to the Azure portal. Mimarisini dağıttıktan sonra panoyu açarak görüntüleyebilirsiniz Azure portalı seçerek TaxiRidesDashboard Pano listesinden.After you deploy the architecture, you can view the dashboard by opening the Azure Portal and selecting TaxiRidesDashboard from list of dashboards. Azure portalında özel panolar oluşturma ve dağıtma hakkında daha fazla bilgi için bkz. programlamayla Azure panoları oluşturma.For more information about creating and deploying custom dashboards in the Azure portal, see Programmatically create Azure Dashboards.

Aşağıdaki görüntüde, yaklaşık bir saat için Stream Analytics işi çalıştıktan sonra Pano gösterilmektedir.The following image shows the dashboard after the Stream Analytics job ran for about an hour.

Taksi sürmeye panosunun ekran görüntüsü

Sol panelde, Stream Analytics işine ilişkin SU tüketim ilk 15 dakika yükseliyor ve düzeylerini gösterir.The panel on the lower left shows that the SU consumption for the Stream Analytics job climbs during the first 15 minutes and then levels off. Proje kararlı bir duruma ulaştığında gibi tipik bir desendir.This is a typical pattern as the job reaches a steady state.

Event Hubs üst sağ bölmede gösterilen istekleri azaltmadır dikkat edin.Notice that Event Hubs is throttling requests, shown in the upper right panel. Azaltma hata aldığında, Event Hubs istemci SDK'sı otomatik olarak yeniden denediğinden, bir zaman daraltılmış isteği bir sorun değildir.An occasional throttled request is not a problem, because the Event Hubs client SDK automatically retries when it receives a throttling error. Ancak tutarlı azaltma hataları görüyorsanız, olay hub'ı daha fazla işleme birimi gerekiyor anlamına gelir.However, if you see consistent throttling errors, it means the event hub needs more throughput units. Aşağıdaki grafikte gösterildiği bir test çalıştırması Event Hubs kullanarak otomatik aktarım hızı birimi gerektiği gibi otomatik olarak ölçeklenen özelliğini şişme.The following graph shows a test run using the Event Hubs auto-inflate feature, which automatically scales out the throughput units as needed.

Event Hubs ekran otomatik ölçeklendirme

Otomatik şişme etkinleştirildi 06:35 hakkında işaretleyin.Auto-inflate was enabled at about the 06:35 mark. Event Hubs işleme birimleri en fazla 3 otomatik olarak ölçeği şekilde daraltılmış istekler drop p görebilirsiniz.You can see the p drop in throttled requests, as Event Hubs automatically scaled up to 3 throughput units.

İlginçtir ki, yan etkisi, Stream Analytics işinde SU kullanımını artırmayı vardı.Interestingly, this had the side effect of increasing the SU utilization in the Stream Analytics job. Kısıtlanarak, Event Hubs sınırlarına Stream Analytics işine ilişkin alımı hızının düşürülmesi.By throttling, Event Hubs was artificially reducing the ingestion rate for the Stream Analytics job. Bir performans sorunu çözme başka ortaya çıkarır, aslında yaygındır.It's actually common that resolving one performance bottleneck reveals another. Bu durumda, ek SU ayırma için Stream Analytics işi, sorun çözümlenir.In this case, allocating additional SU for the Stream Analytics job resolved the issue.

Çözümü dağıtmaDeploy the solution

Dağıtma ve çalıştırma başvuru uygulaması için adımları GitHub Benioku.To the deploy and run the reference implementation, follow the steps in the GitHub readme.

Aşağıdaki gözden geçirmek isteyebilirsiniz Azure örnek senaryolar bazı teknolojiler kullanarak özel çözümler göstermektedir:You may wish to review the following Azure example scenarios that demonstrate specific solutions using some of the same technologies: