Share via


Envoyer et recevoir des messages entre Azure IoT MQ (préversion) et Azure Event Hubs ou Kafka

Important

Opérations Azure IoT (préversion) – activé parc Azure Arc est actuellement en PRÉVERSION. Vous ne devez pas utiliser ce logiciel en préversion dans des environnements de production.

Pour connaître les conditions juridiques qui s’appliquent aux fonctionnalités Azure en version bêta, en préversion ou plus généralement non encore en disponibilité générale, consultez l’Avenant aux conditions d’utilisation des préversions de Microsoft Azure.

Le connecteur Kafka envoie (push) des messages du MQTT broker Azure IoT MQ (préversion) vers un point de terminaison Kafka et extrait de façon similaire les messages dans le sens inverse. Étant donné que Azure Event Hubs prend en charge lesAPI Kafka, le connecteur fonctionne directement avec Event Hubs.

Configurez le connecteur Event Hubs via le point de terminaison Kafka

Par défaut, le connecteur n’est pas installé avec Azure IoT MQ. Il doit être formellement activé avec le mappage de rubriques et les identifiants d’authentification spécifiés. Suivez ces étapes pour activer la communication bidirectionnelle entre IoT MQ et Azure Event Hubs via son point de terminaison Kafka.

  1. Créez un espace de noms Event Hubs.

  2. Créez un hub d’événements pour chaque rubrique Kafka.

Accordez au connecteur l’accès à l’espace de noms Event Hubs

Accordez l’accès à l’extension IoT MQ Arc à un espace de noms Event Hubs est le moyen le plus pratique d’établir une connexion sécurisée entre le connecteur Kakfa d’IoT MQ et Event Hubs.

Enregistrez le modèle Bicep suivant dans un fichier et appliquez-le avec Azure CLI après avoir défini les paramètres valides pour votre environnement :

Remarque

Le modèle Bicep suppose que le cluster Arc connecté et l’espace de noms Event Hubs se trouvent dans le même groupe de ressources, ajustez le modèle si votre environnement est différent.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

KafkaConnector

La ressource personnaliséeKafkaConnector (CR) vous permet de configurer un connecteur Kafka capable de communiquer avec un hôte Kafka et Event Hubs. Le connecteur Kafka peut transférer des données entre les rubriques MQTT et les rubriques Kafka, à l’aide d’Event Hubs en tant que point de terminaison compatible avec Kafka.

L’exemple suivant montre une CR de KafkaConnector qui se connecte à un point de terminaison Event Hubs à l’aide de l’identité Azure IoT MQ. Cela suppose que d’autres ressources MQ ont été installées à l’aide du guide de démarrage rapide :

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Le tableau suivant décrit les champs dans une CR de KafkaConnector :

Champ Description Obligatoire
image Image du connecteur Kafka. Vous pouvez spécifier les éléments pullPolicy, repository et tag de l’image. L’exemple précédent répertorie les valeurs par défaut. Oui
instances Nombre d’instances du connecteur Kafka à exécuter. Oui
clientIdPrefix Chaîne à ajouter à un ID client utilisé par le connecteur. Non
kafkaConnection Détails de connexion avec le point de terminaison Event Hubs. Consultez Connexion Kafka. Oui
localBrokerConnection Détails de connexion du répartiteur local qui remplace la connexion de répartiteur par défaut. Consultez Gérer les connexions du répartiteur local. Non
logLevel Niveau de journalisation du connecteur Kafka. Les valeurs possibles sont les suivantes : trace, debug, info, warn (avertissement), error (erreur)ou fatal (critique). La valeur par défaut est warn. Non

Connexion Kafka

Le champ kafkaConnection définit les détails de connexion du point de terminaison Kafka.

Champ Description Obligatoire
endpoint Hôte et port du point de terminaison Event Hubs. Le port est généralement 9093. Vous pouvez spécifier plusieurs points de terminaison séparés par des virgules pour utiliser la syntaxe des serveurs de démarrage. Oui
tls Configuration du chiffrement TLS. Consultez TLS . Oui
authentification Configuration d’authentification. Voir Authentification. Non

TLS

Le champ tls active le chiffrement TLS pour la connexion et spécifie éventuellement une carte de configuration liée à une autorité de certification.

