Modifier

Traitement de flux de données avec Azure Databricks

Azure Cosmos DB
Azure Databricks
Hubs d'événements Azure
Azure Log Analytics
Azure Monitor

Cette architecture de référence présente un pipeline de traitement de flux de bout en bout. Ce type de pipeline comprend quatre étapes : ingestion, traitement, stockage, analyse et création de rapports. Pour cette architecture de référence, le pipeline ingère des données issues de deux sources, effectue une jonction sur les enregistrements apparentés de chaque flux, enrichit le résultat et calcule une moyenne en temps réel. Les résultats sont stockés en vue d’une analyse plus approfondie.

GitHub logo Une implémentation de référence de cette architecture est disponible sur GitHub.

Architecture

Diagram showing a reference architecture for stream processing with Azure Databricks.

Téléchargez un fichier Visio de cette architecture.

Workflow

L’architecture est constituée des composants suivants :

Sources de données. Dans cette architecture, deux sources de données génèrent des flux de données en temps réel. Le premier flux de données contient des informations sur les courses, tandis que le second contient des informations sur les tarifs. L’architecture de référence comprend un générateur de données simulées qui lit le contenu d’un ensemble de fichiers statiques et envoie (push) les données vers Event Hubs. Les sources de données dans une application réelle seraient les appareils installés dans les taxis.

Azure Event Hubs. Event Hubs est un service d’ingestion d’événements. Cette architecture utilise deux instances d’Event Hub, à savoir une par source de données. Chaque source de données envoie un flux de données à l’Event Hub associé.

Azure Databricks. Databricks est une plateforme d’analytique basée sur Apache Spark et optimisée pour la plateforme de services cloud Microsoft Azure. Databricks est utilisé pour mettre en corrélation les données de courses et de tarifs des taxis, mais aussi pour enrichir les données mises en corrélation avec les données de quartiers stockées dans le système de fichiers Databricks.

Azure Cosmos DB. La sortie d’un travail Azure Databricks est une série d’enregistrements écrits dans Azure Cosmos DB for Apache Cassandra. Azure Cosmos DB for Apache Cassandra est utilisé, car il prend en charge la modélisation des données de série chronologique.

  • Azure Synapse Link pour Azure Cosmos DB vous permet d’effectuer de l’analytique en quasi temps réel sur les données opérationnelles dans Azure Cosmos DB, sans aucun impact en termes de perfomances ou de coûts sur votre charge de travail transactionnelle, en utilisant les deux moteurs d’analytique disponibles depuis votre espace de travail Azure Synapse : SQL Serverless et des pools Spark.

Azure Log Analytics. Les données de journal d’application collectées par Azure Monitor sont stockées dans un espace de travail Log Analytics. Il est possible d’utiliser des requêtes Log Analytics pour analyser et visualiser les métriques et inspecter les messages de journal pour identifier les problèmes au sein de l’application.

Autres solutions

  • Synapse Link est la solution Microsoft par défaut pour l’analyse des données Azure Cosmos DB.

Détails du scénario

Scénario : Une compagnie de taxis collecte des données sur chaque trajet effectué en taxi. Pour ce scénario, nous partons du principe que deux périphériques distincts envoient des données. Le taxi est équipé d’un compteur qui envoie les informations suivantes sur chaque course : durée, distance et lieux de prise en charge et de dépose. Un autre périphérique accepte les paiements des clients et envoie des données sur les tarifs. Afin de déterminer les tendances des usagers, la compagnie de taxis souhaite calculer le pourboire moyen par mile parcouru, en temps réel, pour chaque quartier.

Cas d’usage potentiels

Cette solution est optimisée pour le secteur de la vente au détail.

Ingestion de données

Pour simuler une source de données, cette architecture de référence utilise les données des taxis de la ville de New York[1]. Ce jeu de données contient des données sur les courses de taxis à New York sur une période de quatre ans (2010 – 2013). Il contient deux types d’enregistrement : les données sur les courses et les données sur les tarifs. Les premières incluent la durée du trajet, la distance et les lieux de prise en charge et de dépose. Les secondes incluent le montant des tarifs des courses, des taxes et des pourboires. Les champs communs aux deux types d’enregistrement sont le numéro de médaillon (« taxi jaune »), le permis spécial et l’ID fournisseur. Ensemble ces trois champs identifient un taxi ainsi qu’un chauffeur. Les données sont stockées au format CSV.

[1] Donovan, Brian; Work, Dan (2016) : Données de trajet des taxis de New York (2010-2013). Université de l’Illinois, Urbana-Champaign. https://doi.org/10.13012/J8PN93H8

