Sortie Azure Stream Analytics dans Azure Cosmos DBAzure Stream Analytics output to Azure Cosmos DB

Azure Stream Analytics peut cibler Azure Cosmos DB pour la sortie JSON, ce qui permet d’archiver des données et d’exécuter des requêtes à faible latence sur des données JSON non structurées.Azure Stream Analytics can target Azure Cosmos DB for JSON output, enabling data archiving and low-latency queries on unstructured JSON data. Ce document traite certaines meilleures pratiques recommandées pour l’implémentation de cette configuration.This document covers some best practices for implementing this configuration. Nous vous recommandons de définir votre travail sur le niveau de compatibilité 1.2 lorsque vous utilisez Azure Cosmos DB comme sortie.We recommend that you set your job to compatability level 1.2 when using Azure Cosmos DB as output.

Si vous n’êtes pas familiarisé avec Azure Cosmos DB, consultez la documentation Azure Cosmos DB pour commencer.If you're unfamiliar with Azure Cosmos DB, see the Azure Cosmos DB documentation to get started.

Notes

Pour le moment, Stream Analytics prend en charge la connexion à Azure Cosmos DB uniquement à l’aide de l’API SQL.At this time, Stream Analytics supports connection to Azure Cosmos DB only through the SQL API. Les autres API Azure Cosmos DB ne sont pas encore prises en charge.Other Azure Cosmos DB APIs are not yet supported. Si vous pointez Stream Analytics vers les comptes Azure Cosmos DB créés avec d’autres API, les données risquent de ne pas être correctement stockées.If you point Stream Analytics to Azure Cosmos DB accounts created with other APIs, the data might not be properly stored.

Principes de base d’Azure Cosmos DB en tant que cible de sortieBasics of Azure Cosmos DB as an output target

La sortie Azure Cosmos DB dans Stream Analytics vous permet d’écrire les résultats du traitement de votre flux en tant que sortie JSON dans vos conteneurs Azure Cosmos DB.The Azure Cosmos DB output in Stream Analytics enables writing your stream processing results as JSON output into your Azure Cosmos DB containers.

Stream Analytics ne crée pas de conteneurs dans votre base de données.Stream Analytics doesn't create containers in your database. Au lieu de cela, vous devez les créer à l’avance.Instead, it requires you to create them up front. Vous pouvez ensuite contrôler les coûts de facturation des conteneurs Azure Cosmos DB.You can then control the billing costs of Azure Cosmos DB containers. Vous pouvez également régler les performances, la cohérence et la capacité de vos conteneurs directement à l’aide des API Azure Cosmos DB.You can also tune the performance, consistency, and capacity of your containers directly by using the Azure Cosmos DB APIs.

Notes

Vous devez ajouter 0.0.0.0 à la liste des adresses IP autorisées de votre pare-feu Azure Cosmos DB.You must add 0.0.0.0 to the list of allowed IPs from your Azure Cosmos DB firewall.

Les sections suivantes détaillent certaines des options de conteneur pour Azure Cosmos DB.The following sections detail some of the container options for Azure Cosmos DB.

Réglage de la cohérence, de la disponibilité et de la latenceTuning consistency, availability, and latency

Pour respecter les exigences de votre application, Azure Cosmos DB vous permet d’ajuster la base de données et les conteneurs, et de faire les meilleurs compromis entre cohérence, disponibilité, latence et débit.To match your application requirements, Azure Cosmos DB allows you to fine-tune the database and containers and make trade-offs between consistency, availability, latency, and throughput.

Selon les niveaux de cohérence de lecture qui s’appliquent à votre scénario compte tenu de la latence de lecture et d’écriture, vous pouvez choisir un niveau de cohérence sur votre compte de base de données.Depending on what levels of read consistency your scenario needs against read and write latency, you can choose a consistency level on your database account. Vous pouvez améliorer le débit en augmentant les unités de requête (RU) sur le conteneur.You can improve throughput by scaling up Request Units (RUs) on the container.

Par défaut, Azure Cosmos DB permet également une indexation synchrone sur chaque opération CRUD exécutée sur votre conteneur.Also by default, Azure Cosmos DB enables synchronous indexing on each CRUD operation to your container. Cette option est également utile pour contrôler les performances de lecture/écriture dans Azure Cosmos DB.This is another useful option to control write/read performance in Azure Cosmos DB.

Pour plus d’informations, consultez l’article Modifier le niveau de cohérence des bases de données et des requêtes.For more information, review the Change your database and query consistency levels article.

