Отправка и получение сообщений между предварительной версией Azure IoT MQ и Центры событий Azure или Kafka

Внимание

Предварительная версия операций Интернета вещей Azure, включенная Azure Arc в настоящее время находится в предварительной версии. Не следует использовать это программное обеспечение предварительной версии в рабочих средах.

Юридические условия, применимые к функциям Azure, которые находятся в состоянии бета-версии, предварительной версии или иным образом еще не выпущены в общедоступной версии, см. на странице Дополнительные условия использования предварительных версий в Microsoft Azure.

Соединитель Kafka отправляет сообщения из брокера MQ Предварительной версии MQTT Azure IoT в конечную точку Kafka и аналогично извлекает сообщения другим способом. Так как Центры событий Azure поддерживает API Kafka, соединитель работает вне поля с Центрами событий.

Настройка соединителя Центров событий через конечную точку Kafka

По умолчанию соединитель не установлен в Azure IoT MQ. Он должен быть явно включен с указанными учетными данными сопоставления разделов и проверки подлинности. Выполните следующие действия, чтобы включить двунаправленное взаимодействие между IoT MQ и Центры событий Azure через конечную точку Kafka.

  1. Создайте пространство имен Центров событий.

  2. Создайте концентратор событий для каждого раздела Kafka.

Предоставление соединителю доступа к пространству имен Центров событий

Предоставление расширения IoT MQ Arc пространству имен Центров событий является наиболее удобным способом установления безопасного подключения от соединителя Kakfa Центра событий IoT MQ к Центрам событий.

Сохраните следующий шаблон Bicep в файл и примените его к Azure CLI после задания допустимых параметров для вашей среды:

Примечание.

Шаблон Bicep предполагает, что кластер с коннектом Arc и пространство имен Центров событий находятся в одной группе ресурсов, измените шаблон, если ваша среда отличается.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

Kafka Подключение or

Настраиваемый ресурс Kafka Подключение or позволяет настроить соединитель Kafka, который может взаимодействовать с узлом Kafka и Центрами событий. Соединитель Kafka может передавать данные между разделами MQTT и разделами Kafka, используя Центры событий в качестве конечной точки, совместимой с Kafka.

В следующем примере показано значение Kafka Подключение or CR, которое подключается к конечной точке Центров событий с помощью удостоверения Azure Центра событий Интернета вещей, предполагается, что другие ресурсы MQ были установлены с помощью краткого руководства.

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

В следующей таблице описаны поля в kafka Подключение or CR:

Поле Description Обязательное поле
Изображение Изображение соединителя Kafka. Можно указать pullPolicyи repositorytag изображение. Значения по умолчанию отображаются в предыдущем примере. Да
Экземпляры Количество экземпляров соединителя Kafka для запуска. Да
clientIdPrefix Строка для подготовки к идентификатору клиента, используемому соединителем. No
kafka Подключение ion Сведения о подключении конечной точки Центров событий. См. Подключение Kafka. Да
localBroker Подключение ion Сведения о подключении локального брокера, переопределивающее подключение брокера по умолчанию. См. раздел "Управление подключением локального брокера". No
LogLevel Уровень журнала соединителя Kafka. Возможные значения: трассировка, отладка, информация, предупреждение, ошибка или неустранимая версия. По умолчанию предупреждение. No

Подключение Kafka

Поле kafkaConnection определяет сведения о подключении конечной точки Kafka.

Поле Description Обязательное поле
конечная точка Узел и порт конечной точки Центров событий. Порт обычно равен 9093. Можно указать несколько конечных точек, разделенных запятыми, чтобы использовать синтаксис серверов начальной загрузки. Да
tls Конфигурация шифрования TLS. См. протокол TLS. Да
проверка подлинности Конфигурация проверки подлинности. См. раздел Аутентификация. No

TLS

Поле tls включает шифрование TLS для подключения и при необходимости указывает карту конфигурации ЦС.

Поле Description Обязательное поле
tlsEnabled Логическое значение, указывающее, включена ли шифрование TLS. Оно должно иметь значение true для обмена данными Центров событий. Да
TrustedCaCertificateConfigMap Имя карты конфигурации, содержащей сертификат ЦС для проверки удостоверения сервера. Это поле не требуется для обмена данными центров событий, так как центры событий используют известные ЦС, доверенные по умолчанию. Однако это поле можно использовать, если вы хотите использовать пользовательский сертификат ЦС. No

При указании доверенного ЦС необходимо создать ConfigMap, содержащий общедоступное зелье ЦС в формате PEM, и укажите имя в свойстве trustedCaCertificateConfigMap .

kubectl create configmap ca-pem --from-file path/to/ca.pem

Проверка подлинности

Поле проверки подлинности поддерживает различные типы методов проверки подлинности, таких как SASL, X509 или управляемое удостоверение.

