Criar um pipeline de processamento de fluxo com o Azure DatabricksCreate a stream processing pipeline with Azure Databricks

Esta arquitetura de referência mostra uma ponto a ponto processamento de fluxo pipeline.This reference architecture shows an end-to-end stream processing pipeline. Esse tipo de pipeline tem quatro fases: ingestão, processo, a loja e a análise e relatórios.This type of pipeline has four stages: ingest, process, store, and analysis and reporting. Para esta arquitetura de referência, o pipeline ingere dados de duas origens, executa uma junção em registros relacionados do cada fluxo, otimiza o resultado e calcula a média em tempo real.For this reference architecture, the pipeline ingests data from two sources, performs a join on related records from each stream, enriches the result, and calculates an average in real time. Os resultados são armazenados para análise adicional.The results are stored for further analysis.

Logótipo do GitHub uma implementação de referência para esta arquitetura está disponível no GitHub.GitHub logo A reference implementation for this architecture is available on GitHub.

Arquitetura de referência para o fluxo de processamento com o Azure Databricks

Cenário: Uma empresa de táxis recolhe dados sobre cada viagem de táxis.Scenario: A taxi company collects data about each taxi trip. Para este cenário, partimos do princípio existem dois dispositivos separados, o envio de dados.For this scenario, we assume there are two separate devices sending data. A táxis tem um medidor que envia informações sobre cada jornada — a duração, distância e localizações de recolha e de redução.The taxi has a meter that sends information about each ride — the duration, distance, and pickup and dropoff locations. Um dispositivo separado aceita pagamentos de clientes e envia dados sobre fares.A separate device accepts payments from customers and sends data about fares. As tendências do spot ridership, o quer da empresa de táxis para calcular a média dica por quilómetro condicionada por em tempo real, para cada ambiente.To spot ridership trends, the taxi company wants to calculate the average tip per mile driven, in real time, for each neighborhood.

ArquiteturaArchitecture

A arquitetura é composta pelos seguintes componentes.The architecture consists of the following components.

Origens de dados.Data sources. Nesta arquitetura, existem duas origens de dados que geram fluxos de dados em tempo real.In this architecture, there are two data sources that generate data streams in real time. O primeiro fluxo contém informações de jornada, e o segundo contém informações de Europeia.The first stream contains ride information, and the second contains fare information. A arquitetura de referência inclui um gerador de dados simulados que lê a partir de um conjunto de ficheiros estáticos e envia os dados para os Hubs de eventos.The reference architecture includes a simulated data generator that reads from a set of static files and pushes the data to Event Hubs. As origens de dados num aplicativo real seria dispositivos instalados em cabs a táxis.The data sources in a real application would be devices installed in the taxi cabs.

Hubs de Eventos do Azure.Azure Event Hubs. Os Hubs de eventos é um serviço de ingestão de eventos.Event Hubs is an event ingestion service. Esta arquitetura utiliza duas instâncias de hub de eventos, um para cada origem de dados.This architecture uses two event hub instances, one for each data source. Cada origem de dados envia um fluxo de dados para o hub de eventos associados.Each data source sends a stream of data to the associated event hub.

O Azure Databricks.Azure Databricks. Databricks é uma plataforma de análise baseada no Apache Spark otimizada para a plataforma de serviços do Microsoft Azure na cloud.Databricks is an Apache Spark-based analytics platform optimized for the Microsoft Azure cloud services platform. Databricks é utilizado para correlacionar da jornada de táxis e se comportarão de dados e também para enriquecer os dados correlacionados com dados armazenados no sistema de ficheiros de Databricks do ambiente.Databricks is used to correlate of the taxi ride and fare data, and also to enrich the correlated data with neighborhood data stored in the Databricks file system.

O cosmos DB.Cosmos DB. A saída da tarefa do Azure Databricks é uma série de registos, que são escritos Cosmos DB com a API Cassandra.The output from Azure Databricks job is a series of records, which are written to Cosmos DB using the Cassandra API. A API de Cassandra é utilizada porque ele oferece suporte a modelação de dados de séries de tempo.The Cassandra API is used because it supports time series data modeling.

Log Analytics do Azure.Azure Log Analytics. Dados de registo de aplicação recolhidos pelo do Azure Monitor são armazenados num área de trabalho do Log Analytics.Application log data collected by Azure Monitor is stored in a Log Analytics workspace. Consultas de análise de registo podem ser utilizadas para analisar e visualizar métricas e inspecionar as mensagens de registo para identificar problemas na aplicação.Log Analytics queries can be used to analyze and visualize metrics and inspect log messages to identify issues within the application.

Ingestão de dadosData ingestion

