Отправка и получение сообщений между предварительной версией Azure IoT MQ и Центры событий Azure или Kafka
Внимание
Предварительная версия операций Интернета вещей Azure, включенная Azure Arc в настоящее время находится в предварительной версии. Не следует использовать это программное обеспечение предварительной версии в рабочих средах.
Юридические условия, применимые к функциям Azure, которые находятся в состоянии бета-версии, предварительной версии или иным образом еще не выпущены в общедоступной версии, см. на странице Дополнительные условия использования предварительных версий в Microsoft Azure.
Соединитель Kafka отправляет сообщения из брокера MQ Предварительной версии MQTT Azure IoT в конечную точку Kafka и аналогично извлекает сообщения другим способом. Так как Центры событий Azure поддерживает API Kafka, соединитель работает вне поля с Центрами событий.
Настройка соединителя Центров событий через конечную точку Kafka
По умолчанию соединитель не установлен в Azure IoT MQ. Он должен быть явно включен с указанными учетными данными сопоставления разделов и проверки подлинности. Выполните следующие действия, чтобы включить двунаправленное взаимодействие между IoT MQ и Центры событий Azure через конечную точку Kafka.
Создайте концентратор событий для каждого раздела Kafka.
Предоставление соединителю доступа к пространству имен Центров событий
Предоставление расширения IoT MQ Arc пространству имен Центров событий является наиболее удобным способом установления безопасного подключения от соединителя Kakfa Центра событий IoT MQ к Центрам событий.
Сохраните следующий шаблон Bicep в файл и примените его к Azure CLI после задания допустимых параметров для вашей среды:
Примечание.
Шаблон Bicep предполагает, что кластер с коннектом Arc и пространство имен Центров событий находятся в одной группе ресурсов, измените шаблон, если ваша среда отличается.
@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'
resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
name: clusterName
}
resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
name: mqExtensionName
scope: connectedCluster
}
resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
name: eventHubNamespaceName
}
// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
# Set the required environment variables
# Resource group for resources
RESOURCE_GROUP=xxx
# Bicep template files name
TEMPLATE_FILE_NAME=xxx
# MQ Arc extension name
MQ_EXTENSION_NAME=xxx
# Arc connected cluster name
CLUSTER_NAME=xxx
# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx
az deployment group create \
--name assign-RBAC-roles \
--resource-group $RESOURCE_GROUP \
--template-file $TEMPLATE_FILE_NAME \
--parameters mqExtensionName=$MQ_EXTENSION_NAME \
--parameters clusterName=$CLUSTER_NAME \
--parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE
Kafka Подключение or
Настраиваемый ресурс Kafka Подключение or позволяет настроить соединитель Kafka, который может взаимодействовать с узлом Kafka и Центрами событий. Соединитель Kafka может передавать данные между разделами MQTT и разделами Kafka, используя Центры событий в качестве конечной точки, совместимой с Kafka.
В следующем примере показано значение Kafka Подключение or CR, которое подключается к конечной точке Центров событий с помощью удостоверения Azure Центра событий Интернета вещей, предполагается, что другие ресурсы MQ были установлены с помощью краткого руководства.
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.4.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tls:
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
В следующей таблице описаны поля в kafka Подключение or CR:
Поле | Description | Обязательное поле |
---|---|---|
Изображение | Изображение соединителя Kafka. Можно указать pullPolicy и repository tag изображение. Значения по умолчанию отображаются в предыдущем примере. |
Да |
Экземпляры | Количество экземпляров соединителя Kafka для запуска. | Да |
clientIdPrefix | Строка для подготовки к идентификатору клиента, используемому соединителем. | No |
kafka Подключение ion | Сведения о подключении конечной точки Центров событий. См. Подключение Kafka. | Да |
localBroker Подключение ion | Сведения о подключении локального брокера, переопределивающее подключение брокера по умолчанию. См. раздел "Управление подключением локального брокера". | No |
LogLevel | Уровень журнала соединителя Kafka. Возможные значения: трассировка, отладка, информация, предупреждение, ошибка или неустранимая версия. По умолчанию предупреждение. | No |
Подключение Kafka
Поле kafkaConnection
определяет сведения о подключении конечной точки Kafka.
Поле | Description | Обязательное поле |
---|---|---|
конечная точка | Узел и порт конечной точки Центров событий. Порт обычно равен 9093. Можно указать несколько конечных точек, разделенных запятыми, чтобы использовать синтаксис серверов начальной загрузки. | Да |
tls | Конфигурация шифрования TLS. См. протокол TLS. | Да |
проверка подлинности | Конфигурация проверки подлинности. См. раздел Аутентификация. | No |
TLS
Поле tls
включает шифрование TLS для подключения и при необходимости указывает карту конфигурации ЦС.
Поле | Description | Обязательное поле |
---|---|---|
tlsEnabled | Логическое значение, указывающее, включена ли шифрование TLS. Оно должно иметь значение true для обмена данными Центров событий. | Да |
TrustedCaCertificateConfigMap | Имя карты конфигурации, содержащей сертификат ЦС для проверки удостоверения сервера. Это поле не требуется для обмена данными центров событий, так как центры событий используют известные ЦС, доверенные по умолчанию. Однако это поле можно использовать, если вы хотите использовать пользовательский сертификат ЦС. | No |
При указании доверенного ЦС необходимо создать ConfigMap, содержащий общедоступное зелье ЦС в формате PEM, и укажите имя в свойстве trustedCaCertificateConfigMap
.
kubectl create configmap ca-pem --from-file path/to/ca.pem
Проверка подлинности
Поле проверки подлинности поддерживает различные типы методов проверки подлинности, таких как SASL, X509 или управляемое удостоверение.
Поле | Description | Обязательное поле |
---|---|---|
включена | Логическое значение, указывающее, включена ли проверка подлинности. | Да |
authType | Поле, содержащее используемый тип проверки подлинности. См. тип проверки подлинности |
Тип проверки подлинности
Поле | Description | Обязательное поле |
---|---|---|
sasl | Конфигурация проверки подлинности SASL. saslType Укажите , который может быть простым, scramSha256 или scramSha512, и token ссылаться на секрет Kubernetes secretName или Azure Key VaultkeyVault , содержащий пароль. |
Да, если используется проверка подлинности SASL |
systemAssignedManagedIdentity | Конфигурация для проверки подлинности управляемого удостоверения. Укажите аудиторию для запроса маркера, который должен соответствовать пространству имен Центров событий (https://<NAMESPACE>.servicebus.windows.net ), так как соединитель является клиентом Kafka. Управляемое удостоверение, назначаемое системой, автоматически создается и назначается соединителю при его включении. |
Да, если используется проверка подлинности управляемого удостоверения |
x509 | Конфигурация для проверки подлинности X509. secretName Укажите или keyVault поле. Поле secretName — это имя секрета, содержащего сертификат клиента и ключ клиента в формате PEM, хранящийся в виде секрета TLS. |
Да, если используется проверка подлинности X509 |
Сведения об использовании Azure Key Vault и keyVault
управлении секретами для Azure IoT MQ вместо секретов Kubernetes см. в статье "Управление секретами с помощью Azure Key Vault или Kubernetes".
Проверка подлинности в Центрах событий
Чтобы подключиться к Центрам событий с помощью секрета строка подключения и Kubernetes, используйте plain
тип SASL и имя пользователя и $ConnectionString
полный строка подключения в качестве пароля. Сначала создайте секрет Kubernetes:
kubectl create secret generic cs-secret -n azure-iot-operations \
--from-literal=username='$ConnectionString' \
--from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'
Затем наведите ссылку на секрет в конфигурации:
authentication:
enabled: true
authType:
sasl:
saslType: plain
token:
secretName: cs-secret
Чтобы использовать Azure Key Vault вместо секретов Kubernetes, создайте секрет Azure Key Vault с строка подключенияEndpoint=sb://..
, наведите ссылку на него vaultSecret
и укажите имя пользователя, как "$ConnectionString"
в конфигурации.
authentication:
enabled: true
authType:
sasl:
saslType: plain
token:
keyVault:
username: "$ConnectionString"
vault:
name: my-key-vault
directoryId: <AKV directory ID>
credentials:
servicePrincipalLocalSecretName: aio-akv-sp
vaultSecret:
name: my-cs # Endpoint=sb://..
# version: 939ecc2...
Чтобы использовать управляемое удостоверение, укажите его в качестве единственного метода при проверке подлинности. Кроме того, необходимо назначить роль управляемому удостоверению, предоставляющему разрешение на отправку и получение сообщений из Центров событий, таких как Центры событий Azure владельца данных или Центры событий Azure отправителя или получателя данных. Дополнительные сведения см. в статье Аутентификация приложения с помощью идентификатора Microsoft Entra для доступа к ресурсам Центров событий.
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
audience: https://<NAMESPACE>.servicebus.windows.net
X.509
Для X.509 используйте секрет TLS Kubernetes, содержащий открытый сертификат и закрытый ключ.
kubectl create secret tls my-tls-secret -n azure-iot-operations \
--cert=path/to/cert/file \
--key=path/to/key/file
Затем укажите secretName
конфигурацию.
authentication:
enabled: true
authType:
x509:
secretName: my-tls-secret
Чтобы использовать Azure Key Vault, убедитесь, что сертификат и закрытый ключ импортируются правильно, а затем укажите ссылку.vaultCert
authentication:
enabled: true
authType:
x509:
keyVault:
vault:
name: my-key-vault
directoryId: <AKV directory ID>
credentials:
servicePrincipalLocalSecretName: aio-akv-sp
vaultCert:
name: my-cert
# version: 939ecc2...
## If presenting full chain also
# vaultCaChainSecret:
# name: my-chain
Или, если требуется представить полную цепочку, отправьте полный сертификат цепочки и ключ в AKV в качестве PFX-файла и используйте vaultCaChainSecret
поле.
# ...
keyVault:
vaultCaChainSecret:
name: my-cert
# version: 939ecc2...
Управление подключением локального брокера
Как и мост MQTT, соединитель Центров событий выступает в качестве клиента брокера MQ MQTT Интернета вещей. Если вы настроили порт прослушивателя и (или) проверку подлинности брокера MQ MQTT IoT, переопределите локальную конфигурацию подключения MQTT для соединителя Центров событий. Дополнительные сведения см. в статье MQTT, чтобы узнать больше о подключении локального брокера mQTT.
Kafka Подключение orTopicMap
Настраиваемый ресурс Kafka Подключение orTopicMap (CR) позволяет определить сопоставление между разделами MQTT и разделами Kafka для двунаправленной передачи данных. Укажите ссылку на Kafka Подключение or CR и список маршрутов. Каждый маршрут может быть маршрутом MQTT в Kafka или маршрутом Kafka к MQTT. Например:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
name: my-eh-topic-map
namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
kafkaConnectorRef: my-eh-connector
compression: snappy
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
partitionStrategy: property
partitionKeyProperty: device-id
copyMqttProperties: true
routes:
# Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
- mqttToKafka:
name: "route1"
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
# Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
- kafkaToMqtt:
name: "route2"
consumerGroupId: mqConnector
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
В следующей таблице описаны поля в Kafka Подключение orTopicMap CR:
Поле | Description | Обязательное поле |
---|---|---|
kafka Подключение orRef | Имя kafka Подключение or CR, к которому принадлежит эта карта раздела. | Да |
compression | Конфигурация сжатия сообщений, отправленных в разделы Kafka. См. раздел " Сжатие". | No |
пакетная обработка | Конфигурация пакетной обработки сообщений, отправляемых в разделы Kafka. См . пакетную обработку. | No |
partitionStrategy | Стратегия обработки секций Kafka при отправке сообщений в разделы Kafka. См . стратегию обработки секций. | No |
copyMqttProperties | Логическое значение для управления копированием свойств системы И пользователя MQTT в заголовок сообщения Kafka. Свойства пользователей копируются как есть. Некоторые преобразования выполняются с помощью системных свойств. Значение по умолчанию — false. | No |
routes | Список маршрутов для передачи данных между разделами MQTT и разделами Kafka. Каждый маршрут может иметь поле mqttToKafka или kafkaToMqtt поле в зависимости от направления передачи данных. См. статью "Маршруты". |
Да |
Сжатие
Поле сжатия позволяет сжимать сообщения, отправленные в разделы Kafka. Сжатие помогает сократить пропускную способность сети и пространство хранилища, необходимые для передачи данных. Однако сжатие также добавляет некоторые издержки и задержку в процесс. Поддерживаемые типы сжатия перечислены в следующей таблице.
значение | Описание |
---|---|
ничего | Сжатие или пакетная обработка не применяется. Значение по умолчанию не является значением, если сжатие не указано. |
gzip | Применяются сжатие и пакетная обработка GZIP. GZIP — это алгоритм сжатия общего назначения, который обеспечивает хороший баланс между коэффициентом сжатия и скоростью. |
snappy | Применяются сжатие snappy и пакетная обработка. Snappy — это алгоритм быстрого сжатия, который предлагает умеренное соотношение сжатия и скорость. |
lz4 | Применяются сжатие LZ4 и пакетная обработка. LZ4 — это алгоритм быстрого сжатия, который обеспечивает низкое соотношение сжатия и высокую скорость. |
Пакетная обработка
Помимо сжатия, можно также настроить пакетную обработку сообщений перед отправкой в разделы Kafka. Пакетная обработка позволяет группировать несколько сообщений вместе и сжимать их в виде одной единицы, что может повысить эффективность сжатия и снизить нагрузку на сеть.
Поле | Description | Обязательное поле |
---|---|---|
включена | Логическое значение, указывающее, включена ли пакетная обработка. Если значение не задано, значение по умолчанию равно false. | Да |
задержки | Максимальный интервал времени в миллисекундах, который можно буферизировать перед отправкой сообщений. Если этот интервал достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько или сколько их размеров. Если значение не задано, значение по умолчанию равно 5. | No |
maxMessages | Максимальное количество сообщений, которые можно буферизать перед отправкой. Если это число достигнуто, все буферные сообщения отправляются в виде пакета независимо от того, насколько они большие или как долго они буферизаются. Если значение не задано, значение по умолчанию равно 100000. | No |
maxBytes | Максимальный размер в байтах, который можно буферизать перед отправкой. Если этот размер достигнут, все буферные сообщения отправляются в виде пакета независимо от того, сколько они находятся в буфере или как долго они буферичены. Значение по умолчанию — 1000000 (1 МБ). | No |
Пример использования пакетной обработки:
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
Это означает, что сообщения отправляются либо при наличии 100 сообщений в буфере, либо при наличии 1024 байтов в буфере или при истечении 1000 миллисекунда с момента последней отправки.
Стратегия обработки секций
Стратегия обработки секций — это функция, которая позволяет управлять назначением сообщений секциям Kafka при отправке их в разделы Kafka. Секции Kafka — это логические сегменты раздела Kafka, обеспечивающие параллельную обработку и отказоустойчивость. Каждое сообщение в разделе Kafka содержит секцию и смещение, используемое для идентификации и упорядочивания сообщений.
По умолчанию соединитель Kafka назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Однако можно использовать различные стратегии для назначения сообщений секциям на основе некоторых критериев, таких как имя раздела MQTT или свойство сообщения MQTT. Это поможет повысить балансировку нагрузки, локализацию данных или упорядочивание сообщений.
значение | Описание |
---|---|
default | Назначает сообщения случайным секциям с помощью алгоритма циклического перебора. Это значение по умолчанию, если стратегия не указана. |
static | Назначает сообщения фиксированному номеру секции, который является производным от идентификатора экземпляра соединителя. Это означает, что каждый экземпляр соединителя отправляет сообщения в другую секцию. Это может помочь добиться лучшей балансировки нагрузки и локальности данных. |
topic | Использует имя раздела MQTT в качестве ключа для секционирования. Это означает, что сообщения с тем же именем раздела MQTT отправляются в ту же секцию. Это может помочь добиться лучшего порядка сообщений и локальности данных. |
свойство | Использует свойство сообщения MQTT в качестве ключа для секционирования. Укажите имя свойства в partitionKeyProperty поле. Это означает, что сообщения с тем же значением свойства отправляются в одну секцию. Это поможет улучшить порядок сообщений и локализацию данных на основе пользовательского критерия. |
Пример использования стратегии обработки секций:
partitionStrategy: property
partitionKeyProperty: device-id
Это означает, что сообщения с тем же свойством device-id отправляются в ту же секцию.
Маршруты
Поле маршрутов определяет список маршрутов для передачи данных между разделами MQTT и разделами Kafka. Каждый маршрут может иметь поле mqttToKafka
или kafkaToMqtt
поле в зависимости от направления передачи данных.
MQTT в Kafka
Поле mqttToKafka
определяет маршрут, который передает данные из раздела MQTT в раздел Kafka.
Поле | Description | Обязательное поле |
---|---|---|
name | Уникальное имя маршрута. | Да |
mqttTopic | Раздел MQTT для подписки. Для сопоставления нескольких разделов можно использовать дикие карта символы (# и+ ) . |
Да |
kafkaTopic | Раздел Kafka для отправки. | Да |
kafkaAcks | Количество подтверждений, необходимых соединителю из конечной точки Kafka. Возможные значения: zero , one или all . |
No |
качество обслуживания | Уровень качества обслуживания (QoS) для подписки на раздел MQTT. Возможные значения: 0 или 1 (по умолчанию). QoS 2 в настоящее время не поддерживается. | Да |
sharedSubscription | Конфигурация использования общих подписок для разделов MQTT. groupName Укажите уникальный идентификатор для группы подписчиков и groupMinimumShareNumber число подписчиков в группе, получающей сообщения из раздела. Например, если имя группы — group1 и groupMinimumShareNumber равно 3, соединитель создает трех подписчиков с одинаковым именем группы для получения сообщений из раздела. Эта функция позволяет распространять сообщения среди нескольких подписчиков, не теряя никаких сообщений или создавая дубликаты. |
No |
Пример использования mqttToKafka
маршрута:
mqttToKafka:
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
В этом примере сообщения из разделов MQTT, соответствующих предупреждениям о температуре/# , отправляются в раздел Kafka, получающий события-концентратор с эквивалентом QoS 1 и общей группой подписок "group1" с общим номером 3.
Kafka to MQTT
Поле kafkaToMqtt
определяет маршрут, который передает данные из раздела Kafka в раздел MQTT.
Поле | Description | Обязательное поле |
---|---|---|
name | Уникальное имя маршрута. | Да |
kafkaTopic | Раздел Kafka, из который вытащить. | Да |
mqttTopic | Раздел MQTT для публикации. | Да |
consumerGroupId | Префикс идентификатора группы потребителей для каждого маршрута Kafka к MQTT. Если этот параметр не задан, идентификатор группы потребителей имеет то же значение, что и имя маршрута. | No |
качество обслуживания | Уровень качества обслуживания (QoS) для сообщений, опубликованных в разделе MQTT. Возможные значения: 0 или 1 (по умолчанию). QoS 2 в настоящее время не поддерживается. Если для QoS задано значение 1, соединитель публикует сообщение в разделе MQTT, а затем ожидает взлома, прежде чем зафиксировать сообщение обратно в Kafka. Для QoS 0 соединитель фиксируется немедленно без ack MQTT. | No |
Пример использования kafkaToMqtt
маршрута:
kafkaToMqtt:
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
В этом примере сообщения из раздела Kafka, отправляющего события*, публикуются в командах MQTT с уровнем QoS 0.
Имя концентратора событий должно соответствовать разделу Kafka
Каждый отдельный концентратор событий не пространство имен должен быть назван точно так же, как и предполагаемый раздел Kafka, указанный в маршрутах. Кроме того, строка подключения EntityPath
должен соответствовать, если строка подключения область в один концентратор событий. Это требование связано с тем, что пространство имен Центров событий аналогично кластеру Kafka и имени концентратора событий аналогично разделу Kafka, поэтому имя раздела Kafka должно совпадать с именем концентратора событий.
Смещения группы потребителей Kafka
Если соединитель отключен или удален и переустановлен с тем же идентификатором группы потребителей Kafka, смещение группы потребителей (последняя позиция, из которой сообщения для чтения потребителей Kafka) хранится в Центры событий Azure. Дополнительные сведения см. в разделе "Группа потребителей Центров событий" и группа потребителей Kafka.
Версия MQTT
Этот соединитель использует только MQTT версии 5.
Связанный контент
Публикация и подписка на сообщения MQTT с помощью предварительной версии Azure IoT MQ