Поле Description Обязательное поле
включена Логическое значение, указывающее, включена ли проверка подлинности. Да
authType Поле, содержащее используемый тип проверки подлинности. См. тип проверки подлинности
Тип проверки подлинности
Поле Description Обязательное поле
sasl Конфигурация проверки подлинности SASL. saslTypeУкажите , который может быть простым, scramSha256 или scramSha512, и token ссылаться на секрет Kubernetes secretName или Azure Key VaultkeyVault, содержащий пароль. Да, если используется проверка подлинности SASL
systemAssignedManagedIdentity Конфигурация для проверки подлинности управляемого удостоверения. Укажите аудиторию для запроса маркера, который должен соответствовать пространству имен Центров событий (https://<NAMESPACE>.servicebus.windows.net), так как соединитель является клиентом Kafka. Управляемое удостоверение, назначаемое системой, автоматически создается и назначается соединителю при его включении. Да, если используется проверка подлинности управляемого удостоверения
x509 Конфигурация для проверки подлинности X509. secretName Укажите или keyVault поле. Поле secretName — это имя секрета, содержащего сертификат клиента и ключ клиента в формате PEM, хранящийся в виде секрета TLS. Да, если используется проверка подлинности X509

Сведения об использовании Azure Key Vault и keyVault управлении секретами для Azure IoT MQ вместо секретов Kubernetes см. в статье "Управление секретами с помощью Azure Key Vault или Kubernetes".

Проверка подлинности в Центрах событий

Чтобы подключиться к Центрам событий с помощью секрета строка подключения и Kubernetes, используйте plain тип SASL и имя пользователя и $ConnectionString полный строка подключения в качестве пароля. Сначала создайте секрет Kubernetes:

kubectl create secret generic cs-secret -n azure-iot-operations \
  --from-literal=username='$ConnectionString' \
  --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'

Затем наведите ссылку на секрет в конфигурации:

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        secretName: cs-secret

Чтобы использовать Azure Key Vault вместо секретов Kubernetes, создайте секрет Azure Key Vault с строка подключенияEndpoint=sb://.., наведите ссылку на него vaultSecretи укажите имя пользователя, как "$ConnectionString" в конфигурации.

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        keyVault:
          username: "$ConnectionString"
          vault:
            name: my-key-vault
            directoryId: <AKV directory ID>
            credentials:
              servicePrincipalLocalSecretName: aio-akv-sp
          vaultSecret:
            name: my-cs # Endpoint=sb://..
            # version: 939ecc2...

Чтобы использовать управляемое удостоверение, укажите его в качестве единственного метода при проверке подлинности. Кроме того, необходимо назначить роль управляемому удостоверению, предоставляющему разрешение на отправку и получение сообщений из Центров событий, таких как Центры событий Azure владельца данных или Центры событий Azure отправителя или получателя данных. Дополнительные сведения см. в статье Аутентификация приложения с помощью идентификатора Microsoft Entra для доступа к ресурсам Центров событий.

authentication:
  enabled: true
  authType:
    systemAssignedManagedIdentity:
      audience: https://<NAMESPACE>.servicebus.windows.net
X.509

Для X.509 используйте секрет TLS Kubernetes, содержащий открытый сертификат и закрытый ключ.

kubectl create secret tls my-tls-secret -n azure-iot-operations \
  --cert=path/to/cert/file \
  --key=path/to/key/file

Затем укажите secretName конфигурацию.

authentication:
  enabled: true
  authType:
    x509:
      secretName: my-tls-secret

Чтобы использовать Azure Key Vault, убедитесь, что сертификат и закрытый ключ импортируются правильно, а затем укажите ссылку.vaultCert

authentication:
  enabled: true
  authType:
    x509:
      keyVault:
        vault:
          name: my-key-vault
          directoryId: <AKV directory ID>
          credentials:
            servicePrincipalLocalSecretName: aio-akv-sp
        vaultCert:
          name: my-cert
          # version: 939ecc2...
        ## If presenting full chain also  
        # vaultCaChainSecret:
        #   name: my-chain

Или, если требуется представить полную цепочку, отправьте полный сертификат цепочки и ключ в AKV в качестве PFX-файла и используйте vaultCaChainSecret поле.

# ...
keyVault:
  vaultCaChainSecret:
    name: my-cert
    # version: 939ecc2...

Управление подключением локального брокера

Как и мост MQTT, соединитель Центров событий выступает в качестве клиента брокера MQ MQTT Интернета вещей. Если вы настроили порт прослушивателя и (или) проверку подлинности брокера MQ MQTT IoT, переопределите локальную конфигурацию подключения MQTT для соединителя Центров событий. Дополнительные сведения см. в статье MQTT, чтобы узнать больше о подключении локального брокера mQTT.

Kafka Подключение orTopicMap

Настраиваемый ресурс Kafka Подключение orTopicMap (CR) позволяет определить сопоставление между разделами MQTT и разделами Kafka для двунаправленной передачи данных. Укажите ссылку на Kafka Подключение or CR и список маршрутов. Каждый маршрут может быть маршрутом MQTT в Kafka или маршрутом Kafka к MQTT. Например:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: snappy
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

В следующей таблице описаны поля в Kafka Подключение orTopicMap CR:

Поле Description Обязательное поле
kafka Подключение orRef Имя kafka Подключение or CR, к которому принадлежит эта карта раздела. Да
compression Конфигурация сжатия сообщений, отправленных в разделы Kafka. См. раздел " Сжатие". No
пакетная обработка Конфигурация пакетной обработки сообщений, отправляемых в разделы Kafka. См . пакетную обработку. No
partitionStrategy Стратегия обработки секций Kafka при отправке сообщений в разделы Kafka. См . стратегию обработки секций. No
copyMqttProperties Логическое значение для управления копированием свойств системы И пользователя MQTT в заголовок сообщения Kafka. Свойства пользователей копируются как есть. Некоторые преобразования выполняются с помощью системных свойств. Значение по умолчанию — false. No
routes Список маршрутов для передачи данных между разделами MQTT и разделами Kafka. Каждый маршрут может иметь поле mqttToKafka или kafkaToMqtt поле в зависимости от направления передачи данных. См. статью "Маршруты". Да

Сжатие

Поле сжатия позволяет сжимать сообщения, отправленные в разделы Kafka. Сжатие помогает сократить пропускную способность сети и пространство хранилища, необходимые для передачи данных. Однако сжатие также добавляет некоторые издержки и задержку в процесс. Поддерживаемые типы сжатия перечислены в следующей таблице.

значение Описание
ничего Сжатие или пакетная обработка не применяется. Значение по умолчанию не является значением, если сжатие не указано.
gzip Применяются сжатие и пакетная обработка GZIP. GZIP — это алгоритм сжатия общего назначения, который обеспечивает хороший баланс между коэффициентом сжатия и скоростью.
snappy Применяются сжатие snappy и пакетная обработка. Snappy — это алгоритм быстрого сжатия, который предлагает умеренное соотношение сжатия и скорость.
lz4 Применяются сжатие LZ4 и пакетная обработка. LZ4 — это алгоритм быстрого сжатия, который обеспечивает низкое соотношение сжатия и высокую скорость.

Пакетная обработка

Помимо сжатия, можно также настроить пакетную обработку сообщений перед отправкой в разделы Kafka. Пакетная обработка позволяет группировать несколько сообщений вместе и сжимать их в виде одной единицы, что может повысить эффективность сжатия и снизить нагрузку на сеть.

Поле Description Обязательное поле
включена Логическое значение, указывающее, включена ли пакетная обработка. Если значение не задано, значение по умолчанию равно false. Да
задержки Максимальный интервал времени в миллисекундах, который можно буферизировать перед отправкой сообщений. Если этот интервал достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько или сколько их размеров. Если значение не задано, значение по умолчанию равно 5. No
maxMessages Максимальное количество сообщений, которые можно буферизать перед отправкой. Если это число достигнуто, все буферные сообщения отправляются в виде пакета независимо от того, насколько они большие или как долго они буферизаются. Если значение не задано, значение по умолчанию равно 100000. No
maxBytes Максимальный размер в байтах, который можно буферизать перед отправкой. Если этот размер достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько они находятся в буфере или как долго они буферичены. Значение по умолчанию — 1000000 (1 МБ). No

Пример использования пакетной обработки:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

Это означает, что сообщения отправляются либо при наличии 100 сообщений в буфере, либо при наличии 1024 байтов в буфере или при истечении 1000 миллисекунда с момента последней отправки.

Стратегия обработки секций

Стратегия обработки секций — это функция, которая позволяет управлять назначением сообщений секциям Kafka при отправке их в разделы Kafka. Секции Kafka — это логические сегменты раздела Kafka, обеспечивающие параллельную обработку и отказоустойчивость. Каждое сообщение в разделе Kafka содержит секцию и смещение, используемое для идентификации и упорядочивания сообщений.

По умолчанию соединитель Kafka назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Однако можно использовать различные стратегии для назначения сообщений секциям на основе некоторых критериев, таких как имя раздела MQTT или свойство сообщения MQTT. Это поможет повысить балансировку нагрузки, локализацию данных или упорядочивание сообщений.

значение Описание
default Назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Это значение по умолчанию, если стратегия не указана.
static Назначает сообщения фиксированному номеру секции, который является производным от идентификатора экземпляра соединителя. Это означает, что каждый экземпляр соединителя отправляет сообщения в другую секцию. Это может помочь добиться лучшей балансировки нагрузки и локальности данных.
topic Использует имя раздела MQTT в качестве ключа для секционирования. Это означает, что сообщения с тем же именем раздела MQTT отправляются в ту же секцию. Это может помочь добиться лучшего порядка сообщений и локальности данных.
свойство Использует свойство сообщения MQTT в качестве ключа для секционирования. Укажите имя свойства в partitionKeyProperty поле. Это означает, что сообщения с тем же значением свойства отправляются в одну секцию. Это поможет улучшить порядок сообщений и локализацию данных на основе пользовательского критерия.

Пример использования стратегии обработки секций:

partitionStrategy: property
partitionKeyProperty: device-id

Это означает, что сообщения с тем же свойством device-id отправляются в ту же секцию.

Маршруты

Поле маршрутов определяет список маршрутов для передачи данных между разделами MQTT и разделами Kafka. Каждый маршрут может иметь поле mqttToKafka или kafkaToMqtt поле в зависимости от направления передачи данных.

MQTT в Kafka

Поле mqttToKafka определяет маршрут, который передает данные из раздела MQTT в раздел Kafka.

Поле Description Обязательное поле
name Уникальное имя маршрута. Да
mqttTopic Раздел MQTT для подписки. Для сопоставления нескольких разделов можно использовать дикие карта символы (#и+) . Да
kafkaTopic Раздел Kafka для отправки. Да
kafkaAcks Количество подтверждений, необходимых соединителю из конечной точки Kafka. Возможные значения: zero , oneили all. No
качество обслуживания Уровень качества обслуживания (QoS) для подписки на раздел MQTT. Возможные значения: 0 или 1 (по умолчанию). QoS 2 в настоящее время не поддерживается. Да
sharedSubscription Конфигурация использования общих подписок для разделов MQTT. groupNameУкажите уникальный идентификатор для группы подписчиков и groupMinimumShareNumberчисло подписчиков в группе, получающей сообщения из раздела. Например, если имя группы — group1 и groupMinimumShareNumber равно 3, соединитель создает трех подписчиков с одинаковым именем группы для получения сообщений из раздела. Эта функция позволяет распространять сообщения среди нескольких подписчиков, не теряя никаких сообщений или создавая дубликаты. No

Пример использования mqttToKafka маршрута:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

В этом примере сообщения из разделов MQTT, соответствующих предупреждениям о температуре/# , отправляются в раздел Kafka, получающий события-концентратор с эквивалентом QoS 1 и общей группой подписок "group1" с общим номером 3.

Kafka to MQTT

Поле kafkaToMqtt определяет маршрут, который передает данные из раздела Kafka в раздел MQTT.

Поле Description Обязательное поле
name Уникальное имя маршрута. Да
kafkaTopic Раздел Kafka, из который вытащить. Да
mqttTopic Раздел MQTT для публикации. Да
consumerGroupId Префикс идентификатора группы потребителей для каждого маршрута Kafka к MQTT. Если этот параметр не задан, идентификатор группы потребителей имеет то же значение, что и имя маршрута. No
качество обслуживания Уровень качества обслуживания (QoS) для сообщений, опубликованных в разделе MQTT. Возможные значения: 0 или 1 (по умолчанию). QoS 2 в настоящее время не поддерживается. Если для QoS задано значение 1, соединитель публикует сообщение в разделе MQTT, а затем ожидает взлома, прежде чем зафиксировать сообщение обратно в Kafka. Для QoS 0 соединитель фиксируется немедленно без ack MQTT. No

Пример использования kafkaToMqtt маршрута:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

В этом примере сообщения из раздела Kafka, отправляющего события*, публикуются в командах MQTT с уровнем QoS 0.

Имя концентратора событий должно соответствовать разделу Kafka

Каждый отдельный концентратор событий не пространство имен должен быть назван точно так же, как и предполагаемый раздел Kafka, указанный в маршрутах. Кроме того, строка подключения EntityPath должен соответствовать, если строка подключения область в один концентратор событий. Это требование связано с тем, что пространство имен Центров событий аналогично кластеру Kafka и имени концентратора событий аналогично разделу Kafka, поэтому имя раздела Kafka должно совпадать с именем концентратора событий.

Смещения группы потребителей Kafka

Если соединитель отключен или удален и переустановлен с тем же идентификатором группы потребителей Kafka, смещение группы потребителей (последняя позиция, из которой сообщения для чтения потребителей Kafka) хранится в Центры событий Azure. Дополнительные сведения см. в разделе "Группа потребителей Центров событий" и группа потребителей Kafka.

Версия MQTT

Этот соединитель использует только MQTT версии 5.

Публикация и подписка на сообщения MQTT с помощью предварительной версии Azure IoT MQ