Para simular uma origem de dados, esta arquitetura de referência utiliza o dados de táxis da cidade de Nova Iorque conjunto de dados[1].To simulate a data source, this reference architecture uses the New York City Taxi Data dataset[1]. Este conjunto de dados contém dados sobre viagens de táxis de nova York durante um período de quatro anos (2010 – 2013).This dataset contains data about taxi trips in New York City over a four-year period (2010 – 2013). Ele contém dois tipos de registo: Enfrente os dados e se comportarão de dados.It contains two types of record: Ride data and fare data. Dados de jornada incluem a duração da viagem, a distância de viagem e a localização de recolha e de redução.Ride data includes trip duration, trip distance, and pickup and dropoff location. Dados de Europeia incluem Europeia, o imposto e o tip quantidades.Fare data includes fare, tax, and tip amounts. Campos comuns em ambos os tipos de registo incluem o número de medallion, licença de acesso e ID do fornecedor.Common fields in both record types include medallion number, hack license, and vendor ID. Em conjunto estes três campos de identificam exclusivamente um táxi além de um driver.Together these three fields uniquely identify a taxi plus a driver. Os dados são armazenados no formato CSV.The data is stored in CSV format.

[1] Donovan, Brian; Work, Dan (2016): Dados de viagens de táxis da cidade de nova York (2010-2013).[1] Donovan, Brian; Work, Dan (2016): New York City Taxi Trip Data (2010-2013). Universidade de Illinois em Urbana Champaign.University of Illinois at Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

O gerador de dados é uma aplicação .NET Core que lê os registos e as envia para os Hubs de eventos do Azure.The data generator is a .NET Core application that reads the records and sends them to Azure Event Hubs. O gerador de envia os dados de jornada em dados Europeia formato JSON no formato CSV.The generator sends ride data in JSON format and fare data in CSV format.

Os Event Hubs utilizam partições para segmentar os dados.Event Hubs uses partitions to segment the data. As partições permitem um consumidor ler cada partição em paralelo.Partitions allow a consumer to read each partition in parallel. Quando envia dados para os Hubs de eventos, pode especificar explicitamente a chave de partição.When you send data to Event Hubs, you can specify the partition key explicitly. Caso contrário, os registos são atribuídos a partições em rodízio.Otherwise, records are assigned to partitions in round-robin fashion.

Neste cenário, é garantida dados e dados de Europeia devem acabar com o mesmo ID de partição para um cab de táxis determinado.In this scenario, ride data and fare data should end up with the same partition ID for a given taxi cab. Isto permite que o Databricks aplicar um grau de paralelismo quando ele correlaciona os dois fluxos.This enables Databricks to apply a degree of parallelism when it correlates the two streams. Um registo numa partição n de simultaneamente dados corresponderá um registo numa partição n dos dados Europeia.A record in partition n of the ride data will match a record in partition n of the fare data.

Diagrama de processamento com o Azure Databricks e os Hubs de eventos de fluxo

O gerador de dados, o modelo de dados comum para ambos os tipos de registo tem um PartitionKey propriedade que é a concatenação de Medallion, HackLicense, e VendorId.In the data generator, the common data model for both record types has a PartitionKey property that is the concatenation of Medallion, HackLicense, and VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Esta propriedade é utilizada para fornecer uma chave de partição explícita ao enviar para os Hubs de eventos:This property is used to provide an explicit partition key when sending to Event Hubs:

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Hubs de EventosEvent Hubs

A capacidade de débito dos Hubs de eventos é medida em unidades de débito.The throughput capacity of Event Hubs is measured in throughput units. Pode dimensionar automaticamente um hub de eventos, ativando ampliação automática, que dimensiona automaticamente as unidades de débito com base no tráfego, até um máximo configurado.You can autoscale an event hub by enabling auto-inflate, which automatically scales the throughput units based on traffic, up to a configured maximum.

Processamento de fluxosStream processing

No Azure Databricks, processamento de dados é executado uma tarefa.In Azure Databricks, data processing is performed by a job. A tarefa é atribuída ao e é executado num cluster.The job is assigned to and runs on a cluster. A tarefa podem ser código personalizado escrito em Java ou Spark bloco de notas.The job can either be custom code written in Java, or a Spark notebook.

Nesta arquitetura de referência, a tarefa é um arquivo de Java com classes escritas no Scala e Java.In this reference architecture, the job is a Java archive with classes written in both Java and Scala. Ao especificar o arquivo de Java para um trabalho do Databricks, a classe é especificada para execução por cluster do Databricks.When specifying the Java archive for a Databricks job, the class is specified for execution by the Databricks cluster. Aqui, o principal método o com.microsoft.pnp.TaxiCabReader classe contém a lógica de processamento de dados.Here, the main method of the com.microsoft.pnp.TaxiCabReader class contains the data processing logic.

Ler o fluxo das instâncias do hub de eventos de doisReading the stream from the two event hub instances