Upserts à partir de Stream AnalyticsUpserts from Stream Analytics

L’intégration de Stream Analytics à Azure Cosmos DB vous permet d’insérer ou mettre à jour des enregistrements dans votre conteneur à partir d’une colonne d’identification de document donnée.Stream Analytics integration with Azure Cosmos DB allows you to insert or update records in your container based on a given Document ID column. Cette fonction est également appelée un upsert.This is also called an upsert.

Stream Analytics utilise une approche upsert optimiste.Stream Analytics uses an optimistic upsert approach. Les mises à jour se produisent uniquement en cas d’échec d’une insertion avec un conflit d’ID de document.Updates happen only when an insert fails with a document ID conflict.

Avec le niveau de compatibilité 1.0, Stream Analytics effectue cette mise à jour en tant qu’opération PATCH, ce qui permet des mises à jour partielles du document.With compatibility level 1.0, Stream Analytics performs this update as a PATCH operation, so it enables partial updates to the document. Stream Analytics ajoute de nouvelles propriétés ou remplace une propriété existante de manière incrémentielle.Stream Analytics adds new properties or replaces an existing property incrementally. Toutefois, les modifications apportées aux valeurs des propriétés du tableau dans votre document JSON ont pour effet d’écraser l’intégralité du tableau.However, changes in the values of array properties in your JSON document result in overwriting the entire array. Autrement dit, le tableau n’est pas fusionné.That is, the array isn't merged.

Avec le niveau 1.2, le comportement de l'opération upsert est modifié pour insérer ou remplacer le document.With 1.2, upsert behavior is modified to insert or replace the document. La section ultérieure sur le niveau de compatibilité 1.2 décrit plus en détail ce comportement.The later section about compatibility level 1.2 further describes this behavior.

Si le document JSON entrant contient un champ ID existant, ce champ est automatiquement utilisé comme colonne ID de document dans Azure Cosmos DB.If the incoming JSON document has an existing ID field, that field is automatically used as the Document ID column in Azure Cosmos DB. Toutes les écritures suivantes sont traitées comme telles, ce qui conduit à l’une des situations suivantes :Any subsequent writes are handled as such, leading to one of these situations:

  • Des ID uniques aboutissent à une insertion.Unique IDs lead to insert.
  • Des ID dupliqués et ID du document défini sur ID aboutissent à une opération upsert.Duplicate IDs and Document ID set to ID lead to upsert.
  • Des ID dupliqués et ID du document non défini aboutissent à une erreur après le premier document.Duplicate IDs and Document ID not set lead to error, after the first document.

Si vous souhaitez enregistrer tous les documents, notamment ceux dont l’ID est dupliqué, renommez le champ ID dans votre requête (à l’aide du mot clé AS).If you want to save all documents, including the ones that have a duplicate ID, rename the ID field in your query (by using the AS keyword). Laissez Azure Cosmos DB créer le champ ID ou remplacer l’ID par la valeur d’une autre colonne (en utilisant le mot clé AS ou en utilisant le paramètre ID de document).Let Azure Cosmos DB create the ID field or replace the ID with another column's value (by using the AS keyword or by using the Document ID setting).

Partitionnement des données dans Azure Cosmos DBData partitioning in Azure Cosmos DB

Azure Cosmos DB ajuste automatiquement les partitions en fonction de votre charge de travail.Azure Cosmos DB automatically scales partitions based on your workload. Nous vous recommandons donc d’utiliser des conteneurs illimités comme approche pour partitionner vos données.So we recommend unlimited containers as the approach for partitioning your data. Pendant une opération d’écriture dans des conteneurs illimités, Stream Analytics utilise autant d’enregistreurs parallèles que l’étape de requête précédente ou le schéma de partitionnement d’entrée.When Stream Analytics writes to unlimited containers, it uses as many parallel writers as the previous query step or input partitioning scheme.

Notes

Azure Stream Analytics prend uniquement en charge un nombre illimité de conteneurs avec des clés de partition au niveau supérieur.Azure Stream Analytics supports only unlimited containers with partition keys at the top level. Par exemple, la clé de partition /region est prise en charge.For example, /region is supported. Les clés de partition imbriquées (par exemple, /region/name) ne sont pas prises en charge.Nested partition keys (for example, /region/name) are not supported.

Selon votre choix de clé de partition, vous pourrez recevoir cet avertissement :Depending on your choice of partition key, you might receive this warning:

