Sortie des files d’attente Service Bus à partir d’Azure Stream Analytics

Les files d’attente Service Bus permettent de livrer des messages FIFO à un ou plusieurs destinataires concurrents. En règle générale, les messages sont reçus et traités par les destinataires dans l’ordre dans lequel ils ont été ajoutés à la file d’attente. Chaque message est reçu et traité par un seul destinataire du message.

Dans le niveau de compatibilité 1.2, Azure Stream Analytics utilise le protocole de messagerie AMQP (Advanced Message Queueing Protocol) pour écrire dans les files d’attente et les rubriques Service Bus. Le protocole AMQP vous permet de développer des applications hybrides interplateforme à l’aide d’un protocole open standard.

Configuration de la sortie

Le tableau suivant répertorie les noms de propriétés et leur description pour la création d’une sortie de file d’attente.

Nom de la propriété Description
Alias de sortie Nom convivial utilisé dans les requêtes pour diriger la sortie de requête vers cette file d’attente Service Bus.
Espace de noms Service Bus Conteneur pour un ensemble d’entités de messagerie.
Nom de la file d'attente Nom de la file d’attente Service Bus.
Nom de la stratégie de file d’attente Lorsque vous créez une file d’attente, vous pouvez également créer des stratégies d’accès partagé dans l’onglet Configurer de la file d’attente. Chaque stratégie d’accès partagé a un nom, les autorisations que vous définissez ainsi que des clés d’accès.
Clé de la stratégie de file d’attente Clé d’accès partagé qui permet d’authentifier l’accès à l’espace de noms Service Bus.
Format de sérialisation de l’événement Format de sérialisation pour les données de sortie. JSON, CSV et Avro sont pris en charge.
Encodage Pour CSV et JSON, UTF-8 est le seul format d’encodage actuellement pris en charge.
Délimiteur Applicable uniquement pour la sérialisation CSV. Stream Analytics prend en charge un certain nombre de délimiteurs communs pour sérialiser des données dans un format CSV. Valeurs prises en charge : virgule, point-virgule, espace, tabulation et barre verticale.
Format Applicable uniquement pour le type JSON. L’expression Séparé par une ligne indique que la sortie est mise en forme de sorte que tous les objets JSON soient séparés par une nouvelle ligne. Si vous sélectionnez Séparé par une ligne, le JSON est lu un objet à la fois. Le contenu entier seul ne serait pas un JSON valide. Le terme Tableau indique que la sortie est mise en forme en tant que tableau d’objets JSON.
Colonnes de propriété facultatif. Colonnes séparées par des virgules qui doivent être jointes en tant que propriétés de l’utilisateur du message sortant au lieu de la charge utile. Vous trouverez plus d’informations sur cette fonctionnalité dans la section Propriétés de métadonnées personnalisées pour la sortie.
Colonnes de propriétés système facultatif. Paires clé/valeur de propriétés système et noms de colonnes correspondants qui doivent être attachés au message sortant au lieu de la charge utile.

Le nombre de partitions est basé sur la référence Service Bus et sa taille. La clé de partition est une valeur entière unique pour chaque partition.

Partitionnement

Le partitionnement est choisi automatiquement. Le nombre de partitions est basé sur la référence Service Bus et sa taille. La clé de partition est une valeur entière unique pour chaque partition. Le nombre de writers de sortie est le même que le nombre de partitions dans la file d’attente de sortie.

Taille de lot de sortie

La taille maximale du message est de 256 Ko par message pour le niveau Standard et 1 Mo pour le niveau Premium. Pour plus d’informations, consultez Limites de Service Bus. Pour optimiser, utilisez un seul événement par message.

Propriétés de métadonnées personnalisées pour la sortie

Vous pouvez joindre des colonnes de requête en tant que propriétés de l’utilisateur aux messages sortants. Ces colonnes ne font pas partie de la charge utile. Les propriétés sont présentes sous la forme d’un dictionnaire sur le message de sortie. Clé est le nom de colonne et valeur est la valeur de la colonne dans le dictionnaire de propriétés. Tous les types de données Stream Analytics sont pris en charge à l’exception des enregistrements et des tableaux.

Dans l’exemple suivant, les champs DeviceId et DeviceStatus sont ajoutés aux métadonnées.

  1. Utilisez la requête suivante :

    select *, DeviceId, DeviceStatus from iotHubInput
    
  2. Configurez DeviceId,DeviceStatus comme colonnes de propriété dans la sortie.

    Property columns

L’illustration suivante présente les propriétés de message de sortie attendues qui sont inspectées dans un Event Hub à l’aide de Service Bus Explorer.

Event custom properties

Propriétés système

Vous pouvez attacher des colonnes de requête en tant que propriétés système à vos messages sortants de file d’attente ou de rubrique Service Bus.

Ces colonnes ne sont pas placées dans la charge utile ; au lieu de cela, la propriété système ServiceBusMessage correspondante est renseignée avec les valeurs de la colonne de requête. Les propriétés système suivantes sont prises en charge : MessageId, ContentType, Label, PartitionKey, ReplyTo, SessionId, CorrelationId, To, ForcePersistence, TimeToLive, ScheduledEnqueueTimeUtc.

Les valeurs de chaîne de ces colonnes sont analysées en tant que type de valeur de propriété système correspondante et les échecs d’analyse sont traités comme des erreurs de données. Ce champ est fourni sous la forme d’un format d’objet JSON. Les détails relatifs à ce format sont les suivants :

  • Entouré par des accolades {}.
  • Écrit dans des paires clé-valeur.
  • Les clés et les valeurs doivent être des chaînes.
  • La clé est le nom de la propriété système et la valeur est le nom de la colonne de requête.
  • Les clés et les valeurs sont séparées par un signe deux-points.
  • Chaque paire clé-valeur est séparée par une virgule.

Voici comment utiliser cette propriété :

  • Requête : select *, column1, column2 INTO queueOutput FROM iotHubInput
  • Colonnes de propriétés système : { "MessageId": "column1", "PartitionKey": "column2"}

Cela définit le MessageId sur les messages de la file d’attente Service bus avec les valeurs de column1, et PartitionKey est défini avec les valeurs de column2.

Étapes suivantes