إرسال البيانات من Azure IoT MQ Preview إلى Data Lake Storage

هام

معاينة عمليات Azure IoT - التي تم تمكينها بواسطة Azure Arc قيد المعاينة حاليا. يجب عدم استخدام برنامج المعاينة هذا في بيئات الإنتاج.

للحصول على الشروط القانونية التي تنطبق على ميزات Azure الموجودة في الإصدار التجريبي، أو المعاينة، أو التي لم يتم إصدارها بعد في التوفر العام، راجع شروط الاستخدام التكميلية لمعاينات Microsoft Azure.

يمكنك استخدام موصل مستودع البيانات لإرسال البيانات من وسيط Azure IoT MQ Preview إلى مستودع بيانات، مثل Azure Data Lake Storage Gen2 (ADLSv2) وMicrosoft Fabric OneLake وAzure Data Explorer. يشترك الموصل في مواضيع MQTT ويتناول الرسائل في جداول Delta في حساب Data Lake Storage.

المتطلبات الأساسية

تكوين لإرسال البيانات إلى Microsoft Fabric OneLake باستخدام الهوية المدارة

تكوين موصل مستودع بيانات للاتصال ب Microsoft Fabric OneLake باستخدام الهوية المدارة.

  1. تأكد من استيفاء الخطوات الواردة في المتطلبات الأساسية، بما في ذلك مساحة عمل Microsoft Fabric و lakehouse. لا يمكن استخدام مساحة العمل الافتراضية.

  2. تأكد من تثبيت ملحق IoT MQ Arc وتكوينه باستخدام الهوية المدارة.

  3. في مدخل Microsoft Azure، انتقل إلى مجموعة Kubernetes المتصلة ب Arc وحدد الإعدادات> Extensions. في قائمة الملحقات، ابحث عن اسم ملحق IoT MQ. يبدأ الاسم ب mq- متبوعا بخمسة أحرف عشوائية. على سبيل المثال، mq-4jgjs.

  4. احصل على معرف التطبيق المقترن بالهوية المدارة لملحق IoT MQ Arc، ولاحظ قيمة GUID. يختلف معرف التطبيق عن الكائن أو المعرف الأساسي. يمكنك استخدام Azure CLI عن طريق العثور على معرف الكائن للهوية المدارة ثم الاستعلام عن معرف التطبيق الخاص بكيان الخدمة المرتبط بالهوية المدارة. على سبيل المثال:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    يجب أن تحصل على إخراج بقيمة GUID:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    هذا المعرف الفريد العمومي هو معرف التطبيق الذي تحتاج إلى استخدامه في الخطوة التالية.

  5. في مساحة عمل Microsoft Fabric، استخدم إدارة الوصول، ثم حدد + إضافة أشخاص أو مجموعات.

  6. ابحث عن ملحق IoT MQ Arc باسمه "mq"، وتأكد من تحديد قيمة معرف التطبيق GUID التي وجدتها في الخطوة السابقة.

  7. حدد المساهم كدور، ثم حدد إضافة.

  8. إنشاء مورد DataLake الاتصال or الذي يحدد إعدادات التكوين ونقطة النهاية للموصل. يمكنك استخدام YAML المتوفر كمثال، ولكن تأكد من تغيير الحقول التالية:

    • target.fabricOneLake.endpoint: نقطة نهاية حساب Microsoft Fabric OneLake. يمكنك الحصول على عنوان URL لنقطة النهاية من Microsoft Fabric lakehouse ضمن خصائص الملفات>. يجب أن يبدو عنوان URL مثل https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: أسماء مساحة العمل ومخزن البحيرة. استخدم إما هذا الحقل أو guids. لا تستخدم كليهما.
      • workspaceName: اسم مساحة العمل.
      • lakehouseName: اسم بحيرة.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. إنشاء مورد DataLake الاتصال orTopicMap الذي يحدد التعيين بين موضوع MQTT وجدول Delta في Data Lake Storage. يمكنك استخدام YAML المتوفر كمثال، ولكن تأكد من تغيير الحقول التالية:

    • dataLakeConnectorRef: اسم مورد DataLake الاتصال or الذي قمت بإنشائه سابقا.
    • clientId: معرف فريد لعميل MQTT.
    • mqttSourceTopic: اسم موضوع MQTT الذي تريد أن تأتي منه البيانات.
    • table.tableName: اسم الجدول الذي تريد إلحاقه في بحيرة. يتم إنشاء الجدول تلقائيا إذا لم يكن موجودا.
    • table.schema: مخطط جدول Delta الذي يجب أن يتطابق مع تنسيق وحقول رسائل JSON التي ترسلها إلى موضوع MQTT.
  10. تطبيق موارد DataLake الاتصال or وDataLake الاتصال orTopicMap على مجموعة Kubernetes باستخدام kubectl apply -f datalake-connector.yaml.

  11. ابدأ في إرسال رسائل JSON إلى موضوع MQTT باستخدام ناشر MQTT. يشترك مثيل موصل مستودع البيانات في الموضوع ويتناول الرسائل في جدول Delta.

  12. باستخدام متصفح، تحقق من استيراد البيانات إلى lakehouse. في مساحة عمل Microsoft Fabric، حدد lakehouse ثم Tables. يجب أن تشاهد البيانات في الجدول.