A lógica de processamento de dados utiliza Spark transmissão em fluxo estruturada para ler a partir as duas instâncias do hub de eventos do Azure:The data processing logic uses Spark structured streaming to read from the two Azure event hub instances:

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Enriquecer os dados com as informações da VizinhançaEnriching the data with the neighborhood information

Os dados da jornada incluem as coordenadas de latitude e longitude de escolher com a cópia de segurança e entregar localizações.The ride data includes the latitude and longitude coordinates of the pick up and drop off locations. Embora essas coordenadas sejam úteis, eles não são facilmente consumidos para análise.While these coordinates are useful, they are not easily consumed for analysis. Por conseguinte, estes dados são enriquecidos com dados de ambiente que são lidos a partir de um ficheiro de formas.Therefore, this data is enriched with neighborhood data that is read from a shapefile.

O formato de ficheiro de formas é binário e não facilmente analisado, mas a GeoTools biblioteca fornece ferramentas para dados geoespaciais que utilizam o formato de ficheiro de formas.The shapefile format is binary and not easily parsed, but the GeoTools library provides tools for geospatial data that use the shapefile format. Esta biblioteca é utilizada na com.microsoft.pnp.GeoFinder classe para determinar o nome da vizinhança com base na escolher a cópia de segurança e entregar coordenadas.This library is used in the com.microsoft.pnp.GeoFinder class to determine the neighborhood name based on the pick up and drop off coordinates.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Associar os dados da jornada e EuropeiaJoining the ride and fare data

Em primeiro lugar os dados da jornada e Europeia são transformados:First the ride and fare data is transformed:

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

E, em seguida, os dados da jornada são associados os dados Europeia:And then the ride data is joined with the fare data:

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Processar os dados e a inserir no Cosmos DBProcessing the data and inserting into Cosmos DB

A quantidade de Europeia médio para cada ambiente é calculada para um determinado intervalo:The average fare amount for each neighborhood is calculated for a given time interval:

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount")

Em seguida, que é inserido do Cosmos DB:Which is then inserted into Cosmos DB:

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considerações de segurançaSecurity considerations

Acesso à área de trabalho de base de dados do Azure é controlado com o consola de administrador.Access to the Azure Database workspace is controlled using the administrator console. A consola do administrador inclui funcionalidades para adicionar utilizadores, gerir permissões de utilizador e configurar o início de sessão único.The administrator console includes functionality to add users, manage user permissions, and set up single sign-on. Controlo de acesso para áreas de trabalho, clusters, tarefas e tabelas também pode ser definido através da consola de administrador.Access control for workspaces, clusters, jobs, and tables can also be set through the administrator console.

Gerir segredosManaging secrets

O Azure Databricks inclui um arquivo de segredos que é utilizada para armazenar segredos, incluindo cadeias de ligação, as chaves de acesso, os nomes de utilizador e palavras-passe.Azure Databricks includes a secret store that is used to store secrets, including connection strings, access keys, user names, and passwords. Segredos num arquivo de segredos Azure Databricks estão particionados pela âmbitos:Secrets within the Azure Databricks secret store are partitioned by scopes:

databricks secrets create-scope --scope "azure-databricks-job"

Segredos são adicionados ao nível do âmbito:Secrets are added at the scope level:

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Nota

Um âmbito de segurança do Azure Key Vault pode ser utilizado em vez do âmbito nativo do Azure Databricks.An Azure Key Vault-backed scope can be used instead of the native Azure Databricks scope. Para obter mais informações, consulte âmbitos de segurança do Azure Key Vault.To learn more, see Azure Key Vault-backed scopes.

No código, os segredos são acedidos através do Azure Databricks utilitários de segredos.In code, secrets are accessed via the Azure Databricks secrets utilities.

Considerações sobre a monitorizaçãoMonitoring considerations

O Azure Databricks baseia-se no Apache Spark e ambas utilizam log4j como biblioteca padrão para o registo.Azure Databricks is based on Apache Spark, and both use log4j as the standard library for logging. Além do registo predefinido fornecido pelo Apache Spark, esta arquitetura de referência envia registos e métricas para do Azure Log Analytics.In addition to the default logging provided by Apache Spark, this reference architecture sends logs and metrics to Azure Log Analytics.

O com.microsoft.pnp.TaxiCabReader classe configura o sistema de registo do Apache Spark para enviar os respetivos registos para o Azure Log Analytics, utilizando os valores na log4j.properties ficheiro.The com.microsoft.pnp.TaxiCabReader class configures the Apache Spark logging system to send its logs to Azure Log Analytics using the values in the log4j.properties file. Enquanto as mensagens de registo do Apache Spark são cadeias de caracteres, o Azure Log Analytics requer as mensagens de registo esteja formatado como JSON.While the Apache Spark logger messages are strings, Azure Log Analytics requires log messages to be formatted as JSON. O com.microsoft.pnp.log4j.LogAnalyticsAppender classe transforma essas mensagens para JSON:The com.microsoft.pnp.log4j.LogAnalyticsAppender class transforms these messages to JSON:


    @Override
    protected void append(LoggingEvent loggingEvent) {
        if (this.layout == null) {
            this.setLayout(new JSONLayout());
        }

        String json = this.getLayout().format(loggingEvent);
        try {
            this.client.send(json, this.logType);
        } catch(IOException ioe) {
            LogLog.warn("Error sending LoggingEvent to Log Analytics", ioe);
        }
    }

