إرسال واستقبال الرسائل بين Azure IoT MQ Preview وAzure Event Hubs أو Kafka

هام

معاينة عمليات Azure IoT - التي تم تمكينها بواسطة Azure Arc قيد المعاينة حاليا. يجب عدم استخدام برنامج المعاينة هذا في بيئات الإنتاج.

للحصول على الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي، أو المعاينة، أو التي لم يتم إصدارها بعد في التوفر العام، راجع شروط الاستخدام التكميلية لمعاينات Microsoft Azure.

يدفع موصل Kafka الرسائل من وسيط MQTT لمعاينة Azure IoT MQ إلى نقطة نهاية Kafka، وبالمثل يسحب الرسائل بالطريقة الأخرى. نظرا لأن Azure Event Hubs يدعم Kafka API، يعمل الموصل خارج الصندوق مع مراكز الأحداث.

تكوين موصل مراكز الأحداث عبر نقطة نهاية Kafka

بشكل افتراضي، لا يتم تثبيت الموصل مع Azure IoT MQ. يجب تمكينه بشكل صريح مع تعيين الموضوع وبيانات اعتماد المصادقة المحددة. اتبع هذه الخطوات لتمكين الاتصال ثنائي الاتجاه بين IoT MQ وAzure Event Hubs من خلال نقطة نهاية Kafka الخاصة به.

  1. إنشاء مساحة اسم "مراكز الأحداث".

  2. إنشاء مركز أحداث لكل موضوع Kafka.

منح الموصل حق الوصول إلى مساحة اسم مراكز الأحداث

منح وصول ملحق IoT MQ Arc إلى مساحة اسم مراكز الأحداث هو الطريقة الأكثر ملاءمة لإنشاء اتصال آمن من موصل Kakfa الخاص ب IoT MQ إلى مراكز الأحداث.

احفظ قالب Bicep التالي إلى ملف وقم بتطبيقه باستخدام Azure CLI بعد تعيين المعلمات الصالحة للبيئة الخاصة بك:

إشعار

يفترض قالب Bicep نظام المجموعة المتداخلة Arc ومساحة اسم مراكز الأحداث في نفس مجموعة الموارد، اضبط القالب إذا كانت بيئتك مختلفة.

@description('Location for cloud resources')
param mqExtensionName string = 'mq'
param clusterName string = 'clusterName'
param eventHubNamespaceName string = 'default'

resource connectedCluster 'Microsoft.Kubernetes/connectedClusters@2021-10-01' existing = {
  name: clusterName
}

resource mqExtension 'Microsoft.KubernetesConfiguration/extensions@2022-11-01' existing = {
  name: mqExtensionName
  scope: connectedCluster
}

resource ehNamespace 'Microsoft.EventHub/namespaces@2021-11-01' existing = {
  name: eventHubNamespaceName
}