جدول غير معروف

إذا ظهرت بياناتك في الجدول غير معروف :

قد يكون السبب أحرفا غير معتمدة في اسم الجدول. يجب أن يكون اسم الجدول اسم حاوية تخزين Azure صالحا مما يعني أنه يمكن أن يحتوي على أي حرف إنجليزي، وأحرف كبيرة أو صغيرة، وشريط أسفل _، بطول يصل إلى 256 حرفا. لا يسمح بشرطات - أو أحرف مسافة.

تكوين لإرسال البيانات إلى Azure Data Lake Storage Gen2 باستخدام رمز SAS المميز

تكوين موصل مستودع بيانات للاتصال بحساب Azure Data Lake Storage Gen2 (ADLS Gen2) باستخدام رمز مميز لتوقيع الوصول المشترك (SAS).

  1. احصل على رمز SAS المميز لحساب Azure Data Lake Storage Gen2 (ADLS Gen2). على سبيل المثال، استخدم مدخل Microsoft Azure للاستعراض وصولا إلى حساب التخزين الخاص بك. في القائمة ضمن الأمان + الشبكات، اختر توقيع الوصول المشترك. استخدم الجدول التالي لتعيين الأذونات المطلوبة.

    المعلمة القيمة
    الخدمات المسموح بها كائن ثنائي كبير الحجم
    ⁩أنواع الموارد غير المسموح بها⁦⁩ كائن، حاوية
    الأذونات المسموح بها قراءة، كتابة، حذف، قائمة، إنشاء

    لتحسين الامتياز الأقل، يمكنك أيضا اختيار الحصول على SAS لحاوية فردية. لمنع أخطاء المصادقة، تأكد من أن الحاوية تطابق table.tableName القيمة في تكوين مخطط الموضوع.

  2. إنشاء سر Kubernetes باستخدام رمز SAS المميز. لا تقم بتضمين علامة الاستفهام ? التي قد تكون في بداية الرمز المميز.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. إنشاء مورد DataLake الاتصال or الذي يحدد إعدادات التكوين ونقطة النهاية للموصل. يمكنك استخدام YAML المتوفر كمثال، ولكن تأكد من تغيير الحقول التالية:

    • endpoint: نقطة نهاية Data Lake Storage لحساب تخزين ADLSv2 في شكل https://example.blob.core.windows.net. في مدخل Microsoft Azure، ابحث عن نقطة النهاية ضمن Storage account > الإعدادات > Endpoints > Data Lake Storage.
    • accessTokenSecretName: اسم سر Kubernetes الذي يحتوي على رمز SAS المميز (my-sas من المثال السابق).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. إنشاء مورد DataLake الاتصال orTopicMap الذي يحدد التعيين بين موضوع MQTT وجدول Delta في Data Lake Storage. يمكنك استخدام YAML المتوفر كمثال، ولكن تأكد من تغيير الحقول التالية:

    • dataLakeConnectorRef: اسم مورد DataLake الاتصال or الذي قمت بإنشائه سابقا.
    • clientId: معرف فريد لعميل MQTT.
    • mqttSourceTopic: اسم موضوع MQTT الذي تريد أن تأتي منه البيانات.
    • table.tableName: اسم الحاوية التي تريد إلحاقها في Data Lake Storage. إذا تم تحديد نطاق رمز SAS المميز للحساب، يتم إنشاء الحاوية تلقائيا إذا كانت مفقودة.
    • table.schema: مخطط جدول Delta، والذي يجب أن يتطابق مع تنسيق وحقول رسائل JSON التي ترسلها إلى موضوع MQTT.
  5. تطبيق موارد DataLake الاتصال or وDataLake الاتصال orTopicMap على مجموعة Kubernetes باستخدام kubectl apply -f datalake-connector.yaml.

  6. ابدأ في إرسال رسائل JSON إلى موضوع MQTT باستخدام ناشر MQTT. يشترك مثيل موصل مستودع البيانات في الموضوع ويتناول الرسائل في جدول Delta.

  7. باستخدام مدخل Microsoft Azure، تحقق من إنشاء جدول Delta. يتم تنظيم الملفات حسب معرف العميل واسم مثيل الموصل وموضوع MQTT والوقت. في حاويات حساب> التخزين، افتح الحاوية التي حددتها في DataLake الاتصال orTopicMap. تحقق من وجود _delta_log وتظهر ملفات parque حركة مرور MQTT. افتح ملف parque لتأكيد تطابق الحمولة مع ما تم إرساله وتعريفه في المخطط.