Como o com.microsoft.pnp.TaxiCabReader classe processa mensagens de jornada e Europeia, é possível se alguma um pode ser um formato incorreto e, portanto, não é válido.As the com.microsoft.pnp.TaxiCabReader class processes ride and fare messages, it's possible that either one may be malformed and therefore not valid. Num ambiente de produção, é importante analisar essas mensagens com formato incorreto para identificar um problema com as origens de dados para que possa ser corrigida rapidamente para evitar a perda de dados.In a production environment, it's important to analyze these malformed messages to identify a problem with the data sources so it can be fixed quickly to prevent data loss. O com.microsoft.pnp.TaxiCabReader classe registra um acumulador de Spark do Apache que controle o número de um formato incorreto Europeia e registos de jornada:The com.microsoft.pnp.TaxiCabReader class registers an Apache Spark Accumulator that keeps track of the number of malformed fare and ride records:

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark utiliza a biblioteca de Dropwizard para enviar métricas e alguns dos campos de métricas de Dropwizard nativos são incompatíveis com o Azure Log Analytics.Apache Spark uses the Dropwizard library to send metrics, and some of the native Dropwizard metrics fields are incompatible with Azure Log Analytics. Por conseguinte, esta arquitetura de referência inclui um sink de Dropwizard personalizado e o gerador de relatórios.Therefore, this reference architecture includes a custom Dropwizard sink and reporter. Formata as métricas no formato esperado pelo Azure Log Analytics.It formats the metrics in the format expected by Azure Log Analytics. Quando as métricas de relatórios do Apache Spark, as métricas personalizadas para os dados com formato incorreto da jornada e Europeia também são enviadas.When Apache Spark reports metrics, the custom metrics for the malformed ride and fare data are also sent.

A última métrica de ter sessão iniciada para a área de trabalho do Log Analytics do Azure é o progresso cumulativo do progresso da tarefa de transmissão estruturada do Spark.The last metric to be logged to the Azure Log Analytics workspace is the cumulative progress of the Spark Structured Streaming job progress. Isso é feito usando um StreamingQuery personalizado ouvinte implementado no com.microsoft.pnp.StreamingMetricsListener classe.This is done using a custom StreamingQuery listener implemented in the com.microsoft.pnp.StreamingMetricsListener class. Esta classe está registada para a sessão do Apache Spark, quando a execução da tarefa:This class is registered to the Apache Spark Session when the job runs:

spark.streams.addListener(new StreamingMetricsListener())

Os métodos no StreamingMetricsListener são chamados pelo runtime do Apache Spark sempre que ocorre um evento de combinação estruturado, enviar mensagens e as métricas de registo para a área de trabalho do Log Analytics do Azure.The methods in the StreamingMetricsListener are called by the Apache Spark runtime whenever a structured steaming event occurs, sending log messages and metrics to the Azure Log Analytics workspace. Pode utilizar as seguintes consultas na sua área de trabalho para monitorizar a aplicação:You can use the following queries in your workspace to monitor the application:

Latência e débito para consultas de transmissão em fluxoLatency and throughput for streaming queries

taxijob_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| project  mdc_inputRowsPerSecond_d, mdc_durationms_triggerExecution_d
| render timechart

Exceções registadas durante a execução de consulta do streamExceptions logged during stream query execution

taxijob_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| where Level contains "Error"

Acúmulo de um formato incorreto Europeia e dados da jornadaAccumulation of malformed fare and ride data

SparkMetric_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| render timechart
| where name_s contains "metrics.malformedrides"

SparkMetric_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| render timechart
| where name_s contains "metrics.malformedfares"

Execução da tarefa para a resiliência de rastreioJob execution to trace resiliency

SparkMetric_CL
| where TimeGenerated > startofday(datetime(<date>)) and TimeGenerated < endofday(datetime(<date>))
| render timechart
| where name_s contains "driver.DAGScheduler.job.allJobs"

Para obter mais informações, consulte monitorização do Azure Databricks.For more information, see Monitoring Azure Databricks.

Implementar a soluçãoDeploy the solution

Para a implementar e executar a implementação de referência, siga os passos a Leiame do GitHub.To the deploy and run the reference implementation, follow the steps in the GitHub readme.