Mengirim dan menerima pesan antara Pratinjau Azure IoT MQ dan Azure Event Hubs atau Kafka

Penting

Pratinjau Operasi Azure IoT – diaktifkan oleh Azure Arc saat ini dalam PRATINJAU. Anda tidak boleh menggunakan perangkat lunak pratinjau ini di lingkungan produksi.

Lihat Ketentuan Penggunaan Tambahan untuk Pratinjau Microsoft Azure untuk persyaratan hukum yang berlaku pada fitur Azure dalam versi beta, pratinjau, atau belum dirilis secara umum.

Konektor Kafka mendorong pesan dari broker MQTT Pratinjau Azure IoT MQ ke titik akhir Kafka, dan juga menarik pesan dengan cara lain. Karena Azure Event Hubs mendukung Kafka API, konektor berfungsi secara langsung dengan Azure Event Hubs.

Mengonfigurasi konektor Azure Event Hubs melalui titik akhir Kafka

Secara default, konektor tidak diinstal dengan Azure IoT MQ. Ini harus diaktifkan secara eksplisit dengan pemetaan topik dan kredensial autentikasi yang ditentukan. Ikuti langkah-langkah ini untuk mengaktifkan komunikasi dua arah antara IoT MQ dan Azure Event Hubs melalui titik akhir Kafka-nya.

  1. Buat namespace layanan Azure Event Hubs.

  2. Buat pusat aktivitas untuk setiap topik Kafka.

Memberikan akses konektor ke namespace Layanan Pusat Aktivitas

Memberikan akses ekstensi IoT MQ Arc ke namespace Layanan Pusat Aktivitas adalah cara paling mudah untuk membuat koneksi yang aman dari konektor Kakfa IoT MQ ke Azure Event Hubs.

Simpan templat Bicep berikut ke file dan terapkan dengan Azure CLI setelah mengatur parameter yang valid untuk lingkungan Anda:

Catatan

Templat Bicep mengasumsikan kluster Arc connnected dan namespace Azure Event Hubs berada dalam grup sumber daya yang sama, sesuaikan templat jika lingkungan Anda berbeda.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

Kafka Koneksi or

Sumber daya kustom Kafka Koneksi or (CR) memungkinkan Anda mengonfigurasi konektor Kafka yang dapat mengkomunikasikan host Kafka dan Azure Event Hubs. Konektor Kafka dapat mentransfer data antara topik MQTT dan topik Kafka, menggunakan Azure Event Hubs sebagai titik akhir yang kompatibel dengan Kafka.

Contoh berikut menunjukkan Kafka Koneksi or CR yang tersambung ke titik akhir Azure Event Hubs menggunakan identitas Azure IoT MQ, ia mengasumsikan sumber daya MQ lainnya diinstal menggunakan mulai cepat:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

Tabel berikut ini menjelaskan bidang di Kafka Koneksi or CR:

Bidang Deskripsi Wajib diisi
gambar Gambar konektor Kafka. Anda dapat menentukan pullPolicy, repository, dan tag gambar. Nilai default ditampilkan dalam contoh sebelumnya. Ya
instances Jumlah instans konektor Kafka yang akan dijalankan. Ya
clientIdPrefix String yang akan ditambahkan ke ID klien yang digunakan oleh konektor. No
kafka Koneksi ion Detail koneksi titik akhir Azure Event Hubs. Lihat Kafka Koneksi ion. Ya
localBroker Koneksi ion Detail koneksi broker lokal yang mengambil alih koneksi broker default. Lihat Mengelola koneksi broker lokal. No
logLevel Tingkat log konektor Kafka. Nilai yang mungkin adalah: jejak, debug, info, peringatan, kesalahan, atau fatal. Defaultnya adalah peringatan. No

Koneksi Kafka

Bidang kafkaConnection menentukan detail koneksi titik akhir Kafka.

Bidang Deskripsi Wajib diisi
titik akhir Host dan port titik akhir Azure Event Hubs. Port biasanya 9093. Anda dapat menentukan beberapa titik akhir yang dipisahkan oleh koma untuk menggunakan sintaks server bootstrap. Ya
tls Konfigurasi untuk enkripsi TLS. Lihat TLS. Ya
Autentikasi Konfigurasi untuk autentikasi. Lihat Autentikasi. No

TLS

Bidang ini tls memungkinkan enkripsi TLS untuk koneksi dan secara opsional menentukan peta konfigurasi CA.