استخدام الهوية المدارة للمصادقة إلى ADLSv2

لاستخدام الهوية المدارة، حددها كطريقة وحيدة ضمن DataLake الاتصال or authentication. استخدم az k8s-extension show للعثور على المعرف الأساسي لملحق IoT MQ Arc، ثم قم بتعيين دور للهوية المدارة التي تمنح الإذن للكتابة إلى حساب التخزين، مثل Storage Blob Data Contributor. لمعرفة المزيد، راجع تخويل الوصول إلى الكائنات الثنائية كبيرة الحجم باستخدام معرف Microsoft Entra.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

تكوين لإرسال البيانات إلى Azure Data Explorer باستخدام الهوية المدارة

تكوين موصل مستودع البيانات لإرسال البيانات إلى نقطة نهاية Azure Data Explorer باستخدام الهوية المدارة.

  1. تأكد من استيفاء الخطوات في المتطلبات الأساسية، بما في ذلك نظام مجموعة Azure Data Explorer الكامل. لا يعمل خيار "نظام المجموعة المجاني".

  2. بعد إنشاء نظام المجموعة، قم بإنشاء قاعدة بيانات لتخزين بياناتك.

  3. يمكنك إنشاء جدول لبيانات معينة عبر مدخل Microsoft Azure وإنشاء أعمدة يدويا، أو يمكنك استخدام KQL في علامة تبويب الاستعلام. على سبيل المثال:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

تمكين استيعاب الدفق

تمكين استيعاب البث على الجدول وقاعدة البيانات. في علامة تبويب الاستعلام، قم بتشغيل الأمر التالي، واستبدال باسم <DATABASE_NAME> قاعدة البيانات:

.alter database <DATABASE_NAME> policy streamingingestion enable

إضافة الهوية المدارة إلى نظام مجموعة Azure Data Explorer

لكي يقوم الموصل بالمصادقة على Azure Data Explorer، يجب إضافة الهوية المدارة إلى مجموعة Azure Data Explorer.

  1. في مدخل Microsoft Azure، انتقل إلى مجموعة Kubernetes المتصلة ب Arc وحدد الإعدادات> Extensions. في قائمة الملحقات، ابحث عن اسم ملحق IoT MQ الخاص بك. يبدأ الاسم ب mq- متبوعا بخمسة أحرف عشوائية. على سبيل المثال، mq-4jgjs. اسم ملحق IoT MQ هو نفس اسم الهوية المدارة MQ.
  2. في قاعدة بيانات Azure Data Explorer، حدد Permissions>Add>Ingestor. ابحث عن اسم الهوية المدارة MQ وأضفه.