Le générateur de données est une application .NET Core qui lit les enregistrements et les envoie à Azure Event Hubs. Le générateur envoie les données des courses au format JSON et les données relatives aux tarifs au format CSV.

Le service Event Hubs utilise des partitions pour segmenter les données. Ce système de partition permet à un consommateur de lire chaque partition en parallèle. Lorsque vous envoyez des données à Event Hubs, vous pouvez spécifier explicitement la clé de partition. Sinon, les enregistrements sont affectés aux partitions de manière alternée.

Dans ce scénario, les données de courses et de tarifs doivent se retrouver avec le même ID de partition pour un taxi donné. Cela permet à Databricks d’appliquer un degré de parallélisme quand il met en corrélation les deux flux. Un enregistrement dans la partition n des données des courses correspond à un enregistrement de la partition n des données relatives aux tarifs.

Diagram of stream processing with Azure Databricks and Event Hubs.

Téléchargez un fichier Visio de cette architecture.

Dans le générateur de données, le modèle de données commun pour les deux types d’enregistrement comprend une propriété PartitionKey, qui est la concaténation de Medallion, HackLicense et VendorId.

public abstract class TaxiData
{
    public TaxiData()
    {
    }

    [JsonProperty]
    public long Medallion { get; set; }

    [JsonProperty]
    public long HackLicense { get; set; }

    [JsonProperty]
    public string VendorId { get; set; }

    [JsonProperty]
    public DateTimeOffset PickupTime { get; set; }

    [JsonIgnore]
    public string PartitionKey
    {
        get => $"{Medallion}_{HackLicense}_{VendorId}";
    }

Cette propriété est utilisée pour fournir une clé de partition explicite lors de l’envoi des données vers Event Hubs :

using (var client = pool.GetObject())
{
    return client.Value.SendAsync(new EventData(Encoding.UTF8.GetBytes(
        t.GetData(dataFormat))), t.PartitionKey);
}

Event Hubs

La capacité de débit du service Event Hubs est mesurée par les unités de débit. Vous pouvez mettre automatiquement à l’échelle un Event Hub en activant l’augmentation automatique, qui ajuste automatiquement les unités de débit en fonction du trafic, jusqu’à la limite configurée.

Traitement des flux de données

Dans Azure Databricks, le traitement des données est assuré par un travail. Le travail est attribué à un cluster, qui en assure l’exécution. Le travail peut être du code personnalisé écrit en Java ou un notebook Spark.

Dans cette architecture de référence, le travail est une archive Java avec des classes écrites en Java et Scala. Au moment de spécifier l’archive Java pour un travail Databricks, la classe doit être spécifiée en vue d’une exécution par le cluster Databricks. Ici, la méthode main de la classe com.microsoft.pnp.TaxiCabReader contient la logique de traitement des données.

Lecture du flux à partir des deux instances de hub d’événements

La logique de traitement des données utilise Spark Structured Streaming pour lire dans les deux instances de hub d’événements Azure :

val rideEventHubOptions = EventHubsConf(rideEventHubConnectionString)
      .setConsumerGroup(conf.taxiRideConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val rideEvents = spark.readStream
      .format("eventhubs")
      .options(rideEventHubOptions.toMap)
      .load

    val fareEventHubOptions = EventHubsConf(fareEventHubConnectionString)
      .setConsumerGroup(conf.taxiFareConsumerGroup())
      .setStartingPosition(EventPosition.fromStartOfStream)
    val fareEvents = spark.readStream
      .format("eventhubs")
      .options(fareEventHubOptions.toMap)
      .load

Enrichissement des données avec les informations de quartiers

Les données de courses comprennent les coordonnées de latitude et de longitude des lieux de prise en charge et de dépôt des clients. Même si ces coordonnées sont utiles, elles ne sont pas facilement exploitables pour l’analyse. Par conséquent, ces données sont enrichies avec les données de quartiers lues dans un fichier de forme.

De format binaire, le fichier de forme ne s’analyse pas facilement, mais la bibliothèque GeoTools propose des outils pour les données géospatiales qui utilisent le format de fichier de forme. Cette bibliothèque est utilisée dans la classe com.microsoft.pnp.GeoFinder pour déterminer le nom du quartier en fonction des coordonnées de prise en charge et de dépôt.

val neighborhoodFinder = (lon: Double, lat: Double) => {
      NeighborhoodFinder.getNeighborhood(lon, lat).get()
    }

Jonction des données de courses et de tarifs

Dans un premier temps, les données de courses et de tarifs sont transformées :

    val rides = transformedRides
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedRides.add(1)
          false
        }
      })
      .select(
        $"ride.*",
        to_neighborhood($"ride.pickupLon", $"ride.pickupLat")
          .as("pickupNeighborhood"),
        to_neighborhood($"ride.dropoffLon", $"ride.dropoffLat")
          .as("dropoffNeighborhood")
      )
      .withWatermark("pickupTime", conf.taxiRideWatermarkInterval())

    val fares = transformedFares
      .filter(r => {
        if (r.isNullAt(r.fieldIndex("errorMessage"))) {
          true
        }
        else {
          malformedFares.add(1)
          false
        }
      })
      .select(
        $"fare.*",
        $"pickupTime"
      )
      .withWatermark("pickupTime", conf.taxiFareWatermarkInterval())