Champ Description Obligatoire
tlsEnabled Obtient une valeur booléenne qui indique si le chiffrement TLS est activé. Elle doit être paramétrée sur true pour la communication avec Event Hubs. Oui
trustedCaCertificateConfigMap Nom de la carte de configuration qui contient le certificat délivré par les autorités pour vérifier l’identité du serveur. Ce champ n’est pas obligatoire pour la communication avec Event Hubs, car Event Hubs utilise des autorités de certification connues approuvées par défaut. Toutefois, vous pouvez utiliser ce champ si vous souhaitez utiliser un certificat personnalisé. Non

Quand une autorité de certification (CA) de confiance est requise, créez une ConfigMap contenant la partie publique de l’autorité de certification et spécifiez le nom de la ConfigMap dans la trustedCaCertificateConfigMappropriété.

kubectl create configmap ca-pem --from-file path/to/ca.pem

Authentification

Le champ d’authentification prend en charge différents types de méthodes d’authentification, tels que SASL, X509 ou une identité managée.

Champ Description Obligatoire
activé Une valeur booléenne qui indique si l’authentification est activée. Oui
authType Un champ contenant le type d’authentification utilisé. ConsultezType d’authentification
Type d’authentification
Champ Description Obligatoire
sasl La configuration de l’authentification SASL. Spécifiez le saslType, qui peut être plain, scramSha256 ou scramSha512, et le token pour faire référence au secretName Kubernetes ou au secret keyVault Azure Key Vault contenant le mot de passe. Oui, si vous utilisez l’authentification SASL
systemAssignedManagedIdentity Configuration de l’authentification de l’identité managée. Spécifiez l’audience pour la requête de jeton, qui doit correspondre à l’espace de noms Event Hubs (https://<NAMESPACE>.servicebus.windows.net) , car le connecteur est un client Kafka. Une identité managée affectée au système est automatiquement affectée à une instance managée lors de sa création. Oui, si vous utilisez l’authentification de l’identité managée
x509 Configuration de l’authentification X509. Spécifiez le champ secretName ou keyVault. Le champ secretName est le nom du secret qui contient le certificat client et la clé client au format PEM, stocké en tant que secret TLS. Oui, si vous utilisez l’authentification X509

Pour savoir comment utiliser Azure Key Vault et le keyVault pour gérer les secrets pour Azure IoT MQ au lieu des secrets Kubernetes, consultez Gérer les secrets en utilisant Azure Key Vault ou des secrets Kubernetes.

S’authentifier auprès d’Event Hubs

Pour vous connecter à Event Hubs en utilisant une chaîne de connexion et un secret Kubernetes, utilisez le type SASL plain et $ConnectionString comme nom d’utilisateur, et la chaîne de connexion complète comme mot de passe. Créer d’abord le secret Kubernetes :

kubectl create secret generic cs-secret -n azure-iot-operations \
  --from-literal=username='$ConnectionString' \
  --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'

Ensuite, référencez le secret dans la configuration :

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        secretName: cs-secret

Pour utiliser Azure Key Vault au lieu de secrets Kubernetes, créez un secret Azure Key Vault avec la chaîne de connexion Endpoint=sb://.., référencez-le avec vaultSecret et spécifiez le nom d’utilisateur sous la forme "$ConnectionString" dans la configuration.

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        keyVault:
          username: "$ConnectionString"
          vault:
            name: my-key-vault
            directoryId: <AKV directory ID>
            credentials:
              servicePrincipalLocalSecretName: aio-akv-sp
          vaultSecret:
            name: my-cs # Endpoint=sb://..
            # version: 939ecc2...

Pour utiliser l’identité managée, spécifiez-la comme méthode unique d’authentification. Vous devez également attribuer un rôle à l’identité managée qui accorde l’autorisation d’envoyer et de recevoir des messages provenant d’Event Hubs, tels qu’Azure Event Hubs Data Owner ou Azure Event Hubs Data Sender/Receiver. Pour en savoir plus, consultez Authentifier une application avec Microsoft Entra ID pour accéder aux ressources Event Hubs.

authentication:
  enabled: true
  authType:
    systemAssignedManagedIdentity:
      audience: https://<NAMESPACE>.servicebus.windows.net
X.509

Pour X.509, utilisez le secret TLS Kubernetes contenant le certificat client et la clé privée.

kubectl create secret tls my-tls-secret -n azure-iot-operations \
  --cert=path/to/cert/file \
  --key=path/to/key/file

Spécifiez ensuite le secretName dans la configuration.

authentication:
  enabled: true
  authType:
    x509:
      secretName: my-tls-secret

Pour utiliser Azure Key Vault à la place, vérifiez que le certificat et la clé privée sont correctement importés, puis spécifiez la référence avec vaultCert.

authentication:
  enabled: true
  authType:
    x509:
      keyVault:
        vault:
          name: my-key-vault
          directoryId: <AKV directory ID>
          credentials:
            servicePrincipalLocalSecretName: aio-akv-sp
        vaultCert:
          name: my-cert
          # version: 939ecc2...
        ## If presenting full chain also  
        # vaultCaChainSecret:
        #   name: my-chain

Ou, si la présentation de la chaîne complète est requise, chargez le certificat et la clé de chaîne complète dans AKV en tant que fichier PFX, puis utilisez à la place le champ vaultCaChainSecret.

# ...
keyVault:
  vaultCaChainSecret:
    name: my-cert
    # version: 939ecc2...

Gérez la connexion de répartiteur local

Comme le pont MQTT, le connecteur Event Hubs agit en tant que client pour le répartiteur MQTT d’IoT MQ. Si vous avez personnalisé le port d’écoute ou l’authentification de votre répartiteur MQTT dans IoT MQ, remplacez également la configuration de connexion MQTT locale pour le connecteur Event Hubs. Pour plus d’informations, consultez connexion du pont MQTT au répartiteur local.

KafkaConnectorTopicMap

La ressource personnalisée KafkaConnectorTopicMap (CR) vous permet de définir le mappage entre les rubriques MQTT et les rubriques Kafka destinées au transfert de données bidirectionnel. Spécifiez une référence à une CR KafkaConnector et une liste de routes. Chaque route peut être soit MQTT vers une route Kafka ou une route Kafka vers MQTT. Par exemple :

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: snappy
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

Le tableau suivant décrit les champs dans une CR de KafkaConnectorTopicMap :

Champ Description Obligatoire
kafkaConnectorRef Nom de la CR KafkaConnector à laquelle appartient cette carte de rubriques (topic map). Oui
compression Configuration pour la compression des messages envoyés aux rubriques Kafka. Consultez Compression. Non
traitement par lots Configuration pour le traitement par lot des messages envoyés aux rubriques Kafka. Consultez traitement par lot. Non
partitionStrategy Stratégie de gestion des partitions Kafka lors de l’envoi de messages à des rubriques Kafka. Consultez Stratégie de gestion des partitions. Non
copyMqttProperties Valeur booléenne pour contrôler si les propriétés système et utilisateur MQTT sont copiées dans l’en-tête de message Kafka. Les propriétés utilisateur sont copiées en l’état. Une transformation est effectuée avec les propriétés système. La valeur par défaut est false. Non
itinéraires Liste des routes destinées au transfert de données entre les rubriques MQTT et les rubriques Kafka. Chaque route peut avoir un champ mqttToKafka ou un champ kafkaToMqtt, selon la direction du transfert de données. ConsultezItinéraires. Oui

Compression

Le champs compression permet la compression des messages envoyés aux rubriques Kafka. La compression permet de réduire la bande passante réseau et l’espace de stockage requis pour le transfert de données. Toutefois, la compression ajoute également une surcharge et une latence au processus. Les types de compression pris en charge sont répertoriés dans le tableau ci-après.

Valeur Description
aucun(e) Aucune compression ou traitement par lot n’est appliqué. Si aucune compression n’est spécifiée, aucun est la valeur par défaut.
gzip La compression et le traitement par lots GZIP sont appliqués. GZIP est un algorithme de compression à usage général qui offre un bon équilibre entre le rapport de compression et la vitesse.
snappy La compression Snappy et le traitement par lots sont appliqués. Snappy est un algorithme de compression rapide qui offre un ratio de compression et une vitesse modérés.
lz4 La compression LZ4 et le traitement par lots sont appliqués. LZ4 est un algorithme de compression rapide qui offre un faible ratio de compression et une vitesse élevée.

Traitement par lots

Outre la compression, vous pouvez également configurer le traitement par lots des messages avant de les envoyer à des rubriques Kafka. Le traitement par lots vous permet de regrouper plusieurs messages et de les compresser en tant qu’unité unique, ce qui peut améliorer l’efficacité de la compression et réduire la surcharge réseau.

Champ Description Obligatoire
activé Valeur booléenne qui indique si le traitement par lot est activé. Si elle n’est pas définie, la valeur par défaut est false. Oui
latencyMs Intervalle de temps maximal en millisecondes au cours duquel les messages peuvent être mis en mémoire tampon avant d’être envoyés. Si cet intervalle est atteint, tous les messages mis en mémoire tampon sont envoyés en tant que lot, quel que soit leur nombre ou leur taille. Si elle n’est pas définie, la valeur par défaut est 5. Non
maxMessages Nombre maximal de messages pouvant être mis en mémoire tampon avant d’être envoyés. Si cet intervalle est atteint, tous les messages mis en mémoire tampon sont envoyés en tant que lot, quel que soit leur taille ou leur durée de mise en mémoire tampon. Si elle n’est pas définie, la valeur par défaut est 100000. Non
maxBytes Nombre maximal d’octets pouvant être mis en mémoire tampon avant d’être envoyés. Si ce nombre est atteint, tous les messages mis en mémoire tampon sont envoyés en tant que lot, quel que soit leur taille ou leur durée de mise en mémoire tampon. La valeur par défaut est 1000000 Mo (1 To). Non

Voici un exemple d’utilisation du traitement par lot :

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

Cela signifie que les messages sont envoyés soit lorsqu’il y a 100 messages dans la mémoire tampon, soit lorsqu’il y a 1024 octets dans la mémoire tampon, soit lorsque 1 000 millisecondes s’écoulent depuis le dernier envoi, selon le cas qui survient en premier.

Stratégie de gestion des partitions

La stratégie de gestion des partitions est une fonctionnalité qui vous permet de contrôler la façon dont les messages sont affectés aux partitions Kafka lors de leur envoi à des rubriques Kafka. Les partitions Kafka sont des segments logiques d’une rubrique Kafka qui activent le traitement parallèle et la tolérance de panne. Chaque message d’une rubrique Kafka dispose d’une partition et d’un décalage utilisés pour identifier et classer les messages.

Par défaut, le connecteur Kafka affecte des messages à des partitions aléatoires à l’aide d’un algorithme de tourniquet. Toutefois, vous pouvez utiliser différentes stratégies pour affecter des messages à des partitions en fonction de certains critères, tels que le nom de la rubrique MQTT ou une propriété de message MQTT. Cela peut vous aider à améliorer l’équilibrage de charge, la localisation des données ou l’ordre des messages.

Valeur Description
default Attribution des messages à des partitions aléatoires à l’aide d’un algorithme de tourniquet. Il s’agit de la valeur par défaut si aucune stratégie n’est spécifiée.
static Attribution des messages à un numéro de partition fixe dérivé de l’ID d’instance du connecteur. Cela signifie que chaque instance de connecteur envoie des messages à une partition différente. Cela peut aider à améliorer l’équilibrage de charge ainsi que l’emplacement des données.
topic Utilise le nom de la rubrique MQTT comme clé pour le partitionnement. Cela signifie que les messages portant le même nom de rubrique MQTT sont envoyés à la même partition. Cela peut aider à améliorer l’ordre des messages ainsi que la localisation des données.
property Utilise une propriété de message MQTT comme clé pour le partitionnement. Spécifiez le nom de la propriété dans le champ partitionKeyProperty. Cela signifie que les messages ayant la même valeur de propriété sont envoyés à la même partition. Cela peut aider à améliorer l’ordre des messages ainsi que la localisation des données lors de critères personnalisés.

Voici un exemple d’utilisation de la stratégie de gestion des partitions :

partitionStrategy: property
partitionKeyProperty: device-id

Cela signifie que les messages ayant la même propriété device-id sont envoyés à la même partition.

Itinéraires

Le champ routes détermine un liste des routes destinées au transfert de données entre les rubriques MQTT et les rubriques Kafka. Chaque route peut avoir un champ mqttToKafka ou un champ kafkaToMqtt, selon la direction du transfert de données.

MQTT vers Kafka

Le champ mqttToKafka définit un itinéraire qui transfère les données d’une rubrique MQTT vers une rubrique Kafka.

Champ Description Obligatoire
name Nom unique de la route. Oui
mqttTopic Rubrique MQTT à partir de laquelle s’abonner. Vous pouvez utiliser des caractères génériques (# et +) pour faire correspondre plusieurs rubriques. Oui
kafkaTopic Rubrique Kafka vers laquelle envoyer. Oui
kafkaAcks Nombre d’accusés de réception requis par le connecteur à partir du point de terminaison Kafka. Les valeurs possibles sont zero, one, ou all. Non
Qualité de service (QoS) Niveau de qualité de service (QoS) nécessaire pour s’abonner à la rubrique MQTT. Valeurs possibles : 0 ou 1 (par défaut). QoS 2 n’est actuellement pas pris en charge. Oui
sharedSubscription Configuration de l’utilisation d’abonnements partagés pour les rubriques MQTT. Spécifiez le groupName, qui est un identificateur unique pour un groupe d’abonnés et le groupMinimumShareNumber, qui est le nombre d’abonnés d’un groupe qui reçoivent des messages d’une rubrique. Par exemple, si groupName est « group1 » et groupMinimumShareNumber est 3, le connecteur crée alors trois abonnés portant le même nom de groupe pour qu’ils reçoivent des messages d’une rubrique. Cette fonctionnalité vous permet de répartir des messages entre plusieurs abonnés sans perdre de message ni créer des doublons. Non

Voici un exemple d’utilisation de la route mqttToKafka :

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

Dans cet exemple, les messages provenant de rubriques MQTT qui correspondent à temperature-alerts/# sont envoyés à la rubrique Kafka receiving-event-hub avec une qualité de service équivalente à 1 et groupe d’abonnements partagés « group1 » ayant le numéro de partage 3.

Kafka vers MQTT

Le champ kafkaToMqtt définit une route qui transfère les données d’une rubrique Kafka vers une rubrique MQTT.

Champ Description Obligatoire
name Nom unique de la route. Oui
kafkaTopic Rubrique Kafka à partir de laquelle extraire. Oui
mqttTopic Rubrique MQTT dans laquelle publier. Oui
consumerGroupId Préfixe de l’ID de groupe de consommateurs pour chaque route Kafka vers MQTT. Si ce n’est pas le cas, l’ID du groupe de consommateurs est défini sur le même nom que le nom de la route. Non
Qualité de service (QoS) Niveau de qualité de service (QoS) des messages publiés dans la rubrique MQTT. Les valeurs possibles sont 0 ou 1 (par défaut). QoS 2 n’est actuellement pas pris en charge. Si QoS est défini sur 1, le connecteur publie le message dans la rubrique MQTT, puis attend l’accusé de réception (ack) avant de confirmer le message et de le renvoyer à Kafka. Pour QoS 0, le connecteur confirme immédiatement sans recevoir l’ack de MQTT. Non

Voici un exemple d’utilisation de la route kafkaToMqtt :

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

Dans cet exemple, les messages de la rubrique Kafkasending-event-hub* sont publiés dans la rubrique MQTT heater-commands avec une qualité de service de niveau 0.

Le nom du hub d’événement doit correspondre à la rubrique Kafka

Chaque hub d’événement individuel, et non l’espace de noms, doit pas être nommé exactement de la même manière que la rubrique Kafka prévue spécifiée dans les routes. En outre, la chaîne de connexion EntityPath doit correspondre si la chaîne de connexion est étendue à un hub d’événements. Cette exigence est due au fait que l'espace de noms Event Hubs est analogue au cluster Kafka et que le nom du hub d’événements est analogue à une rubrique Kafka, de sorte que le nom de la rubrique Kafka doit correspondre au nom du hub d’événements.

Décalages de groupe de consommateurs Kafka

Si le connecteur se déconnecte ou est supprimé puis réinstallé avec le même ID de groupe de consommateurs Kafka, le décalage du groupe de consommateurs (la dernière position à partir de laquelle le consommateur Kafka lit les messages) est stocké dans Azure Event Hubs. Pour plus d’informations, consultez Groupe de consommateurs Event Hubs vs. groupe de consommateurs Kafka.

Version MQTT

Ce connecteur utilise uniquement MQTT v5.

Publier et souscrire à des messages MQTT en utilisant Azure IoT MQ (préversion)