لمزيد من المعلومات حول إضافة الأذونات، راجع إدارة أذونات نظام مجموعة Azure Data Explorer.

الآن، أنت مستعد لنشر الموصل وإرسال البيانات إلى Azure Data Explorer.

مثال على ملف التوزيع

مثال لملف النشر لموصل Azure Data Explorer. تتطلب منك التعليقات التي تبدأ باستبدال TODO إعدادات العنصر النائب بمعلوماتك.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

يقبل هذا المثال البيانات من azure-iot-operations/data/thermostat الموضوع مع رسائل بتنسيق JSON مثل ما يلي:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLake الاتصال or

DataLake الاتصال or هو مورد مخصص Kubernetes يحدد تكوين وخصائص مثيل موصل مستودع البيانات. يدمج موصل مستودع البيانات البيانات من مواضيع MQTT في جداول Delta في حساب Data Lake Storage.

يحتوي حقل المواصفات لمورد DataLake الاتصال or على الحقول الفرعية التالية:

  • protocol: إصدار MQTT. يمكن أن يكون واحدا من v5 أو v3.
  • image: يحدد حقل الصورة صورة الحاوية لوحدة موصل مستودع البيانات. يحتوي على الحقول الفرعية التالية:
    • repository: اسم سجل الحاوية والمستودع حيث يتم تخزين الصورة.
    • tag: علامة الصورة المراد استخدامها.
    • pullPolicy: نهج السحب للصورة. يمكن أن يكون واحدا من Alwaysأو IfNotPresentأو Never.
  • instances: عدد النسخ المتماثلة لموصل مستودع البيانات المراد تشغيله.
  • logLevel: مستوى السجل لوحدة موصل مستودع البيانات. يمكن أن يكون واحدا من traceأو debugأو infowarnerror.fatal
  • databaseFormat: تنسيق البيانات لاستيعابها في Data Lake Storage. يمكن أن يكون واحدا من delta أو parquet.
  • target: يحدد الحقل الهدف وجهة استيعاب البيانات. يمكن أن يكون datalakeStorageأو fabricOneLakeadxأو أو localStorage.
    • datalakeStorage: يحدد تكوين وخصائص حساب ADLSv2. يحتوي على الحقول الفرعية التالية:
      • endpoint: عنوان URL لنقطة نهاية حساب Data Lake Storage. لا تقم بتضمين أي شرطة /مائلة لاحقة .
      • authentication: يحدد حقل المصادقة النوع وبيانات الاعتماد للوصول إلى حساب Data Lake Storage. يمكن أن يكون واحدا مما يلي.
        • accessTokenSecretName: اسم سر Kubernetes لاستخدام مصادقة رمز الوصول المشترك لحساب Data Lake Storage. هذا الحقل مطلوب إذا كان النوع هو accessToken.
        • systemAssignedManagedIdentity: لاستخدام هوية مدارة من قبل النظام للمصادقة. يحتوي على حقل فرعي واحد
          • audience: سلسلة في شكل https://<my-account-name>.blob.core.windows.net لجمهور الرمز المميز للهوية المدارة التي تم تحديد نطاقها إلى مستوى الحساب أو https://storage.azure.com لأي حساب تخزين.
    • fabricOneLake: يحدد تكوين وخصائص Microsoft Fabric OneLake. يحتوي على الحقول الفرعية التالية:
      • endpoint: عنوان URL لنقطة نهاية Microsoft Fabric OneLake. عادة ما https://onelake.dfs.fabric.microsoft.com يكون ذلك لأن هذه هي نقطة النهاية العمومية OneLake. إذا كنت تستخدم نقطة نهاية إقليمية، فهي في شكل https://<region>-onelake.dfs.fabric.microsoft.com. لا تقم بتضمين أي شرطة /مائلة لاحقة . لمعرفة المزيد، راجع الاتصال إلى Microsoft OneLake.
      • names: يحدد أسماء مساحة العمل ومخزن البحيرة. استخدم إما هذا الحقل أو guids. لا تستخدم كليهما. يحتوي على الحقول الفرعية التالية:
        • workspaceName: اسم مساحة العمل.
        • lakehouseName: اسم بحيرة.
      • guids: يحدد معرفات المستخدم الرسومية لمساحة العمل ومخزن البحيرة. استخدم إما هذا الحقل أو names. لا تستخدم كليهما. يحتوي على الحقول الفرعية التالية:
        • workspaceGuid: المعرف الفريد العمومي لمساحة العمل.
        • lakehouseGuid: المعرف الفريد العمومي لمخزن البحيرة.
      • fabricPath: موقع البيانات في مساحة عمل Fabric. يمكن أن يكون إما tables أو files. إذا كان ، tablesيتم تخزين البيانات في Fabric OneLake كجداول. إذا كان ، filesيتم تخزين البيانات في Fabric OneLake كملفات. إذا كان ، filesdatabaseFormat يجب أن يكون parquet.
      • authentication: يحدد حقل المصادقة نوع وبيانات الاعتماد للوصول إلى Microsoft Fabric OneLake. يمكن أن يكون systemAssignedManagedIdentity فقط في الوقت الحالي. يحتوي على حقل فرعي واحد:
      • systemAssignedManagedIdentity: لاستخدام هوية مدارة من قبل النظام للمصادقة. يحتوي على حقل فرعي واحد
        • audience: سلسلة لجمهور الرمز المميز للهوية المدارة ويجب أن تكون https://storage.azure.com.
    • adx: يحدد تكوين وخصائص قاعدة بيانات Azure Data Explorer. يحتوي على الحقول الفرعية التالية:
      • endpoint: عنوان URL لنقطة نهاية مجموعة Azure Data Explorer مثل https://<CLUSTER>.<REGION>.kusto.windows.net. لا تقم بتضمين أي شرطة /مائلة لاحقة .
      • authentication: يحدد حقل المصادقة النوع وبيانات الاعتماد للوصول إلى مجموعة Azure Data Explorer. يمكن أن يكون systemAssignedManagedIdentity فقط في الوقت الحالي. يحتوي على حقل فرعي واحد:
        • systemAssignedManagedIdentity: لاستخدام هوية مدارة من قبل النظام للمصادقة. يحتوي على حقل فرعي واحد
          • audience: سلسلة لجمهور الرمز المميز للهوية المدارة ويجب أن تكون https://api.kusto.windows.net.
    • localStorage: يحدد تكوين وخصائص حساب التخزين المحلي. يحتوي على الحقول الفرعية التالية:
      • volumeName: اسم وحدة التخزين التي يتم تحميلها في كل من pods الموصل.
  • localBrokerConnection: يستخدم لتجاوز تكوين الاتصال الافتراضي إلى وسيط MQTT IoT MQTT. راجع إدارة اتصال الوسيط المحلي.