Les données de courses sont ensuite jointes aux données de tarifs :

val mergedTaxiTrip = rides.join(fares, Seq("medallion", "hackLicense", "vendorId", "pickupTime"))

Traitement des données et insertion dans Azure Cosmos DB

Le montant du tarif moyen pour chaque quartier est calculé pour un intervalle de temps donné :

val maxAvgFarePerNeighborhood = mergedTaxiTrip.selectExpr("medallion", "hackLicense", "vendorId", "pickupTime", "rateCode", "storeAndForwardFlag", "dropoffTime", "passengerCount", "tripTimeInSeconds", "tripDistanceInMiles", "pickupLon", "pickupLat", "dropoffLon", "dropoffLat", "paymentType", "fareAmount", "surcharge", "mtaTax", "tipAmount", "tollsAmount", "totalAmount", "pickupNeighborhood", "dropoffNeighborhood")
      .groupBy(window($"pickupTime", conf.windowInterval()), $"pickupNeighborhood")
      .agg(
        count("*").as("rideCount"),
        sum($"fareAmount").as("totalFareAmount"),
        sum($"tipAmount").as("totalTipAmount"),
        (sum($"fareAmount")/count("*")).as("averageFareAmount"),
        (sum($"tipAmount")/count("*")).as("averageTipAmount")
      )
      .select($"window.start", $"window.end", $"pickupNeighborhood", $"rideCount", $"totalFareAmount", $"totalTipAmount", $"averageFareAmount", $"averageTipAmount")

Il est ensuite inséré dans Azure Cosmos DB :

maxAvgFarePerNeighborhood
      .writeStream
      .queryName("maxAvgFarePerNeighborhood_cassandra_insert")
      .outputMode(OutputMode.Append())
      .foreach(new CassandraSinkForeach(connector))
      .start()
      .awaitTermination()

Considérations

Ces considérations implémentent les piliers d’Azure Well-Architected Framework qui est un ensemble de principes directeurs qui permettent d’améliorer la qualité d’une charge de travail. Pour plus d'informations, consultez Microsoft Azure Well-Architected Framework.

Sécurité

La sécurité fournit des garanties contre les attaques délibérées, et contre l’utilisation abusive de vos données et systèmes importants. Pour plus d’informations, consultez Vue d’ensemble du pilier Sécurité.

L’accès à l’espace de travail Azure Databricks est contrôlé à l’aide de la console administrateur. La console administrateur comprend une fonctionnalité qui permet d’ajouter des utilisateurs, de gérer les autorisations utilisateur et de configurer l’authentification unique. Le contrôle d’accès pour les espaces de travail, les clusters, les travaux et les tables peut aussi être défini via la console administrateur.

Gestion des secrets

Azure Databricks comprend un magasin des secrets dans lequel sont stockés les secrets, notamment les chaînes de connexion, les clés d’accès, les noms d’utilisateur et les mots de passe. Les secrets contenus dans le magasin des secrets Azure Databricks sont partitionnés par étendues (scope) :

databricks secrets create-scope --scope "azure-databricks-job"

Les secrets sont ajoutés au niveau de l’étendue :

databricks secrets put --scope "azure-databricks-job" --key "taxi-ride"

Notes

Il est possible d’utiliser une étendue adossée à Azure Key Vault à la place de l’étendue Azure Databricks native. Pour en savoir plus, consultez la page relative aux étendues adossées à Azure Key Vault.

Dans le code, les secrets sont accessibles via les utilitaires de secrets Azure Databricks.

Surveillance

Azure Databricks est basé sur Apache Spark et tous deux utilisent log4j comme bibliothèque standard pour la journalisation. En plus de la journalisation par défaut fournie par Apache Spark, vous pouvez implémenter une journalisation dans Azure Log Analytics en procédant de la manière décrite dans l’article Supervision d’Azure Databricks.