Bidang Deskripsi Wajib diisi
tlsEnabled Nilai boolean yang menunjukkan apakah enkripsi TLS diaktifkan atau tidak. Ini harus diatur ke true untuk komunikasi Azure Event Hubs. Ya
trustedCaCertificateConfigMap Nama peta konfigurasi yang berisi sertifikat CA untuk memverifikasi identitas server. Bidang ini tidak diperlukan untuk komunikasi Azure Event Hubs, karena Azure Event Hubs menggunakan CA terkenal yang dipercaya secara default. Namun, Anda dapat menggunakan bidang ini jika Anda ingin menggunakan sertifikat CA kustom. No

Saat menentukan CA tepercaya diperlukan, buat ConfigMap yang berisi ramuan publik CA dalam format PEM, dan tentukan nama dalam trustedCaCertificateConfigMap properti .

kubectl create configmap ca-pem --from-file path/to/ca.pem

Autentikasi

Bidang autentikasi mendukung berbagai jenis metode autentikasi, seperti SASL, X509, atau identitas terkelola.

Bidang Deskripsi Wajib diisi
diaktifkan Nilai boolean yang menunjukkan apakah autentikasi diaktifkan atau tidak. Ya
authType Bidang yang berisi jenis autentikasi yang digunakan. Lihat Jenis Autentikasi
Jenis Autentikasi
Bidang Deskripsi Wajib diisi
sasl Konfigurasi untuk autentikasi SASL. saslTypeTentukan , yang bisa biasa, scramSha256, atau scramSha512, dan token untuk mereferensikan rahasia Kubernetes secretName atau Azure Key Vault keyVault yang berisi kata sandi. Ya, jika menggunakan autentikasi SASL
systemAssignedManagedIdentity Konfigurasi untuk autentikasi identitas terkelola. Tentukan audiens untuk permintaan token, yang harus cocok dengan namespace Layanan Pusat Aktivitas (https://<NAMESPACE>.servicebus.windows.net) karena konektor adalah klien Kafka. Identitas terkelola yang ditetapkan sistem secara otomatis dibuat dan ditetapkan ke konektor saat diaktifkan. Ya, jika menggunakan autentikasi identitas terkelola
x509 Konfigurasi untuk autentikasi X509. Tentukan secretName bidang atau keyVault . Bidang secretName adalah nama rahasia yang berisi sertifikat klien dan kunci klien dalam format PEM, disimpan sebagai rahasia TLS. Ya, jika menggunakan autentikasi X509

Untuk mempelajari cara menggunakan Azure Key Vault dan keyVault untuk mengelola rahasia untuk Azure IoT MQ alih-alih rahasia Kubernetes, lihat Mengelola rahasia menggunakan rahasia Azure Key Vault atau Kubernetes.

Mengautentikasi ke Azure Event Hubs

Untuk terhubung ke Azure Event Hubs menggunakan rahasia string koneksi dan Kubernetes, gunakan plain jenis SASL dan $ConnectionString sebagai nama pengguna dan string koneksi lengkap sebagai kata sandi. Pertama-tama buat rahasia Kubernetes:

kubectl create secret generic cs-secret -n azure-iot-operations \
  --from-literal=username='$ConnectionString' \
  --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'

Kemudian, referensikan rahasia dalam konfigurasi:

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        secretName: cs-secret

Untuk menggunakan Azure Key Vault alih-alih rahasia Kubernetes, buat rahasia Azure Key Vault dengan string koneksi Endpoint=sb://.., referensikan dengan vaultSecret, dan tentukan nama pengguna seperti "$ConnectionString" dalam konfigurasi.

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        keyVault:
          username: "$ConnectionString"
          vault:
            name: my-key-vault
            directoryId: <AKV directory ID>
            credentials:
              servicePrincipalLocalSecretName: aio-akv-sp
          vaultSecret:
            name: my-cs # Endpoint=sb://..
            # version: 939ecc2...

Untuk menggunakan identitas terkelola, tentukan sebagai satu-satunya metode di bawah autentikasi. Anda juga perlu menetapkan peran ke identitas terkelola yang memberikan izin untuk mengirim dan menerima pesan dari Azure Event Hubs, seperti Pemilik Data Azure Event Hubs atau Pengirim/Penerima Data Azure Event Hubs. Untuk mempelajari selengkapnya, lihat Mengautentikasi aplikasi dengan ID Microsoft Entra untuk mengakses sumber daya Azure Event Hubs.

authentication:
  enabled: true
  authType:
    systemAssignedManagedIdentity:
      audience: https://<NAMESPACE>.servicebus.windows.net
X.509

Untuk X.509, gunakan rahasia Kubernetes TLS yang berisi sertifikat publik dan kunci privat.

kubectl create secret tls my-tls-secret -n azure-iot-operations \
  --cert=path/to/cert/file \
  --key=path/to/key/file

Kemudian tentukan secretName dalam konfigurasi.

authentication:
  enabled: true
  authType:
    x509:
      secretName: my-tls-secret

Untuk menggunakan Azure Key Vault, pastikan sertifikat dan kunci privat diimpor dengan benar lalu tentukan referensi dengan vaultCert.

authentication:
  enabled: true
  authType:
    x509:
      keyVault:
        vault:
          name: my-key-vault
          directoryId: <AKV directory ID>
          credentials:
            servicePrincipalLocalSecretName: aio-akv-sp
        vaultCert:
          name: my-cert
          # version: 939ecc2...
        ## If presenting full chain also  
        # vaultCaChainSecret:
        #   name: my-chain

Atau, jika menyajikan rantai penuh diperlukan, unggah sertifikasi rantai penuh dan kunci ke AKV sebagai file PFX dan gunakan bidang sebagai gantinya vaultCaChainSecret .

# ...
keyVault:
  vaultCaChainSecret:
    name: my-cert
    # version: 939ecc2...

Mengelola koneksi broker lokal

Seperti jembatan MQTT, konektor Azure Event Hubs bertindak sebagai klien ke broker MQTT IoT MQTT. Jika Anda telah menyesuaikan port pendengar dan/atau autentikasi broker MQTT IoT MQTT Anda, ambil alih konfigurasi koneksi MQTT lokal untuk konektor Azure Event Hubs juga. Untuk mempelajari lebih lanjut, lihat Koneksi broker lokal jembatan MQTT.

Kafka Koneksi orTopicMap

Sumber daya kustom (CR) Kafka Koneksi orTopicMap memungkinkan Anda menentukan pemetaan antara topik MQTT dan topik Kafka untuk transfer data dua arah. Tentukan referensi ke Kafka Koneksi or CR dan daftar rute. Setiap rute dapat berupa rute MQTT ke Kafka atau rute Kafka ke MQTT. Contohnya:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: snappy
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

Tabel berikut ini menjelaskan bidang di Kafka Koneksi orTopicMap CR:

Bidang Deskripsi Wajib diisi
kafka Koneksi orRef Nama Kafka Koneksi or CR yang dimiliki peta topik ini. Ya
kompresi Konfigurasi untuk pemadatan untuk pesan yang dikirim ke topik Kafka. Lihat Pemadatan. No
pembuatan batch Konfigurasi untuk batching untuk pesan yang dikirim ke topik Kafka. Lihat Batching. No
partitionStrategy Strategi untuk menangani partisi Kafka saat mengirim pesan ke topik Kafka. Lihat Strategi penanganan partisi. No
copyMqttProperties Nilai Boolean untuk mengontrol apakah sistem MQTT dan properti pengguna disalin ke header pesan Kafka. Properti pengguna disalin apa adanya. Beberapa transformasi dilakukan dengan properti sistem. Default ke false. No
rute Daftar rute untuk transfer data antara topik MQTT dan topik Kafka. Setiap rute dapat memiliki mqttToKafka bidang atau kafkaToMqtt , tergantung pada arah transfer data. Lihat Rute. Ya

Kompresi

Bidang pemadatan memungkinkan pemadatan untuk pesan yang dikirim ke topik Kafka. Pemadatan membantu mengurangi bandwidth jaringan dan ruang penyimpanan yang diperlukan untuk transfer data. Namun, kompresi juga menambahkan beberapa overhead dan latensi ke proses. Jenis kompresi yang didukung tercantum dalam tabel berikut.

Nilai Deskripsi
tidak ada Tidak ada kompresi atau batching yang diterapkan. none adalah nilai default jika tidak ada kompresi yang ditentukan.
gzip Kompresi dan batching GZIP diterapkan. GZIP adalah algoritma kompresi tujuan umum yang menawarkan keseimbangan yang baik antara rasio kompresi dan kecepatan.
snappy Pemadatan dan batching snappy diterapkan. Snappy adalah algoritma kompresi cepat yang menawarkan rasio dan kecepatan kompresi sedang.
lz4 Kompresi dan batching LZ4 diterapkan. LZ4 adalah algoritma kompresi cepat yang menawarkan rasio kompresi rendah dan kecepatan tinggi.

Pembuatan batch

Selain kompresi, Anda juga dapat mengonfigurasi batching untuk pesan sebelum mengirimnya ke topik Kafka. Batching memungkinkan Anda mengelompokkan beberapa pesan bersama-sama dan mengompresinya sebagai satu unit, yang dapat meningkatkan efisiensi kompresi dan mengurangi overhead jaringan.

Bidang Deskripsi Wajib diisi
diaktifkan Nilai boolean yang menunjukkan apakah batching diaktifkan atau tidak. Jika tidak diatur, nilai default adalah palsu. Ya
latensiM Interval waktu maksimum dalam milidetik pesan dapat di-buffer sebelum dikirim. Jika interval ini tercapai, maka semua pesan yang di-buffer dikirim sebagai batch, terlepas dari berapa banyak atau seberapa besar pesan tersebut. Jika tidak diatur, nilai defaultnya adalah 5. No
maxMessages Jumlah maksimum pesan yang dapat di-buffer sebelum dikirim. Jika jumlah ini tercapai, maka semua pesan buffer dikirim sebagai batch, terlepas dari seberapa besar atau berapa lama pesan tersebut di-buffer. Jika tidak diatur, nilai defaultnya adalah 100000. No
maxBytes Ukuran maksimum dalam byte yang dapat di-buffer sebelum dikirim. Jika ukuran ini tercapai, maka semua pesan buffer dikirim sebagai batch, terlepas dari berapa banyak pesan tersebut atau berapa lama pesan tersebut di-buffer. Nilai defaultnya adalah 1000000 (1 MB). No

Contoh penggunaan batching adalah:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

Ini berarti bahwa pesan dikirim baik ketika ada 100 pesan di buffer, atau ketika ada 1024 byte dalam buffer, atau ketika 1000 milidetik berlalu sejak pengiriman terakhir, mana yang lebih dulu.

Strategi penanganan partisi

Strategi penanganan partisi adalah fitur yang memungkinkan Anda mengontrol bagaimana pesan ditetapkan ke partisi Kafka saat mengirimkannya ke topik Kafka. Partisi Kafka adalah segmen logis dari topik Kafka yang memungkinkan pemrosesan paralel dan toleransi kesalahan. Setiap pesan dalam topik Kafka memiliki partisi dan offset yang digunakan untuk mengidentifikasi dan memesan pesan.

Secara default, konektor Kafka menetapkan pesan ke partisi acak, menggunakan algoritma round-robin. Namun, Anda dapat menggunakan strategi yang berbeda untuk menetapkan pesan ke partisi berdasarkan beberapa kriteria, seperti nama topik MQTT atau properti pesan MQTT. Ini dapat membantu Anda mencapai penyeimbangan beban, lokalitas data, atau urutan pesan yang lebih baik.

Nilai Deskripsi
Default Menetapkan pesan ke partisi acak, menggunakan algoritma round-robin. Ini adalah nilai default jika tidak ada strategi yang ditentukan.
statik Menetapkan pesan ke nomor partisi tetap yang berasal dari ID instans konektor. Ini berarti bahwa setiap instans konektor mengirim pesan ke partisi yang berbeda. Ini dapat membantu mencapai penyeimbangan beban dan lokalitas data yang lebih baik.
topik Menggunakan nama topik MQTT sebagai kunci untuk partisi. Ini berarti bahwa pesan dengan nama topik MQTT yang sama dikirim ke partisi yang sama. Ini dapat membantu mencapai urutan pesan dan lokalitas data yang lebih baik.
properti Menggunakan properti pesan MQTT sebagai kunci untuk pemartisian. Tentukan nama properti di partitionKeyProperty bidang . Ini berarti bahwa pesan dengan nilai properti yang sama dikirim ke partisi yang sama. Ini dapat membantu mencapai urutan pesan dan lokalitas data yang lebih baik berdasarkan kriteria kustom.

Contoh penggunaan strategi penanganan partisi adalah:

partitionStrategy: property
partitionKeyProperty: device-id

Ini berarti bahwa pesan dengan properti id perangkat yang sama dikirim ke partisi yang sama.

Rute

Bidang rute menentukan daftar rute untuk transfer data antara topik MQTT dan topik Kafka. Setiap rute dapat memiliki mqttToKafka bidang atau kafkaToMqtt , tergantung pada arah transfer data.

MQTT ke Kafka

Bidang mqttToKafka menentukan rute yang mentransfer data dari topik MQTT ke topik Kafka.

Bidang Deskripsi Wajib
nama Nama unik untuk rute. Ya
mqttTopic Topik MQTT untuk berlangganan. Anda dapat menggunakan karakter kartubebas (# dan +) untuk mencocokkan beberapa topik. Ya
kafkaTopic Topik Kafka yang akan dikirim. Ya
kafkaAcks Jumlah pengakuan yang diperlukan konektor dari titik akhir Kafka. Nilai yang mungkin adalah: zero , one, atau all. No
qos Tingkat kualitas layanan (QoS) untuk langganan topik MQTT. Nilai yang mungkin adalah: 0 atau 1 (default). QoS 2 saat ini tidak didukung. Ya
sharedSubscription Konfigurasi untuk menggunakan langganan bersama untuk topik MQTT. groupNameTentukan , yang merupakan pengidentifikasi unik untuk sekelompok pelanggan, dan groupMinimumShareNumber, yang merupakan jumlah pelanggan dalam grup yang menerima pesan dari topik. Misalnya, jika groupName adalah "group1" dan groupMinimumShareNumber adalah 3, maka konektor membuat tiga pelanggan dengan nama grup yang sama untuk menerima pesan dari topik. Fitur ini memungkinkan Anda mendistribusikan pesan di antara beberapa pelanggan tanpa kehilangan pesan apa pun atau membuat duplikat. No

Contoh penggunaan mqttToKafka rute:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

Dalam contoh ini, pesan dari topik MQTT yang cocok dengan peringatan suhu/# dikirim ke topik Kafka receiving-event-hub dengan QoS setara 1 dan grup langganan bersama "group1" dengan nomor berbagi 3.

Kafka ke MQTT

Bidang kafkaToMqtt menentukan rute yang mentransfer data dari topik Kafka ke topik MQTT.

Bidang Deskripsi Wajib
nama Nama unik untuk rute. Ya
kafkaTopic Topik Kafka untuk ditarik. Ya
mqttTopic Topik MQTT untuk diterbitkan. Ya
consumerGroupId Awalan ID grup konsumen untuk setiap rute Kafka ke MQTT. Jika tidak diatur, ID grup konsumen diatur ke sama dengan nama rute. No
qos Tingkat kualitas layanan (QoS) untuk pesan yang diterbitkan ke topik MQTT. Nilai yang mungkin adalah 0 atau 1 (default). QoS 2 saat ini tidak didukung. Jika QoS diatur ke 1, konektor menerbitkan pesan ke topik MQTT lalu menunggu ack sebelum menerapkan pesan kembali ke Kafka. Untuk QoS 0, konektor segera berkomitmen kembali tanpa Ack MQTT. No

Contoh penggunaan kafkaToMqtt rute:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

Dalam contoh ini, pesan dari topik Kafka sending-event-hub* diterbitkan ke topik MQTT heater-commands dengan QoS level 0.

Nama hub peristiwa harus cocok dengan topik Kafka

Setiap hub peristiwa individual yang tidak diberi nama namespace harus sama persis dengan topik Kafka yang dimaksudkan yang ditentukan dalam rute. Selain itu, string koneksi EntityPath harus cocok jika string koneksi dilingkup ke satu hub peristiwa. Persyaratan ini karena namespace Layanan Azure Event Hubs dianalogikan dengan kluster Kafka dan nama hub peristiwa dianalogikan dengan topik Kafka, sehingga nama topik Kafka harus cocok dengan nama hub peristiwa.

Offset grup konsumen Kafka

Jika konektor terputus atau dihapus dan diinstal ulang dengan ID grup konsumen Kafka yang sama, offset grup konsumen (posisi terakhir dari tempat pesan baca konsumen Kafka) disimpan di Azure Event Hubs. Untuk mempelajari lebih lanjut, lihat Grup konsumen Azure Event Hubs vs. Grup konsumen Kafka.

Versi MQTT

Konektor ini hanya menggunakan MQTT v5.

Menerbitkan dan berlangganan pesan MQTT menggunakan Pratinjau Azure IoT MQ