إرسال واستقبال الرسائل بين Azure IoT MQ Preview وAzure Event Hubs أو Kafka
هام
معاينة عمليات Azure IoT - التي تم تمكينها بواسطة Azure Arc قيد المعاينة حاليا. يجب عدم استخدام برنامج المعاينة هذا في بيئات الإنتاج.
للحصول على الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي، أو المعاينة، أو التي لم يتم إصدارها بعد في التوفر العام، راجع شروط الاستخدام التكميلية لمعاينات Microsoft Azure.
يدفع موصل Kafka الرسائل من وسيط MQTT لمعاينة Azure IoT MQ إلى نقطة نهاية Kafka، وبالمثل يسحب الرسائل بالطريقة الأخرى. نظرا لأن Azure Event Hubs يدعم Kafka API، يعمل الموصل خارج الصندوق مع مراكز الأحداث.
تكوين موصل مراكز الأحداث عبر نقطة نهاية Kafka
بشكل افتراضي، لا يتم تثبيت الموصل مع Azure IoT MQ. يجب تمكينه بشكل صريح مع تعيين الموضوع وبيانات اعتماد المصادقة المحددة. اتبع هذه الخطوات لتمكين الاتصال ثنائي الاتجاه بين IoT MQ وAzure Event Hubs من خلال نقطة نهاية Kafka الخاصة به.
إنشاء مركز أحداث لكل موضوع Kafka.
منح الموصل حق الوصول إلى مساحة اسم مراكز الأحداث
منح وصول ملحق IoT MQ Arc إلى مساحة اسم مراكز الأحداث هو الطريقة الأكثر ملاءمة لإنشاء اتصال آمن من موصل Kakfa الخاص ب IoT MQ إلى مراكز الأحداث.
احفظ قالب Bicep التالي إلى ملف وقم بتطبيقه باستخدام Azure CLI بعد تعيين المعلمات الصالحة للبيئة الخاصة بك:
إشعار
يفترض قالب Bicep نظام المجموعة المتداخلة Arc ومساحة اسم مراكز الأحداث في نفس مجموعة الموارد، اضبط القالب إذا كانت بيئتك مختلفة.
@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'
resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
name: clusterName
}
resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
name: mqExtensionName
scope: connectedCluster
}
resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
name: eventHubNamespaceName
}
// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
scope: ehNamespace
properties: {
// ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975')
principalId: mqExtension.identity.principalId
principalType: 'ServicePrincipal'
}
}
# Set the required environment variables
# Resource group for resources
RESOURCE_GROUP=xxx
# Bicep template files name
TEMPLATE_FILE_NAME=xxx
# MQ Arc extension name
MQ_EXTENSION_NAME=xxx
# Arc connected cluster name
CLUSTER_NAME=xxx
# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx
az deployment group create \
--name assign-RBAC-roles \
--resource-group $RESOURCE_GROUP \
--template-file $TEMPLATE_FILE_NAME \
--parameters mqExtensionName=$MQ_EXTENSION_NAME \
--parameters clusterName=$CLUSTER_NAME \
--parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE
Kafka الاتصال or
يسمح لك مورد Kafka الاتصال or المخصص (CR) بتكوين موصل Kafka يمكنه توصيل مضيف Kafka ومراكز الأحداث. يمكن لموصل Kafka نقل البيانات بين مواضيع MQTT وموضوعات Kafka، باستخدام مراكز الأحداث كنقطة نهاية متوافقة مع Kafka.
يظهر المثال التالي Kafka الاتصال or CR الذي يتصل بنقطة نهاية Event Hubs باستخدام هوية Azure ل IoT MQ، يفترض أنه تم تثبيت موارد MQ أخرى باستخدام التشغيل السريع:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
name: my-eh-connector
namespace: azure-iot-operations # same as one used for other MQ resources
spec:
image:
pullPolicy: IfNotPresent
repository: mcr.microsoft.com/azureiotoperations/kafka
tag: 0.4.0-preview
instances: 2
clientIdPrefix: my-prefix
kafkaConnection:
# Port 9093 is Event Hubs' Kakfa endpoint
# Plug in your Event Hubs namespace name
endpoint: <NAMESPACE>.servicebus.windows.net:9093
tls:
tlsEnabled: true
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
# plugin in your Event Hubs namespace name
audience: "https://<NAMESPACE>.servicebus.windows.net"
localBrokerConnection:
endpoint: "aio-mq-dmqtt-frontend:8883"
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
authentication:
kubernetes: {}
يصف الجدول التالي الحقول في Kafka الاتصال or CR:
الحقل | الوصف | مطلوب |
---|---|---|
صورة | صورة موصل Kafka. يمكنك تحديد pullPolicy و repository و tag للصورة. يتم عرض القيم الافتراضية في المثال السابق. |
نعم |
المثيلات | عدد مثيلات موصل Kafka لتشغيله. | نعم |
clientIdPrefix | السلسلة المراد إلحاقها بمعرف عميل يستخدمه الموصل. | لا |
kafka الاتصال ion | تفاصيل الاتصال لنقطة نهاية مراكز الأحداث. راجع Kafka الاتصال ion. | نعم |
localBroker الاتصال ion | تفاصيل الاتصال للوسيط المحلي الذي يتجاوز اتصال الوسيط الافتراضي. راجع إدارة اتصال الوسيط المحلي. | لا |
logLevel | مستوى السجل لموصل Kafka. القيم المحتملة هي: تتبع أو تصحيح أو معلومات أو تحذير أو خطأ أو خطأ فادح. الافتراضي هو التحذير. | لا |
اتصال Kafka
kafkaConnection
يعرف الحقل تفاصيل الاتصال لنقطة نهاية Kafka.
الحقل | الوصف | مطلوب |
---|---|---|
نقطة النهاية | مضيف ومنفذ نقطة نهاية مراكز الأحداث. عادة ما يكون المنفذ 9093. يمكنك تحديد نقاط نهاية متعددة مفصولة بفواصل لاستخدام بناء جملة خوادم bootstrap. | نعم |
Tls | تكوين تشفير TLS. راجع TLS. | نعم |
المصادقة | تكوين المصادقة. راجع المصادقة. | لا |
TLS
tls
يتيح الحقل تشفير TLS للاتصال ويحدد اختياريا خريطة تكوين CA.
الحقل | الوصف | مطلوب |
---|---|---|
tlsEnabled | قيمة منطقية تشير إلى ما إذا كان تشفير TLS ممكنا أم لا. يجب تعيينه إلى true لاتصال Event Hubs. | نعم |
trustedCaCertificateConfigMap | اسم مخطط التكوين الذي يحتوي على شهادة المرجع المصدق للتحقق من هوية الخادم. هذا الحقل غير مطلوب لاتصال مراكز الأحداث، حيث تستخدم مراكز الأحداث المراجع المصدقة المعروفة الموثوق بها بشكل افتراضي. ومع ذلك، يمكنك استخدام هذا الحقل إذا كنت تريد استخدام شهادة CA مخصصة. | لا |
عند تحديد مرجع مصدق موثوق به مطلوب، قم بإنشاء ConfigMap يحتوي على جرعة عامة من المرجع المصدق بتنسيق PEM، وحدد الاسم في الخاصية trustedCaCertificateConfigMap
.
kubectl create configmap ca-pem --from-file path/to/ca.pem
المصادقة
يدعم حقل المصادقة أنواعا مختلفة من أساليب المصادقة، مثل SASL أو X509 أو الهوية المدارة.
الحقل | الوصف | مطلوب |
---|---|---|
مُمكَّن | قيمة منطقية تشير إلى ما إذا كانت المصادقة ممكنة أم لا. | نعم |
authType | حقل يحتوي على نوع المصادقة المستخدم. راجع نوع المصادقة |
نوع المصادقة
الحقل | الوصف | مطلوب |
---|---|---|
Sasl | تكوين مصادقة SASL. saslType حدد ، الذي يمكن أن يكون عاديا أو scramSha256 أو scramSha512، وللإشارة token إلى سر Kubernetes secretName أو Azure Key Vault keyVault الذي يحتوي على كلمة المرور. |
نعم، إذا كنت تستخدم مصادقة SASL |
systemAssignedManagedIdentity | تكوين مصادقة الهوية المدارة. حدد الجمهور لطلب الرمز المميز، والذي يجب أن يتطابق مع مساحة اسم مراكز الأحداث (https://<NAMESPACE>.servicebus.windows.net ) لأن الموصل عميل Kafka. يتم إنشاء هوية مدارة معينة من قبل النظام تلقائيا وتعيينها إلى الموصل عند تمكينها. |
نعم، إذا كنت تستخدم مصادقة الهوية المدارة |
x509 | تكوين مصادقة X509. secretName حدد الحقل أو keyVault . secretName الحقل هو اسم السر الذي يحتوي على شهادة العميل ومفتاح العميل بتنسيق PEM، المخزنة كسر TLS. |
نعم، إذا كنت تستخدم مصادقة X509 |
لمعرفة كيفية استخدام Azure Key Vault وإدارة keyVault
البيانات السرية ل Azure IoT MQ بدلا من أسرار Kubernetes، راجع إدارة الأسرار باستخدام Azure Key Vault أو أسرار Kubernetes.
المصادقة على مراكز الأحداث
للاتصال بمراكز الأحداث باستخدام سر سلسلة الاتصال وKubernetes، استخدم plain
نوع SASL واسم $ConnectionString
المستخدم سلسلة الاتصال الكامل ككلمة مرور. أولا إنشاء سر Kubernetes:
kubectl create secret generic cs-secret -n azure-iot-operations \
--from-literal=username='$ConnectionString' \
--from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'
ثم، قم بالإشارة إلى السر في التكوين:
authentication:
enabled: true
authType:
sasl:
saslType: plain
token:
secretName: cs-secret
لاستخدام Azure Key Vault بدلا من أسرار Kubernetes، قم بإنشاء سر Azure Key Vault مع سلسلة الاتصال Endpoint=sb://..
، وقم بالرجوع إليه باستخدام vaultSecret
، وحدد اسم المستخدم كما هو الحال "$ConnectionString"
في التكوين.
authentication:
enabled: true
authType:
sasl:
saslType: plain
token:
keyVault:
username: "$ConnectionString"
vault:
name: my-key-vault
directoryId: <AKV directory ID>
credentials:
servicePrincipalLocalSecretName: aio-akv-sp
vaultSecret:
name: my-cs # Endpoint=sb://..
# version: 939ecc2...
لاستخدام الهوية المدارة، حددها على أنها الطريقة الوحيدة ضمن المصادقة. تحتاج أيضا إلى تعيين دور للهوية المدارة التي تمنح الإذن لإرسال واستقبال الرسائل من مراكز الأحداث، مثل مالك بيانات مراكز الأحداث أو مرسل/متلقي بيانات مراكز الأحداث Azure. لمعرفة المزيد، راجع مصادقة تطبيق باستخدام معرف Microsoft Entra للوصول إلى موارد مراكز الأحداث.
authentication:
enabled: true
authType:
systemAssignedManagedIdentity:
audience: https://<NAMESPACE>.servicebus.windows.net
X.509
بالنسبة إلى X.509، استخدم سر Kubernetes TLS الذي يحتوي على الشهادة العامة والمفتاح الخاص.
kubectl create secret tls my-tls-secret -n azure-iot-operations \
--cert=path/to/cert/file \
--key=path/to/key/file
ثم حدد في secretName
التكوين.
authentication:
enabled: true
authType:
x509:
secretName: my-tls-secret
لاستخدام Azure Key Vault بدلا من ذلك، تأكد من استيراد الشهادة والمفتاح الخاص بشكل صحيح ثم حدد المرجع باستخدام vaultCert
.
authentication:
enabled: true
authType:
x509:
keyVault:
vault:
name: my-key-vault
directoryId: <AKV directory ID>
credentials:
servicePrincipalLocalSecretName: aio-akv-sp
vaultCert:
name: my-cert
# version: 939ecc2...
## If presenting full chain also
# vaultCaChainSecret:
# name: my-chain
أو، إذا كان تقديم السلسلة الكاملة مطلوبا، فحمل شهادة السلسلة الكاملة والمفتاح إلى AKV كملف PFX واستخدم vaultCaChainSecret
الحقل بدلا من ذلك.
# ...
keyVault:
vaultCaChainSecret:
name: my-cert
# version: 939ecc2...
إدارة اتصال الوسيط المحلي
مثل جسر MQTT، يعمل موصل Event Hubs كجهة عميل إلى وسيط IoT MQ MQTT. إذا قمت بتخصيص منفذ وحدة الاستماع و/أو مصادقة وسيط MQTT IoT MQTT، فتجاوز تكوين اتصال MQTT المحلي لموصل Event Hubs أيضا. لمعرفة المزيد، راجع وصلة وسيط MQTT المحلية.
Kafka الاتصال orTopicMap
يسمح لك مورد Kafka الاتصال orTopicMap المخصص (CR) بتحديد التعيين بين مواضيع MQTT وموضوعات Kafka لنقل البيانات ثنائية الاتجاه. حدد مرجعا إلى Kafka الاتصال or CR وقائمة بالمسارات. يمكن أن يكون كل مسار إما مسار MQTT إلى Kafka أو مسار Kafka إلى MQTT. على سبيل المثال:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
name: my-eh-topic-map
namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
kafkaConnectorRef: my-eh-connector
compression: snappy
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
partitionStrategy: property
partitionKeyProperty: device-id
copyMqttProperties: true
routes:
# Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
- mqttToKafka:
name: "route1"
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
# Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
- kafkaToMqtt:
name: "route2"
consumerGroupId: mqConnector
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
يصف الجدول التالي الحقول في Kafka الاتصال orTopicMap CR:
الحقل | الوصف | مطلوب |
---|---|---|
kafka الاتصال orRef | اسم Kafka الاتصال or CR الذي تنتمي إليه خريطة الموضوع هذه. | نعم |
ضغط | تكوين ضغط الرسائل المرسلة إلى مواضيع Kafka. راجع الضغط. | لا |
التجميع | تكوين الإرسال في دفعات للرسائل المرسلة إلى مواضيع Kafka. راجع الإرسال في دفعات. | لا |
partitionStrategy | استراتيجية التعامل مع أقسام Kafka عند إرسال الرسائل إلى مواضيع Kafka. راجع استراتيجية معالجة القسم. | لا |
copyMqttProperties | قيمة منطقية للتحكم في ما إذا تم نسخ نظام MQTT وخصائص المستخدم إلى رأس رسالة Kafka. يتم نسخ خصائص المستخدم كما هي. يتم إجراء بعض التحويل باستخدام خصائص النظام. الإعدادات الافتراضية للخطأ. | لا |
المسارات | قائمة مسارات نقل البيانات بين مواضيع MQTT وموضوعات Kafka. يمكن أن يكون لكل مسار حقل mqttToKafka أو kafkaToMqtt ، اعتمادا على اتجاه نقل البيانات. راجع المسارات. |
نعم |
الضغط
يتيح حقل الضغط الضغط للرسائل المرسلة إلى مواضيع Kafka. يساعد الضغط على تقليل النطاق الترددي للشبكة ومساحة التخزين المطلوبة لنقل البيانات. ومع ذلك، يضيف الضغط أيضا بعض النفقات العامة وزمن الانتقال إلى العملية. يتم سرد أنواع الضغط المدعومة في الجدول التالي.
قيمة | الوصف |
---|---|
لا شيء | لا يتم تطبيق أي ضغط أو دفعات. لا شيء هو القيمة الافتراضية إذا لم يتم تحديد ضغط. |
gzip | يتم تطبيق ضغط GZIP والدفعات. GZIP هي خوارزمية ضغط للأغراض العامة توفر توازنا جيدا بين نسبة الضغط والسرعة. |
لاذع | يتم تطبيق الضغط الانطباقي والتقاط الدفعات. Snappy هي خوارزمية ضغط سريعة توفر نسبة ضغط متوسطة وسرعة. |
lz4 | يتم تطبيق ضغط LZ4 والدفعات. LZ4 هي خوارزمية ضغط سريعة توفر نسبة ضغط منخفضة وسرعة عالية. |
الدفعات
وبصرف النظر عن الضغط، يمكنك أيضا تكوين دفعات للرسائل قبل إرسالها إلى مواضيع Kafka. يسمح لك الإرسال في دفعات بتجميع رسائل متعددة معا وضغطها كوحدة واحدة، ما يمكن أن يحسن كفاءة الضغط ويقلل من حمل الشبكة.
الحقل | الوصف | مطلوب |
---|---|---|
مُمكَّن | قيمة منطقية تشير إلى ما إذا كان الإرسال في دفعات ممكنا أم لا. إذا لم يتم تعيينها، تكون القيمة الافتراضية خاطئة. | نعم |
زمن الانتقال | الحد الأقصى للفاصل الزمني بالمللي ثانية الذي يمكن تخزين الرسائل مؤقتا قبل إرسالها. إذا تم الوصول إلى هذا الفاصل الزمني، فسيتم إرسال جميع الرسائل المخزنة مؤقتا كدفعة، بغض النظر عن عددها أو حجمها. إذا لم يتم تعيين القيمة الافتراضية هي 5. | لا |
الحد الأقصى من الرسائل | الحد الأقصى لعدد الرسائل التي يمكن تخزينها مؤقتا قبل إرسالها. إذا تم الوصول إلى هذا الرقم، فسيتم إرسال جميع الرسائل المخزنة مؤقتا كدفعة، بغض النظر عن حجمها أو مدة تخزينها مؤقتا. إذا لم يتم تعيين القيمة الافتراضية هي 100000. | لا |
maxBytes | الحد الأقصى للحجم بالبايت الذي يمكن تخزينه مؤقتا قبل إرساله. إذا تم الوصول إلى هذا الحجم، فسيتم إرسال جميع الرسائل المخزنة مؤقتا كدفعة، بغض النظر عن عددها أو مدة تخزينها مؤقتا. القيمة الافتراضية هي 1000000 (1 ميغابايت). | لا |
مثال على استخدام الدفعات هو:
batching:
enabled: true
latencyMs: 1000
maxMessages: 100
maxBytes: 1024
وهذا يعني أنه يتم إرسال الرسائل إما عندما يكون هناك 100 رسالة في المخزن المؤقت، أو عندما يكون هناك 1024 بايت في المخزن المؤقت، أو عندما تنقضي 1000 مللي ثانية منذ آخر إرسال، أيهما يأتي أولا.
استراتيجية معالجة التقسيم
استراتيجية معالجة القسم هي ميزة تسمح لك بالتحكم في كيفية تعيين الرسائل إلى أقسام Kafka عند إرسالها إلى مواضيع Kafka. أقسام Kafka هي مقاطع منطقية لموضوع Kafka التي تمكن المعالجة المتوازية والتسامح مع الخطأ. تحتوي كل رسالة في موضوع Kafka على قسم وإزاحة تستخدم لتحديد الرسائل وترتيبها.
بشكل افتراضي، يقوم موصل Kafka بتعيين رسائل إلى أقسام عشوائية، باستخدام خوارزمية الترتيب الدوري. ومع ذلك، يمكنك استخدام استراتيجيات مختلفة لتعيين رسائل إلى أقسام استنادا إلى بعض المعايير، مثل اسم موضوع MQTT أو خاصية رسالة MQTT. يمكن أن يساعدك هذا في تحقيق موازنة تحميل أفضل أو منطقة بيانات أو ترتيب رسائل.
قيمة | الوصف |
---|---|
افتراضي | تعيين رسائل إلى أقسام عشوائية، باستخدام خوارزمية الترتيب الدوري. إنها القيمة الافتراضية إذا لم يتم تحديد استراتيجية. |
ثابت | تعيين رسائل إلى رقم قسم ثابت مشتق من معرف المثيل للموصل. وهذا يعني أن كل مثيل موصل يرسل رسائل إلى قسم مختلف. يمكن أن يساعد هذا في تحقيق موازنة تحميل أفضل ومحة البيانات. |
الموضوع | يستخدم اسم موضوع MQTT كمفتاح للتقسيم. وهذا يعني أن الرسائل التي تحمل نفس اسم موضوع MQTT يتم إرسالها إلى نفس القسم. يمكن أن يساعد هذا في تحقيق ترتيب أفضل للرسائل ومحلية البيانات. |
خاصية | يستخدم خاصية رسالة MQTT كمفتاح للتقسيم. حدد اسم الخاصية في partitionKeyProperty الحقل. وهذا يعني أن الرسائل التي لها نفس قيمة الخاصية يتم إرسالها إلى نفس القسم. يمكن أن يساعد هذا في تحقيق ترتيب أفضل للرسائل ومحلية البيانات استنادا إلى معيار مخصص. |
مثال على استخدام استراتيجية معالجة القسم هو:
partitionStrategy: property
partitionKeyProperty: device-id
وهذا يعني أن الرسائل التي لها نفس خاصية معرف الجهاز يتم إرسالها إلى نفس القسم.
مسارات
يحدد حقل المسارات قائمة بالمسارات لنقل البيانات بين مواضيع MQTT وموضوعات Kafka. يمكن أن يكون لكل مسار حقل mqttToKafka
أو kafkaToMqtt
، اعتمادا على اتجاه نقل البيانات.
MQTT إلى Kafka
mqttToKafka
يعرف الحقل مسارا ينقل البيانات من موضوع MQTT إلى موضوع Kafka.
الحقل | الوصف | مطلوب |
---|---|---|
الاسم | اسم فريد للمسار. | نعم |
mqttTopic | موضوع MQTT للاشتراك منه. يمكنك استخدام أحرف البدل (# و + ) لمطابقة مواضيع متعددة. |
نعم |
kafkaTopic | موضوع Kafka لإرساله. | نعم |
kafkaAcks | عدد الإقرارات التي يتطلبها الموصل من نقطة نهاية Kafka. القيم المحتملة هي: zero أو one أو all . |
لا |
Qos | مستوى جودة الخدمة (QoS) لاشتراك موضوع MQTT. القيم المحتملة هي: 0 أو 1 (افتراضي). QoS 2 غير مدعوم حاليا. | نعم |
الاشتراك المشترك | تكوين استخدام الاشتراكات المشتركة لمواضيع MQTT. groupName حدد ، وهو معرف فريد لمجموعة من المشتركين، وgroupMinimumShareNumber ، وهو عدد المشتركين في مجموعة تتلقى رسائل من موضوع. على سبيل المثال، إذا كان groupName هو "group1" وكان groupMinimumShareNumber هو 3، فسينشئ الموصل ثلاثة مشتركين بنفس اسم المجموعة لتلقي رسائل من موضوع. تسمح لك هذه الميزة بتوزيع الرسائل بين العديد من المشتركين دون فقدان أي رسائل أو إنشاء تكرارات. |
لا |
مثال على استخدام mqttToKafka
المسار:
mqttToKafka:
mqttTopic: temperature-alerts/#
kafkaTopic: receiving-event-hub
kafkaAcks: one
qos: 1
sharedSubscription:
groupName: group1
groupMinimumShareNumber: 3
في هذا المثال، يتم إرسال الرسائل من مواضيع MQTT التي تطابق تنبيهات درجة الحرارة/# إلى موضوع Kafka receiving-event-hub مع QoS المكافئ 1 ومجموعة الاشتراك المشتركة "group1" مع رقم المشاركة 3.
Kafka إلى MQTT
kafkaToMqtt
يعرف الحقل مسارا ينقل البيانات من موضوع Kafka إلى موضوع MQTT.
الحقل | الوصف | مطلوب |
---|---|---|
الاسم | اسم فريد للمسار. | نعم |
kafkaTopic | موضوع Kafka للسحب منه. | نعم |
mqttTopic | موضوع MQTT للنشر إليه. | نعم |
معرف مجموعة المستهلكين | بادئة معرف مجموعة المستهلكين لكل مسار Kafka إلى MQTT. إذا لم يتم تعيينه، يتم تعيين معرف مجموعة المستهلكين إلى نفس اسم المسار. | لا |
Qos | مستوى جودة الخدمة (QoS) للرسائل المنشورة في موضوع MQTT. القيم المحتملة هي 0 أو 1 (افتراضي). QoS 2 غير مدعوم حاليا. إذا تم تعيين QoS إلى 1، ينشر الموصل الرسالة إلى موضوع MQTT ثم ينتظر ack قبل تثبيت الرسالة مرة أخرى إلى Kafka. بالنسبة إلى QoS 0، يلتزم الموصل مرة أخرى على الفور دون ack MQTT. | لا |
مثال على استخدام kafkaToMqtt
المسار:
kafkaToMqtt:
kafkaTopic: sending-event-hub
mqttTopic: heater-commands
qos: 0
في هذا المثال، يتم نشر الرسائل من موضوع Kafka sending-event-hub* إلى MQTT topic heater-commands مع مستوى QoS 0.
يجب أن يتطابق اسم مركز الأحداث مع موضوع Kafka
يجب تسمية كل مركز حدث فردي ليس مساحة الاسم تماما مثل موضوع Kafka المقصود المحدد في المسارات. أيضا، يجب أن يتطابق سلسلة الاتصال EntityPath
إذا تم تحديد نطاق سلسلة الاتصال إلى مركز أحداث واحد. هذا المطلب لأن مساحة اسم مراكز الأحداث مماثلة لنظام مجموعة Kafka واسم مركز الحدث مشابه لموضوع Kafka، لذلك يجب أن يتطابق اسم موضوع Kafka مع اسم مركز الحدث.
إزاحات مجموعة مستهلكين Kafka
إذا قطع الموصل الاتصال أو تمت إزالته وإعادة تثبيته بنفس معرف مجموعة مستهلك Kafka، يتم تخزين إزاحة مجموعة المستهلكين (الموضع الأخير من حيث قراءة رسائل مستهلك Kafka) في مراكز أحداث Azure. لمعرفة المزيد، راجع مجموعة مستهلكين مراكز الأحداث مقابل مجموعة مستهلكين Kafka.
إصدار MQTT
يستخدم هذا الموصل MQTT v5 فقط.