Comme la classe com.microsoft.pnp.TaxiCabReader traite les messages relatifs aux courses et aux tarifs, il est possible que l’un des deux soit mal formé et donc non valide. Dans un environnement de production, il est important d’analyser ces messages mal formés pour identifier un problème au niveau des sources de données et le résoudre rapidement pour éviter toute perte de données. La classe com.microsoft.pnp.TaxiCabReader inscrit un accumulateur Apache Spark qui assure le suivi du nombre d’enregistrements relatifs aux courses et aux tarifs mal formés :

    @transient val appMetrics = new AppMetrics(spark.sparkContext)
    appMetrics.registerGauge("metrics.malformedrides", AppAccumulators.getRideInstance(spark.sparkContext))
    appMetrics.registerGauge("metrics.malformedfares", AppAccumulators.getFareInstance(spark.sparkContext))
    SparkEnv.get.metricsSystem.registerSource(appMetrics)

Apache Spark utilise la bibliothèque Dropwizard pour envoyer des métriques, et certains champs de métriques Dropwizard natifs sont incompatibles avec Azure Log Analytics. Par conséquent, cette architecture de référence comprend un récepteur et un rapporteur Dropwizard personnalisés. Elle met en forme les métriques dans le format attendu par Azure Log Analytics. Quand Apache Spark transmet des métriques, les métriques personnalisées pour les données de course et de tarif malformées sont aussi envoyées.

Voici des exemples de requêtes que vous pouvez utiliser dans votre espace de travail Azure Log Analytics pour surveiller l’exécution du travail de diffusion en continu. L’argument ago(1d) dans chaque requête retourne tous les enregistrements générés au cours de la dernière journée. Vous pouvez l’ajuster pour afficher une autre période.

Exceptions journalisées pendant l’exécution de requête de flux

SparkLoggingEvent_CL
| where TimeGenerated > ago(1d)
| where Level == "ERROR"

Accumulation de données de course et de tarif mal formées

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedrides"
| project value_d, TimeGenerated, applicationId_s
| render timechart

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "metrics.malformedfares"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Exécution du travail au fil du temps

SparkMetric_CL
| where TimeGenerated > ago(1d)
| where name_s contains "driver.DAGScheduler.job.allJobs"
| project value_d, TimeGenerated, applicationId_s
| render timechart

Pour plus d'informations, consultez Supervision d'Azure Databricks.

DevOps

  • Créez des groupes de ressources distincts pour les environnements de production, de développement et de test. Des groupes de ressources distincts simplifient la gestion des déploiements, la suppression des déploiements de tests et l’attribution des droits d’accès.

  • Utilisez le modèle Azure Resource Manager pour déployer les ressources Azure en suivant le processus IaC (Infrastructure as Code). Grâce aux modèles, il est plus facile d'automatiser les déploiements à l'aide d'Azure DevOps Services, ou d'autres solutions de CI/CD.

  • Placez chaque charge de travail dans un modèle de déploiement distinct et stockez les ressources dans des systèmes de contrôle de code source. Vous pouvez déployer les modèles ensemble ou individuellement dans le cadre d'un processus CI/CD pour faciliter le processus d'automatisation.

    Dans cette architecture, Azure Event Hubs, Log Analytics et Azure Cosmos DB sont identifiés comme une charge de travail unique. Ces ressources sont incluses dans un même modèle ARM.

  • Envisagez d'échelonner vos charges de travail. Déployez à différentes étapes et effectuez des vérifications de la validation à chaque étape avant de passer à l'étape suivante. Vous pourrez ainsi envoyer (push) les mises à jour de vos environnements de production de manière hautement contrôlée et limiter les problèmes de déploiement imprévus.

    Cette architecture comporte plusieurs étapes de déploiement. Envisagez de créer un pipeline Azure DevOps et d'ajouter ces étapes. Voici quelques exemples d'étapes que vous pouvez automatiser :

    • Démarrer un cluster Databricks
    • Configurer l'interface de ligne de commande Databricks
    • Installer les outils Scala
    • Ajouter les secrets Databricks

    Envisagez également de créer des tests d'intégration automatisés pour améliorer la qualité et la fiabilité du code Databricks et de son cycle de vie.

  • Envisagez d'utiliser Azure Monitor pour analyser les performances de votre pipeline de traitement de flux. Pour plus d'informations, consultez Supervision d'Azure Databricks.

Pour plus d'informations, consultez la section DevOps de Microsoft Azure Well-Architected Framework.

Optimisation des coûts

L’optimisation des coûts consiste à examiner les moyens de réduire les dépenses inutiles et d’améliorer l’efficacité opérationnelle. Pour plus d’informations, consultez Vue d’ensemble du pilier d’optimisation des coûts.