CosmosDB Output contains multiple rows and just one row per partition key. If the output latency is higher than expected, consider choosing a partition key that contains at least several hundred records per partition key.

Il est important de choisir une propriété de clé de partition présentant un nombre de valeurs distinctes, et vous permettant de distribuer votre charge de travail uniformément entre ces valeurs.It's important to choose a partition key property that has a number of distinct values, and that lets you distribute your workload evenly across these values. Conséquence inhérente du partitionnement, les demandes impliquant la même clé de partition sont limitées par le débit maximal d’une seule partition.As a natural artifact of partitioning, requests that involve the same partition key are limited by the maximum throughput of a single partition.

La taille de stockage des documents appartenant à la même valeur de clé de partition est limitée à 20 Go (la limite de taille de partition physique est de 50 Go).The storage size for documents that belong to the same partition key value is limited to 20 GB (the physical partition size limit is 50 GB). Une clé de partition idéale apparaît fréquemment sous forme de filtre dans vos requêtes efficaces et possède une cardinalité suffisante pour garantir la scalabilité de votre solution.An ideal partition key is one that appears frequently as a filter in your queries and has sufficient cardinality to ensure that your solution is scalable.

Les clés de partition utilisées pour les requêtes Stream Analytics et Cosmos DB n’ont pas besoin d’être identiques.Partition keys used for Stream Analytics queries and Cosmos DB don't need to be identical. Les topologies entièrement parallèles recommandent l’utilisation d’une clé de partition d’entrée, PartitionId, comme clé de partition de la requête Stream Analytics, mais ce n’est peut-être pas le choix recommandé pour la clé de partition d’un conteneur Cosmos DB.Fully parallel topologies recommend using Input Partition key, PartitionId, as the Stream Analytics query's partition key but that may not be the recommended choice for a Cosmos DB container's partition key.

Une clé de partition sert également de limite pour les transactions dans les procédures stockées et les déclencheurs d’Azure Cosmos DB.A partition key is also the boundary for transactions in stored procedures and triggers for Azure Cosmos DB. Vous devez choisir la clé de partition de sorte que les documents impliqués dans les mêmes transactions partagent la même valeur de clé de partition.You should choose the partition key so that documents that occur together in transactions share the same partition key value. L’article Partitionnement des données dans Azure Cosmos DB fournit des détails supplémentaires sur le choix d’une clé de partition.The article Partitioning in Azure Cosmos DB gives more details on choosing a partition key.

Pour les conteneurs Azure Cosmos DB fixes arrivés à saturation, Stream Analytics n’offre pas de possibilité de Scale-up ou de Scale-out.For fixed Azure Cosmos DB containers, Stream Analytics allows no way to scale up or out after they're full. Ils ont une limite maximale de 10 Go et un débit de 10 000 RU/s.They have an upper limit of 10 GB and 10,000 RU/s of throughput. Pour migrer les données d’un conteneur fixe vers un conteneur illimité (par exemple avec au moins 1 000 RU/s et une clé de partition), utilisez l’outil de migration de données ou la bibliothèque de flux de modification.To migrate the data from a fixed container to an unlimited container (for example, one with at least 1,000 RU/s and a partition key), use the data migration tool or the change feed library.

La fonctionnalité permettant d’écrire dans plusieurs conteneurs fixes est dépréciée.The ability to write to multiple fixed containers is being deprecated. Nous ne la recommandons pas pour le Scale-out de votre travail Stream Analytics.We don't recommend it for scaling out your Stream Analytics job.

Débit amélioré avec le niveau de compatibilité 1.2Improved throughput with compatibility level 1.2

Avec le niveau de compatibilité 1.2, Stream Analytics prend en charge l’intégration native pour écrire en bloc dans Azure Cosmos DB.With compatibility level 1.2, Stream Analytics supports native integration to bulk write into Azure Cosmos DB. Cette intégration permet d’écrire efficacement dans Azure Cosmos DB tout en optimisant le débit et en gérant efficacement les requêtes de limitation.This integration enables writing effectively to Azure Cosmos DB while maximizing throughput and efficiently handling throttling requests.