DataLake الاتصال orTopicMap

DataLake الاتصال orTopicMap هو مورد مخصص Kubernetes يحدد التعيين بين موضوع MQTT وجدول Delta في حساب Data Lake Storage. يشير مورد DataLake الاتصال orTopicMap إلى مورد DataLake الاتصال or الذي يعمل على نفس جهاز الحافة ويتناول البيانات من موضوع MQTT في جدول Delta.

يحتوي حقل المواصفات لمورد DataLake الاتصال orTopicMap على الحقول الفرعية التالية:

  • dataLakeConnectorRef: اسم مورد DataLake الاتصال or الذي ينتمي إليه مخطط الموضوع هذا.
  • mapping: يحدد حقل التعيين تفاصيل وخصائص موضوع MQTT وجدول Delta. يحتوي على الحقول الفرعية التالية:
    • allowedLatencySecs: الحد الأقصى لزمن الانتقال بالثوان بين تلقي رسالة من موضوع MQTT واستيعابها في جدول Delta. هذا الحقل مطلوب.
    • clientId: معرف فريد لعميل MQTT الذي يشترك في الموضوع.
    • maxMessagesPerBatch: الحد الأقصى لعدد الرسائل التي يجب استيعابها في دفعة واحدة في جدول Delta. نظرا لقيود مؤقتة، يجب أن تكون هذه القيمة أقل من 16 إذا qos تم تعيينها إلى 1. هذا الحقل مطلوب.
    • messagePayloadType: نوع الحمولة التي يتم إرسالها إلى موضوع MQTT. يمكن أن يكون واحدا من json أو avro (غير مدعوم بعد).
    • mqttSourceTopic: اسم موضوع (مواضيع) MQTT للاشتراك فيها. يدعم تدوين حرف البدل لموضوع MQTT.
    • qos: جودة مستوى الخدمة للاشتراك في موضوع MQTT. يمكن أن تكون واحدة من 0 أو 1.
    • table: يحدد حقل الجدول تكوين وخصائص جدول Delta في حساب Data Lake Storage. يحتوي على الحقول الفرعية التالية:
      • tableName: اسم جدول Delta المراد إنشاؤه أو إلحاقه في حساب Data Lake Storage. يعرف هذا الحقل أيضا باسم اسم الحاوية عند استخدامه مع Azure Data Lake Storage Gen2. يمكن أن يحتوي على أي حرف إنجليزي صغير ، وأدنى _، بطول يصل إلى 256 حرفا. لا يسمح بشرطات - أو أحرف مسافة.
      • tablePath: اسم قاعدة بيانات Azure Data Explorer عند استخدام adx موصل النوع.
      • schema: مخطط جدول Delta، والذي يجب أن يتطابق مع تنسيق حمولة الرسالة وحقولها. إنه صفيف من الكائنات، لكل منها الحقول الفرعية التالية:
        • name: اسم العمود في جدول Delta.
        • format: نوع بيانات العمود في جدول Delta. يمكن أن يكون واحدا من booleanأو int8أو int16أو int32أو int64أو uInt8أو uInt16أو uInt32أو uInt64أو float16أو float32. float64utf8date32timestampbinary الأنواع غير الموقعة، مثل uInt8، غير مدعومة بالكامل، ويتم التعامل معها على أنها أنواع موقعة إذا تم تحديدها هنا.
        • optional: قيمة منطقية تشير إلى ما إذا كان العمود اختياريا أو مطلوبا. هذا الحقل اختياري ويتم تعيينه افتراضيا إلى خطأ.
        • mapping: تعبير مسار JSON الذي يحدد كيفية استخراج قيمة العمود من حمولة رسالة MQTT. تتوفر التعيينات المضمنة $client_idو $properties$topic$received_time و لاستخدامها كأعمدة لإثراء JSON في نص رسالة MQTT. هذا الحقل مطلوب. استخدم $properties لخصائص مستخدم MQTT. على سبيل المثال، يمثل $properties.assetId قيمة خاصية assetId من رسالة MQTT.