Utiliser la calculatrice de prix Azure pour estimer les coûts. Les considérations suivantes s'appliquent aux services utilisés dans cette architecture de référence.

Event Hubs

Cette architecture de référence déploie Event Hubs au niveau Standard. Le modèle de tarification repose sur les unités de débit, les événements d'entrée et les événements de capture. Un événement d'entrée est une unité de données de 64 Ko ou moins. Les messages plus volumineux sont facturés par multiples de 64 Ko. Vous pouvez spécifier les unités de débit par le biais du portail Azure ou des API de gestion Event Hubs.

Si vous avez besoin de plus de jours de conservation, envisagez le niveau Dedicated. Ce niveau offre des déploiements à un seul locataire répondant aux exigences les plus strictes. Cette offre constitue un cluster basé sur les unités de capacité (CU) qui n'est pas liée par des unités de débit.

Le niveau Standard est également facturé sur la base des événements d'entrée et des unités de débit.

Pour plus d'informations sur la tarification d'Event Hubs, consultez la rubrique Tarification d'Event Hubs.

Azure Databricks

Azure Databricks offre deux niveaux, Standard et Premium, chacun prenant en charge trois charges de travail. Cette architecture de référence déploie l'espace de travail Azure Databricks au niveau Premium.

Les charges de travail Engineering données et Engineering données allégé permettent aux ingénieurs Données de créer et d'exécuter des travaux. La charge de travail Analytique données permet aux scientifiques des données d'explorer, de visualiser, de manipuler et de partager des données et des insights de manière interactive.

Azure Databricks propose de nombreux modèles de tarification.

  • Plan de paiement à l'utilisation

    Vous êtes facturé pour les machines virtuelles configurées en clusters et Unités Databricks (DBU) en fonction de l'instance de machine virtuelle sélectionnée. Une DBU est une unité de capacité de traitement facturée sur la base d'une utilisation par seconde. La consommation de DBU dépend de la taille et du type d'instance qui exécute Azure Databricks. La tarification dépendra de la charge de travail et du niveau sélectionnés.

  • Plan de pré-achat

    Vous vous engagez à utiliser les Unités Azure Databricks (DBU) en tant qu'unités DBCU (Databricks Commit Units) pendant un ou trois ans. Par rapport au modèle de paiement à l'utilisation, vous pouvez économiser jusqu'à 37 %.

Pour plus d'informations, consultez Tarification d'Azure Databricks.

Azure Cosmos DB

Dans cette architecture, une série d’enregistrements est écrite dans Azure Cosmos DB par le travail Azure Databricks. Vous êtes facturé en fonction de la capacité que vous réservez, exprimée en unités de requête par seconde (RU/s), et qui est utilisée pour effectuer des opérations d'insertion. L'unité de facturation est de 100 RU/s par heure. Par exemple, le coût d'écriture d'éléments de 100 Ko est de 50 RU/s.

Pour les opérations d'écriture, configurez une capacité suffisante pour prendre en charge le nombre d'écritures requises par seconde. Vous pouvez augmenter le débit configuré à l'aide du portail ou d'Azure CLI avant d'effectuer les opérations d'écriture, puis réduire le débit une fois les écritures terminées. Le débit relatif à la période d'écriture correspond au débit minimal nécessaire pour les données spécifiées, auquel s'ajoute le débit nécessaire pour l'opération d'insertion, en supposant qu'aucune autre charge de travail n'est en cours d'exécution.

Exemple d’analyse des coûts

Supposons que vous configuriez une valeur de débit de 1000 RU/s sur un conteneur. Celui-ci est déployé 24 heures sur 24 pendant 30 jours, soit un total de 720 heures.

Le conteneur est facturé à 10 unités de 100 RU/s par heure pour chaque heure. 10 unités à 0,008 USD (100 RU/s par heure) sont facturées à 0,08 USD par heure.

Pour 720 heures ou 7200 unités (de 100 RU), vous êtes facturé 57,60 USD pour le mois.

Le stockage est également facturé pour chaque Go utilisé pour vos données et index stockés. Pour plus d’informations, consultez le modèle de tarification Azure Cosmos DB.

Utilisez la calculatrice de capacité Azure Cosmos DB pour obtenir une estimation rapide du coût de la charge de travail.

Pour plus d’informations, consultez la section sur les coûts dans Microsoft Azure Well-Architected Framework.

Déployer ce scénario

Pour déployer et exécuter l’implémentation de référence, suivez les étapes du fichier Readme de GitHub.

Étapes suivantes