Le mécanisme d’écriture amélioré est disponible sous un nouveau niveau de compatibilité en raison d’une différence de comportement de l’opération upsert.The improved writing mechanism is available under a new compatibility level because of a difference in upsert behavior. Avec les niveaux antérieurs au niveau 1.2, le comportement de l’opération upsert consiste à insérer ou fusionner le document.With levels before 1.2, the upsert behavior is to insert or merge the document. Avec le niveau 1.2, le comportement de l'opération upsert est modifié pour insérer ou remplacer le document.With 1.2, upsert behavior is modified to insert or replace the document.

Avec les niveaux antérieurs au niveau 1.2, Stream Analytics utilise une procédure stockée personnalisée pour exécuter en bloc l’opération upsert sur les documents par clé de partition dans Azure Cosmos DB.With levels before 1.2, Stream Analytics uses a custom stored procedure to bulk upsert documents per partition key into Azure Cosmos DB. À partir de là, un lot est écrit sous la forme d’une transaction.There, a batch is written as a transaction. Si un seul enregistrement se heurte à une erreur temporaire (limitation), le lot dans son intégralité doit faire l’objet d’une nouvelle tentative.Even when a single record hits a transient error (throttling), the whole batch has to be retried. Cela ralentit considérablement même les scénarios présentant des limitations raisonnables.This makes scenarios with even reasonable throttling relatively slow.

L’exemple suivant montre deux travaux Stream Analytics identiques lisant à partir de la même entrée Azure Event Hubs.The following example shows two identical Stream Analytics jobs reading from the same Azure Event Hubs input. Les deux travaux Stream Analytics sont entièrement partitionnés avec une requête passthrough et écrivent dans des conteneurs Azure Cosmos DB identiques.Both Stream Analytics jobs are fully partitioned with a passthrough query and write to identical Azure Cosmos DB containers. Les métriques de gauche proviennent du travail configuré avec le niveau de compatibilité 1.0.Metrics on the left are from the job configured with compatibility level 1.0. Les métriques de droite sont configurées avec le niveau 1.2.Metrics on the right are configured with 1.2. La clé de partition d’un conteneur Azure Cosmos DB est un GUID unique issu de l’événement d’entrée.An Azure Cosmos DB container's partition key is a unique GUID that comes from the input event.

Comparaison des métriques Stream Analytics

Le taux d’événements entrants dans Event Hubs est deux fois supérieur à ce que les conteneurs Azure Cosmos DB (20 000 RU) sont configurés pour recevoir, et dès lors une limitation est à prévoir dans Azure Cosmos DB.The incoming event rate in Event Hubs is two times higher than Azure Cosmos DB containers (20,000 RUs) are configured to take in, so throttling is expected in Azure Cosmos DB. Cela étant, le travail configuré avec le niveau 1.2 écrit de manière cohérente à un débit plus élevé (événements de sortie par minute) et avec une utilisation SU% moyenne moins élevée.However, the job with 1.2 is consistently writing at a higher throughput (output events per minute) and with a lower average SU% utilization. Dans votre environnement, cette différence dépend de quelques facteurs supplémentaires.In your environment, this difference will depend on few more factors. Ces facteurs incluent le choix du format d’événement, la taille de l’événement/du message d’entrée, les clés de partition et la requête.These factors include choice of event format, input event/message size, partition keys, and query.

Comparaison des métriques Azure Cosmos DB

Avec le niveau de compatibilité 1.2, Stream Analytics utilise de manière plus intelligente 100 % du débit disponible dans Azure Cosmos DB, avec très peu de nouvelles soumissions dues aux limitations.With 1.2, Stream Analytics is more intelligent in utilizing 100 percent of the available throughput in Azure Cosmos DB with very few resubmissions from throttling or rate limiting. Cela permet une meilleure expérience pour les autres charges de travail telles que les requêtes en cours d’exécution sur le conteneur en même temps.This provides a better experience for other workloads like queries running on the container at the same time. Si vous souhaitez voir le Scale-out de Stream Analytics avec Azure Cosmos DB en tant que récepteur pour 1 000 à 10 000 messages par seconde, essayez cet exemple de projet Azure.If you want to see how Stream Analytics scales out with Azure Cosmos DB as a sink for 1,000 to 10,000 messages per second, try this Azure sample project.

Le débit de sortie Azure Cosmos DB est identique avec les niveaux 1.0 et 1.1.Throughput of Azure Cosmos DB output is identical with 1.0 and 1.1. Nous vous recommandons fortement d’utiliser le niveau de compatibilité 1.2 dans Stream Analytics avec Azure Cosmos DB.We strongly recommend that you use compatibility level 1.2 in Stream Analytics with Azure Cosmos DB.