// Role assignment for Event Hubs Data Receiver role
resource roleAssignmentDataReceiver 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '7f951dda-4ed3-4680-a7ca-43fe172d538d')
  scope: ehNamespace
  properties: {
     // ID for Event Hubs Data Receiver role is a638d3c7-ab3a-418d-83e6-5f17a39d4fde
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', 'a638d3c7-ab3a-418d-83e6-5f17a39d4fde') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}

// Role assignment for Event Hubs Data Sender role
resource roleAssignmentDataSender 'Microsoft.Authorization/roleAssignments@2022-04-01' = {
  name: guid(ehNamespace.id, mqExtension.id, '69b88ce2-a752-421f-bd8b-e230189e1d63')
  scope: ehNamespace
  properties: {
    // ID for Event Hubs Data Sender role is 2b629674-e913-4c01-ae53-ef4638d8f975
    roleDefinitionId: resourceId('Microsoft.Authorization/roleDefinitions', '2b629674-e913-4c01-ae53-ef4638d8f975') 
    principalId: mqExtension.identity.principalId
    principalType: 'ServicePrincipal'
  }
}
# Set the required environment variables

# Resource group for resources
RESOURCE_GROUP=xxx

# Bicep template files name
TEMPLATE_FILE_NAME=xxx

# MQ Arc extension name
MQ_EXTENSION_NAME=xxx

# Arc connected cluster name
CLUSTER_NAME=xxx

# Event Hubs namespace name
EVENTHUB_NAMESPACE=xxx


az deployment group create \
      --name assign-RBAC-roles \
      --resource-group $RESOURCE_GROUP \
      --template-file $TEMPLATE_FILE_NAME \
      --parameters mqExtensionName=$MQ_EXTENSION_NAME \
      --parameters clusterName=$CLUSTER_NAME \
      --parameters eventHubNamespaceName=$EVENTHUB_NAMESPACE

Kafka الاتصال or

يسمح لك مورد Kafka الاتصال or المخصص (CR) بتكوين موصل Kafka يمكنه توصيل مضيف Kafka ومراكز الأحداث. يمكن لموصل Kafka نقل البيانات بين مواضيع MQTT وموضوعات Kafka، باستخدام مراكز الأحداث كنقطة نهاية متوافقة مع Kafka.

يظهر المثال التالي Kafka الاتصال or CR الذي يتصل بنقطة نهاية Event Hubs باستخدام هوية Azure ل IoT MQ، يفترض أنه تم تثبيت موارد MQ أخرى باستخدام التشغيل السريع:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnector
metadata:
  name: my-eh-connector
  namespace: azure-iot-operations # same as one used for other MQ resources
spec:
  image:
    pullPolicy: IfNotPresent
    repository: mcr.microsoft.com/azureiotoperations/kafka
    tag: 0.4.0-preview
  instances: 2
  clientIdPrefix: my-prefix
  kafkaConnection:
    # Port 9093 is Event Hubs' Kakfa endpoint
    # Plug in your Event Hubs namespace name
    endpoint: <NAMESPACE>.servicebus.windows.net:9093
    tls:
      tlsEnabled: true
    authentication:
      enabled: true
      authType:
        systemAssignedManagedIdentity:
          # plugin in your Event Hubs namespace name
          audience: "https://<NAMESPACE>.servicebus.windows.net" 
  localBrokerConnection:
    endpoint: "aio-mq-dmqtt-frontend:8883"
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: "aio-ca-trust-bundle-test-only"
    authentication:
      kubernetes: {}

يصف الجدول التالي الحقول في Kafka الاتصال or CR:

الحقل ‏‏الوصف مطلوب
صورة صورة موصل Kafka. يمكنك تحديد pullPolicyو repositoryو tag للصورة. يتم عرض القيم الافتراضية في المثال السابق. ‏‏نعم‬
المثيلات عدد مثيلات موصل Kafka لتشغيله. ‏‏نعم‬
clientIdPrefix السلسلة المراد إلحاقها بمعرف عميل يستخدمه الموصل. لا
kafka الاتصال ion تفاصيل الاتصال لنقطة نهاية مراكز الأحداث. راجع Kafka الاتصال ion. ‏‏نعم‬
localBroker الاتصال ion تفاصيل الاتصال للوسيط المحلي الذي يتجاوز اتصال الوسيط الافتراضي. راجع إدارة اتصال الوسيط المحلي. لا
logLevel مستوى السجل لموصل Kafka. القيم المحتملة هي: تتبع أو تصحيح أو معلومات أو تحذير أو خطأ أو خطأ فادح. الافتراضي هو التحذير. لا

اتصال Kafka

kafkaConnection يعرف الحقل تفاصيل الاتصال لنقطة نهاية Kafka.

الحقل ‏‏الوصف مطلوب
نقطة النهاية مضيف ومنفذ نقطة نهاية مراكز الأحداث. عادة ما يكون المنفذ 9093. يمكنك تحديد نقاط نهاية متعددة مفصولة بفواصل لاستخدام بناء جملة خوادم bootstrap. ‏‏نعم‬
Tls تكوين تشفير TLS. راجع TLS. ‏‏نعم‬
المصادقة تكوين المصادقة. راجع المصادقة. لا

TLS

tls يتيح الحقل تشفير TLS للاتصال ويحدد اختياريا خريطة تكوين CA.

الحقل ‏‏الوصف مطلوب
tlsEnabled قيمة منطقية تشير إلى ما إذا كان تشفير TLS ممكنا أم لا. يجب تعيينه إلى true لاتصال Event Hubs. ‏‏نعم‬
trustedCaCertificateConfigMap اسم مخطط التكوين الذي يحتوي على شهادة المرجع المصدق للتحقق من هوية الخادم. هذا الحقل غير مطلوب لاتصال مراكز الأحداث، حيث تستخدم مراكز الأحداث المراجع المصدقة المعروفة الموثوق بها بشكل افتراضي. ومع ذلك، يمكنك استخدام هذا الحقل إذا كنت تريد استخدام شهادة CA مخصصة. لا

عند تحديد مرجع مصدق موثوق به مطلوب، قم بإنشاء ConfigMap يحتوي على جرعة عامة من المرجع المصدق بتنسيق PEM، وحدد الاسم في الخاصية trustedCaCertificateConfigMap .

kubectl create configmap ca-pem --from-file path/to/ca.pem

المصادقة

يدعم حقل المصادقة أنواعا مختلفة من أساليب المصادقة، مثل SASL أو X509 أو الهوية المدارة.

الحقل ‏‏الوصف مطلوب
مُمكَّن قيمة منطقية تشير إلى ما إذا كانت المصادقة ممكنة أم لا. ‏‏نعم‬
authType حقل يحتوي على نوع المصادقة المستخدم. راجع نوع المصادقة
نوع المصادقة
الحقل ‏‏الوصف مطلوب
Sasl تكوين مصادقة SASL. saslTypeحدد ، الذي يمكن أن يكون عاديا أو scramSha256 أو scramSha512، وللإشارة token إلى سر Kubernetes secretName أو Azure Key Vault keyVault الذي يحتوي على كلمة المرور. نعم، إذا كنت تستخدم مصادقة SASL
systemAssignedManagedIdentity تكوين مصادقة الهوية المدارة. حدد الجمهور لطلب الرمز المميز، والذي يجب أن يتطابق مع مساحة اسم مراكز الأحداث (https://<NAMESPACE>.servicebus.windows.net) لأن الموصل عميل Kafka. يتم إنشاء هوية مدارة معينة من قبل النظام تلقائيا وتعيينها إلى الموصل عند تمكينها. نعم، إذا كنت تستخدم مصادقة الهوية المدارة
x509 تكوين مصادقة X509. secretName حدد الحقل أو keyVault . secretName الحقل هو اسم السر الذي يحتوي على شهادة العميل ومفتاح العميل بتنسيق PEM، المخزنة كسر TLS. نعم، إذا كنت تستخدم مصادقة X509

لمعرفة كيفية استخدام Azure Key Vault وإدارة keyVault البيانات السرية ل Azure IoT MQ بدلا من أسرار Kubernetes، راجع إدارة الأسرار باستخدام Azure Key Vault أو أسرار Kubernetes.

المصادقة على مراكز الأحداث

للاتصال بمراكز الأحداث باستخدام سر سلسلة الاتصال وKubernetes، استخدم plain نوع SASL واسم $ConnectionString المستخدم سلسلة الاتصال الكامل ككلمة مرور. أولا إنشاء سر Kubernetes:

kubectl create secret generic cs-secret -n azure-iot-operations \
  --from-literal=username='$ConnectionString' \
  --from-literal=password='Endpoint=sb://<NAMESPACE>.servicebus.windows.net/;SharedAccessKeyName=<KEY_NAME>;SharedAccessKey=<KEY>'

ثم، قم بالإشارة إلى السر في التكوين:

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        secretName: cs-secret

لاستخدام Azure Key Vault بدلا من أسرار Kubernetes، قم بإنشاء سر Azure Key Vault مع سلسلة الاتصال Endpoint=sb://..، وقم بالرجوع إليه باستخدام vaultSecret، وحدد اسم المستخدم كما هو الحال "$ConnectionString" في التكوين.

authentication:
  enabled: true
  authType:
    sasl:
      saslType: plain
      token:
        keyVault:
          username: "$ConnectionString"
          vault:
            name: my-key-vault
            directoryId: <AKV directory ID>
            credentials:
              servicePrincipalLocalSecretName: aio-akv-sp
          vaultSecret:
            name: my-cs # Endpoint=sb://..
            # version: 939ecc2...

لاستخدام الهوية المدارة، حددها على أنها الطريقة الوحيدة ضمن المصادقة. تحتاج أيضا إلى تعيين دور للهوية المدارة التي تمنح الإذن لإرسال واستقبال الرسائل من مراكز الأحداث، مثل مالك بيانات مراكز الأحداث أو مرسل/متلقي بيانات مراكز الأحداث Azure. لمعرفة المزيد، راجع مصادقة تطبيق باستخدام معرف Microsoft Entra للوصول إلى موارد مراكز الأحداث.

authentication:
  enabled: true
  authType:
    systemAssignedManagedIdentity:
      audience: https://<NAMESPACE>.servicebus.windows.net
X.509

بالنسبة إلى X.509، استخدم سر Kubernetes TLS الذي يحتوي على الشهادة العامة والمفتاح الخاص.

kubectl create secret tls my-tls-secret -n azure-iot-operations \
  --cert=path/to/cert/file \
  --key=path/to/key/file

ثم حدد في secretName التكوين.

authentication:
  enabled: true
  authType:
    x509:
      secretName: my-tls-secret

لاستخدام Azure Key Vault بدلا من ذلك، تأكد من استيراد الشهادة والمفتاح الخاص بشكل صحيح ثم حدد المرجع باستخدام vaultCert.

authentication:
  enabled: true
  authType:
    x509:
      keyVault:
        vault:
          name: my-key-vault
          directoryId: <AKV directory ID>
          credentials:
            servicePrincipalLocalSecretName: aio-akv-sp
        vaultCert:
          name: my-cert
          # version: 939ecc2...
        ## If presenting full chain also  
        # vaultCaChainSecret:
        #   name: my-chain

أو، إذا كان تقديم السلسلة الكاملة مطلوبا، فحمل شهادة السلسلة الكاملة والمفتاح إلى AKV كملف PFX واستخدم vaultCaChainSecret الحقل بدلا من ذلك.

# ...
keyVault:
  vaultCaChainSecret:
    name: my-cert
    # version: 939ecc2...

إدارة اتصال الوسيط المحلي

مثل جسر MQTT، يعمل موصل Event Hubs كجهة عميل إلى وسيط IoT MQ MQTT. إذا قمت بتخصيص منفذ وحدة الاستماع و/أو مصادقة وسيط MQTT IoT MQTT، فتجاوز تكوين اتصال MQTT المحلي لموصل Event Hubs أيضا. لمعرفة المزيد، راجع وصلة وسيط MQTT المحلية.

Kafka الاتصال orTopicMap

يسمح لك مورد Kafka الاتصال orTopicMap المخصص (CR) بتحديد التعيين بين مواضيع MQTT وموضوعات Kafka لنقل البيانات ثنائية الاتجاه. حدد مرجعا إلى Kafka الاتصال or CR وقائمة بالمسارات. يمكن أن يكون كل مسار إما مسار MQTT إلى Kafka أو مسار Kafka إلى MQTT. على سبيل المثال:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: KafkaConnectorTopicMap
metadata:
  name: my-eh-topic-map
  namespace: <SAME NAMESPACE AS BROKER> # For example "default"
spec:
  kafkaConnectorRef: my-eh-connector
  compression: snappy
  batching:
    enabled: true
    latencyMs: 1000
    maxMessages: 100
    maxBytes: 1024
  partitionStrategy: property
  partitionKeyProperty: device-id
  copyMqttProperties: true
  routes:
    # Subscribe from MQTT topic "temperature-alerts/#" and send to Kafka topic "receiving-event-hub"
    - mqttToKafka:
        name: "route1"
        mqttTopic: temperature-alerts/#
        kafkaTopic: receiving-event-hub
        kafkaAcks: one
        qos: 1
        sharedSubscription:
          groupName: group1
          groupMinimumShareNumber: 3
    # Pull from kafka topic "sending-event-hub" and publish to MQTT topic "heater-commands"
    - kafkaToMqtt:
        name: "route2"
        consumerGroupId: mqConnector
        kafkaTopic: sending-event-hub
        mqttTopic: heater-commands
        qos: 0

يصف الجدول التالي الحقول في Kafka الاتصال orTopicMap CR:

الحقل ‏‏الوصف مطلوب
kafka الاتصال orRef اسم Kafka الاتصال or CR الذي تنتمي إليه خريطة الموضوع هذه. ‏‏نعم‬
ضغط تكوين ضغط الرسائل المرسلة إلى مواضيع Kafka. راجع الضغط. لا
التجميع تكوين الإرسال في دفعات للرسائل المرسلة إلى مواضيع Kafka. راجع الإرسال في دفعات. لا
partitionStrategy استراتيجية التعامل مع أقسام Kafka عند إرسال الرسائل إلى مواضيع Kafka. راجع استراتيجية معالجة القسم. لا
copyMqttProperties قيمة منطقية للتحكم في ما إذا تم نسخ نظام MQTT وخصائص المستخدم إلى رأس رسالة Kafka. يتم نسخ خصائص المستخدم كما هي. يتم إجراء بعض التحويل باستخدام خصائص النظام. الإعدادات الافتراضية للخطأ. لا
المسارات قائمة مسارات نقل البيانات بين مواضيع MQTT وموضوعات Kafka. يمكن أن يكون لكل مسار حقل mqttToKafka أو kafkaToMqtt ، اعتمادا على اتجاه نقل البيانات. راجع المسارات. ‏‏نعم‬

الضغط

يتيح حقل الضغط الضغط للرسائل المرسلة إلى مواضيع Kafka. يساعد الضغط على تقليل النطاق الترددي للشبكة ومساحة التخزين المطلوبة لنقل البيانات. ومع ذلك، يضيف الضغط أيضا بعض النفقات العامة وزمن الانتقال إلى العملية. يتم سرد أنواع الضغط المدعومة في الجدول التالي.

قيمة ‏‏الوصف
لا شيء لا يتم تطبيق أي ضغط أو دفعات. لا شيء هو القيمة الافتراضية إذا لم يتم تحديد ضغط.
gzip يتم تطبيق ضغط GZIP والدفعات. GZIP هي خوارزمية ضغط للأغراض العامة توفر توازنا جيدا بين نسبة الضغط والسرعة.
لاذع يتم تطبيق الضغط الانطباقي والتقاط الدفعات. Snappy هي خوارزمية ضغط سريعة توفر نسبة ضغط متوسطة وسرعة.
lz4 يتم تطبيق ضغط LZ4 والدفعات. LZ4 هي خوارزمية ضغط سريعة توفر نسبة ضغط منخفضة وسرعة عالية.

الدفعات

وبصرف النظر عن الضغط، يمكنك أيضا تكوين دفعات للرسائل قبل إرسالها إلى مواضيع Kafka. يسمح لك الإرسال في دفعات بتجميع رسائل متعددة معا وضغطها كوحدة واحدة، ما يمكن أن يحسن كفاءة الضغط ويقلل من حمل الشبكة.

الحقل ‏‏الوصف مطلوب
مُمكَّن قيمة منطقية تشير إلى ما إذا كان الإرسال في دفعات ممكنا أم لا. إذا لم يتم تعيينها، تكون القيمة الافتراضية خاطئة. ‏‏نعم‬
زمن الانتقال الحد الأقصى للفاصل الزمني بالمللي ثانية الذي يمكن تخزين الرسائل مؤقتا قبل إرسالها. إذا تم الوصول إلى هذا الفاصل الزمني، فسيتم إرسال جميع الرسائل المخزنة مؤقتا كدفعة، بغض النظر عن عددها أو حجمها. إذا لم يتم تعيين القيمة الافتراضية هي 5. لا
الحد الأقصى من الرسائل الحد الأقصى لعدد الرسائل التي يمكن تخزينها مؤقتا قبل إرسالها. إذا تم الوصول إلى هذا الرقم، فسيتم إرسال جميع الرسائل المخزنة مؤقتا كدفعة، بغض النظر عن حجمها أو مدة تخزينها مؤقتا. إذا لم يتم تعيين القيمة الافتراضية هي 100000. لا
maxBytes الحد الأقصى للحجم بالبايت الذي يمكن تخزينه مؤقتا قبل إرساله. إذا تم الوصول إلى هذا الحجم، فسيتم إرسال جميع الرسائل المخزنة مؤقتا كدفعة، بغض النظر عن عددها أو مدة تخزينها مؤقتا. القيمة الافتراضية هي 1000000 (1 ميغابايت). لا

مثال على استخدام الدفعات هو:

batching:
  enabled: true
  latencyMs: 1000
  maxMessages: 100
  maxBytes: 1024

وهذا يعني أنه يتم إرسال الرسائل إما عندما يكون هناك 100 رسالة في المخزن المؤقت، أو عندما يكون هناك 1024 بايت في المخزن المؤقت، أو عندما تنقضي 1000 مللي ثانية منذ آخر إرسال، أيهما يأتي أولا.

استراتيجية معالجة التقسيم

استراتيجية معالجة القسم هي ميزة تسمح لك بالتحكم في كيفية تعيين الرسائل إلى أقسام Kafka عند إرسالها إلى مواضيع Kafka. أقسام Kafka هي مقاطع منطقية لموضوع Kafka التي تمكن المعالجة المتوازية والتسامح مع الخطأ. تحتوي كل رسالة في موضوع Kafka على قسم وإزاحة تستخدم لتحديد الرسائل وترتيبها.

بشكل افتراضي، يقوم موصل Kafka بتعيين رسائل إلى أقسام عشوائية، باستخدام خوارزمية الترتيب الدوري. ومع ذلك، يمكنك استخدام استراتيجيات مختلفة لتعيين رسائل إلى أقسام استنادا إلى بعض المعايير، مثل اسم موضوع MQTT أو خاصية رسالة MQTT. يمكن أن يساعدك هذا في تحقيق موازنة تحميل أفضل أو منطقة بيانات أو ترتيب رسائل.

قيمة ‏‏الوصف
افتراضي تعيين رسائل إلى أقسام عشوائية، باستخدام خوارزمية الترتيب الدوري. إنها القيمة الافتراضية إذا لم يتم تحديد استراتيجية.
ثابت تعيين رسائل إلى رقم قسم ثابت مشتق من معرف المثيل للموصل. وهذا يعني أن كل مثيل موصل يرسل رسائل إلى قسم مختلف. يمكن أن يساعد هذا في تحقيق موازنة تحميل أفضل ومحة البيانات.
الموضوع يستخدم اسم موضوع MQTT كمفتاح للتقسيم. وهذا يعني أن الرسائل التي تحمل نفس اسم موضوع MQTT يتم إرسالها إلى نفس القسم. يمكن أن يساعد هذا في تحقيق ترتيب أفضل للرسائل ومحلية البيانات.
خاصية يستخدم خاصية رسالة MQTT كمفتاح للتقسيم. حدد اسم الخاصية في partitionKeyProperty الحقل. وهذا يعني أن الرسائل التي لها نفس قيمة الخاصية يتم إرسالها إلى نفس القسم. يمكن أن يساعد هذا في تحقيق ترتيب أفضل للرسائل ومحلية البيانات استنادا إلى معيار مخصص.

مثال على استخدام استراتيجية معالجة القسم هو:

partitionStrategy: property
partitionKeyProperty: device-id

وهذا يعني أن الرسائل التي لها نفس خاصية معرف الجهاز يتم إرسالها إلى نفس القسم.

مسارات

يحدد حقل المسارات قائمة بالمسارات لنقل البيانات بين مواضيع MQTT وموضوعات Kafka. يمكن أن يكون لكل مسار حقل mqttToKafka أو kafkaToMqtt ، اعتمادا على اتجاه نقل البيانات.

MQTT إلى Kafka

mqttToKafka يعرف الحقل مسارا ينقل البيانات من موضوع MQTT إلى موضوع Kafka.

الحقل ‏‏الوصف مطلوب
الاسم اسم فريد للمسار. ‏‏نعم‬
mqttTopic موضوع MQTT للاشتراك منه. يمكنك استخدام أحرف البدل (# و +) لمطابقة مواضيع متعددة. ‏‏نعم‬
kafkaTopic موضوع Kafka لإرساله. ‏‏نعم‬
kafkaAcks عدد الإقرارات التي يتطلبها الموصل من نقطة نهاية Kafka. القيم المحتملة هي: zero أو oneأو all. لا
Qos مستوى جودة الخدمة (QoS) لاشتراك موضوع MQTT. القيم المحتملة هي: 0 أو 1 (افتراضي). QoS 2 غير مدعوم حاليا. ‏‏نعم‬
الاشتراك المشترك تكوين استخدام الاشتراكات المشتركة لمواضيع MQTT. groupNameحدد ، وهو معرف فريد لمجموعة من المشتركين، وgroupMinimumShareNumber، وهو عدد المشتركين في مجموعة تتلقى رسائل من موضوع. على سبيل المثال، إذا كان groupName هو "group1" وكان groupMinimumShareNumber هو 3، فسينشئ الموصل ثلاثة مشتركين بنفس اسم المجموعة لتلقي رسائل من موضوع. تسمح لك هذه الميزة بتوزيع الرسائل بين العديد من المشتركين دون فقدان أي رسائل أو إنشاء تكرارات. لا

مثال على استخدام mqttToKafka المسار:

mqttToKafka:
  mqttTopic: temperature-alerts/#
  kafkaTopic: receiving-event-hub
  kafkaAcks: one
  qos: 1
  sharedSubscription:
    groupName: group1
    groupMinimumShareNumber: 3

في هذا المثال، يتم إرسال الرسائل من مواضيع MQTT التي تطابق تنبيهات درجة الحرارة/# إلى موضوع Kafka receiving-event-hub مع QoS المكافئ 1 ومجموعة الاشتراك المشتركة "group1" مع رقم المشاركة 3.

Kafka إلى MQTT

kafkaToMqtt يعرف الحقل مسارا ينقل البيانات من موضوع Kafka إلى موضوع MQTT.

الحقل ‏‏الوصف مطلوب
الاسم اسم فريد للمسار. ‏‏نعم‬
kafkaTopic موضوع Kafka للسحب منه. ‏‏نعم‬
mqttTopic موضوع MQTT للنشر إليه. ‏‏نعم‬
معرف مجموعة المستهلكين بادئة معرف مجموعة المستهلكين لكل مسار Kafka إلى MQTT. إذا لم يتم تعيينه، يتم تعيين معرف مجموعة المستهلكين إلى نفس اسم المسار. لا
Qos مستوى جودة الخدمة (QoS) للرسائل المنشورة في موضوع MQTT. القيم المحتملة هي 0 أو 1 (افتراضي). QoS 2 غير مدعوم حاليا. إذا تم تعيين QoS إلى 1، ينشر الموصل الرسالة إلى موضوع MQTT ثم ينتظر ack قبل تثبيت الرسالة مرة أخرى إلى Kafka. بالنسبة إلى QoS 0، يلتزم الموصل مرة أخرى على الفور دون ack MQTT. لا

مثال على استخدام kafkaToMqtt المسار:

kafkaToMqtt:
  kafkaTopic: sending-event-hub
  mqttTopic: heater-commands
  qos: 0

في هذا المثال، يتم نشر الرسائل من موضوع Kafka sending-event-hub* إلى MQTT topic heater-commands مع مستوى QoS 0.

يجب أن يتطابق اسم مركز الأحداث مع موضوع Kafka

يجب تسمية كل مركز حدث فردي ليس مساحة الاسم تماما مثل موضوع Kafka المقصود المحدد في المسارات. أيضا، يجب أن يتطابق سلسلة الاتصال EntityPath إذا تم تحديد نطاق سلسلة الاتصال إلى مركز أحداث واحد. هذا المطلب لأن مساحة اسم مراكز الأحداث مماثلة لنظام مجموعة Kafka واسم مركز الحدث مشابه لموضوع Kafka، لذلك يجب أن يتطابق اسم موضوع Kafka مع اسم مركز الحدث.

إزاحات مجموعة مستهلكين Kafka

إذا قطع الموصل الاتصال أو تمت إزالته وإعادة تثبيته بنفس معرف مجموعة مستهلك Kafka، يتم تخزين إزاحة مجموعة المستهلكين (الموضع الأخير من حيث قراءة رسائل مستهلك Kafka) في مراكز أحداث Azure. لمعرفة المزيد، راجع مجموعة مستهلكين مراكز الأحداث مقابل مجموعة مستهلكين Kafka.

إصدار MQTT

يستخدم هذا الموصل MQTT v5 فقط.

نشر رسائل MQTT والاشتراك فيها باستخدام Azure IoT MQ Preview