فيما يلي مثال على مورد DataLake الاتصال orTopicMap:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

JSON stringified مثل "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" غير مدعوم ويتسبب في أن يقوم الموصل برمي محول عثر على خطأ قيمة خالية.

رسالة مثال للموضوع azure-iot-operations/data/thermostat الذي يعمل مع هذا المخطط:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

الذي يعين إلى:

معرف المجموعة الخارجية assetName CurrentTemperature الضغط mqttTopic الطابع الزمني
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx thermostat-de 5506 5506 Dlc 2024-04-02T22:36:03.1827681Z

هام

إذا تم تحديث مخطط البيانات، على سبيل المثال تم تغيير نوع بيانات أو تغيير اسم، فقد يتوقف تحويل البيانات الواردة عن العمل. تحتاج إلى تغيير اسم جدول البيانات في حالة حدوث تغيير في المخطط.

دلتا أو باركيه

يتم دعم كل من تنسيقات دلتا وparquet.

إدارة اتصال الوسيط المحلي

مثل جسر MQTT، يعمل موصل مستودع البيانات كجهة عميل إلى وسيط IoT MQ MQTT. إذا قمت بتخصيص منفذ وحدة الاستماع أو مصادقة وسيط IoT MQTT، فتجاوز تكوين اتصال MQTT المحلي لموصل مستودع البيانات أيضا. لمعرفة المزيد، راجع وصلة وسيط MQTT المحلية.

نشر رسائل MQTT والاشتراك فيها باستخدام Azure IoT MQ Preview