Paramètres Azure Cosmos DB pour la sortie JSONAzure Cosmos DB settings for JSON output

Lorsque vous utilisez Azure Cosmos DB en tant que sortie dans Stream Analytics, vous devez fournir des informations supplémentaires, comme indiqué ci-dessous.Using Azure Cosmos DB as an output in Stream Analytics generates the following prompt for information.

Champs d’informations pour un flux de sortie Azure Cosmos DB

ChampField DescriptionDescription
Alias de sortieOutput alias Alias référençant cette sortie dans votre requête Stream Analytics.An alias to refer to this output in your Stream Analytics query.
AbonnementSubscription Abonnement Azure.The Azure subscription.
ID de compteAccount ID Nom ou URI de point de terminaison du compte Azure Cosmos DB.The name or endpoint URI of the Azure Cosmos DB account.
Clé de compteAccount key Clé d’accès partagé du compte Azure Cosmos DB.The shared access key for the Azure Cosmos DB account.
Base de donnéesDatabase Nom de la base de données Azure Cosmos DB.The Azure Cosmos DB database name.
Nom du conteneurContainer name Nom du conteneur, comme MyContainer.The container name, such as MyContainer. Un conteneur nommé MyContainer doit exister.One container named MyContainer must exist.
ID du documentDocument ID facultatif.Optional. Nom de colonne dans les événements de sortie utilisé comme clé unique sur laquelle doivent être basées les opérations d’insertion ou de mise à jour.The column name in output events used as the unique key on which insert or update operations must be based. Si vous laissez ce champ vide, tous les événements sont insérés, sans option de mise à jour.If you leave it empty, all events will be inserted, with no update option.

Une fois la sortie Azure Cosmos DB configurée, elle peut être utilisée dans la requête en tant que cible d’une instruction INTO.After you configure the Azure Cosmos DB output, you can use it in the query as the target of an INTO statement. Lors de l’utilisation d’une sortie Azure Cosmos DB de cette manière, une clé de partition doit être définie explicitement.When you're using an Azure Cosmos DB output that way, a partition key needs to be set explicitly.

L’enregistrement de sortie doit contenir une colonne qui respecte la casse nommée d’après la clé de partition dans Azure Cosmos DB.The output record must contain a case-sensitive column named after the partition key in Azure Cosmos DB. Pour obtenir une plus grande parallélisation, l’instruction peut exiger une clause PARTITION BY utilisant la même colonne.To achieve greater parallelization, the statement might require a PARTITION BY clause that uses the same column.

Voici un exemple de requête :Here's a sample query:

    SELECT TollBoothId, PartitionId
    INTO CosmosDBOutput
    FROM Input1 PARTITION BY PartitionId

Gestion des erreurs et nouvelles tentativesError handling and retries

En cas de panne transitoire, d’indisponibilité du service ou de limitation de bande passante lors de l’envoi d’événements à Azure Cosmos DB, Stream Analytics effectue de nouvelles tentatives indéfiniment pour mener à bien l’opération.If a transient failure, service unavailability, or throttling happens while Stream Analytics is sending events to Azure Cosmos DB, Stream Analytics retries indefinitely to finish the operation successfully. Mais il n’effectue pas de nouvelles tentatives pour les échecs suivants :But it doesn't attempt retries for the following failures:

  • Unauthorized (code d’erreur HTTP 401)Unauthorized (HTTP error code 401)
  • NotFound (code d’erreur HTTP 404)NotFound (HTTP error code 404)
  • Forbidden (code d’erreur HTTP 403)Forbidden (HTTP error code 403)
  • BadRequest (code d’erreur HTTP 400)BadRequest (HTTP error code 400)

Problèmes courantsCommon issues

  1. Une contrainte d’index unique est ajoutée à la collection et les données de sortie de Stream Analytics enfreignent cette contrainte.A unique index constraint is added to the collection and the output data from Stream Analytics violates this constraint. Assurez-vous que les données de sortie de Stream Analytics ne violent pas de contraintes uniques ou supprimez les contraintes.Ensure the output data from Stream Analytics doesn't violate unique constraints or remove constraints. Pour plus d’informations, consultez Contraintes de clé unique dans Azure Cosmos DB.For more information, see Unique key constraints in Azure Cosmos DB.

  2. La colonne PartitionKey n’existe pas.The PartitionKey column does not exists.

  3. La colonne Id n’existe pas.The Id column does not exist.

Étapes suivantesNext steps