Share via


Senden von Daten von Azure IoT MQ Preview an Data Lake Storage

Wichtig

Die von Azure Arc aktivierte Azure IoT Operations Preview befindet sich derzeit in der VORSCHAU. Sie sollten diese Vorschausoftware nicht in Produktionsumgebungen verwenden.

Die zusätzlichen Nutzungsbestimmungen für Microsoft Azure-Vorschauen enthalten rechtliche Bedingungen. Sie gelten für diejenigen Azure-Features, die sich in der Beta- oder Vorschauversion befinden oder aber anderweitig noch nicht zur allgemeinen Verfügbarkeit freigegeben sind.

Sie können den Data Lake-Connector verwenden, um Daten vom Azure IoT MQ-Broker (Vorschau) an einen Data Lake wie Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake und Azure Data Explorer zu senden. Der Connector abonniert MQTT-Themen und erfasst die Nachrichten in die Delta-Tabellen im Data Lake Storage-Konto.

Voraussetzungen

Konfigurieren des Sendens von Daten an Microsoft Fabric OneLake mithilfe der verwalteten Identität

Konfigurieren Sie einen Data Lake Connector zum Herstellen einer Verbindung mit Microsoft Fabric OneLake unter Verwendung einer verwalteten Identität.

  1. Stellen Sie sicher, dass die Schritte unter Voraussetzungen erfüllt sind, einschließlich eines Microsoft Fabric-Arbeitsbereichs und eines Lakehouse. Die Standardoption mein Arbeitsbereich kann nicht verwendet werden.

  2. Stellen Sie sicher, dass die IoT MQ Arc-Erweiterung installiert und mit verwalteter Identität konfiguriert ist.

  3. Wechseln Sie im Azure-Portal zum Arc-verbundenen Kubernetes-Cluster und wählen Sie Einstellungen>Erweiterungen aus. Suchen Sie in der Erweiterungsliste nach Ihrem IoT MQ-Erweiterungsnamen. Der Name beginnt mit mq- gefolgt von fünf zufälligen Zeichen. Beispiel: mq-4jgjs.

  4. Ermitteln Sie die mit der verwalteten Identität der IoT MQ Arc-Erweiterung verknüpfte App-ID und notieren Sie sich den GUID-Wert. Die App-ID unterscheidet sich von der Objekt- oder Prinzipal-ID. Sie können die Azure CLI verwenden, indem Sie die Objekt-ID der verwalteten Identität suchen und dann die App-ID des Dienstprinzipals abfragen, das mit der verwalteten Identität verbunden ist. Beispiel:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Sie sollten eine Ausgabe mit einem GUID-Wert erhalten:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Diese GUID ist die App-ID, die Sie im nächsten Schritt verwenden müssen.

  5. Verwenden Sie im Microsoft Fabric-Arbeitsbereich Zugriff verwalten und wählen Sie dann + Personen oder Gruppen hinzufügen aus.

  6. Suchen Sie nach der IoT MQ Arc-Erweiterung mit dem Namen „mq“ und stellen Sie sicher, dass Sie den GUID-Wert der App-ID auswählen, den Sie im vorherigen Schritt gefunden haben.

  7. Wählen Sie die Rolle Mitwirkende*r und dann Hinzufügen aus.

  8. Erstellen Sie eine DataLakeConnector-Ressource, die die Konfigurations- und Endpunkteinstellungen für den Connector definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:

    • target.fabricOneLake.endpoint: Der Endpunkt des Microsoft Fabric OneLake-Kontos. Sie können die Endpunkt-URL von Microsoft Fabric Lakehouse unter Files>Eigenschaftenabrufen. Die URL sollte wie https://onelake.dfs.fabric.microsoft.com aussehen.
    • target.fabricOneLake.names: Die Namen des Arbeitsbereichs und des Lakehouse Verwenden Sie entweder dieses Feld oder guids. Sie sollten nicht beides verwenden.
      • workspaceName: Der Name des Arbeitsbereichs
      • lakehouseName: Den Namen des Lakehouse
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Erstellen Sie eine DataLakeConnectorTopicMap-Ressource, die die Zuordnung zwischen dem MQTT-Thema und der Delta-Tabelle in Data Lake Storage definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:

    • dataLakeConnectorRef: Den Namen der DataLakeConnector-Ressource, die Sie zuvor erstellt haben
    • clientId: Dies ist ein eindeutiger Bezeichner für Ihren MQTT-Client.
    • mqttSourceTopic: Den Namen des MQTT-Topics, von dem die Daten stammen sollen
    • table.tableName: Den Namen der Tabelle, die Sie im Lakehouse ergänzen möchten. Wenn die Tabelle nicht vorhanden ist, wird sie automatisch erstellt.
    • table.schema: Das Schema der Delta-Tabelle, das dem Format und den Feldern der JSON-Nachrichten entsprechen soll, die Sie an das MQTT-Thema senden.
  10. Wenden Sie die Ressourcen DataLakeConnector und DataLakeConnectorTopicMap mit kubectl apply -f datalake-connector.yaml auf Ihren Kubernetes-Cluster an.

  11. Beginnen Sie mit dem Senden von JSON-Nachrichten an das MQTT-Thema über Ihren MQTT-Publisher. Die Data Lake Connector-Instanz abonniert das Thema und erfasst die Nachrichten in die Delta-Tabelle.

  12. Überprüfen Sie mit einem Browser, ob die Daten in das Lakehouse importiert wurden. Wählen Sie im Arbeitsbereich von Microsoft Fabric Ihr Lakehouse und dann Tabellen. Die Daten sollten in der Tabelle angezeigt werden.

Tabelle „Unbekannt“

Wenn Ihre Daten in der Tabelle Unbekannt angezeigt werden:

Die Ursache könnten nicht unterstützte Zeichen im Tabellennamen sein. Der Tabellenname muss ein gültiger Azure Storage Containername sein, d.h. er kann jeden englischen Buchstaben, Groß- oder Kleinbuchstaben und den Unterstrich _ enthalten und bis zu 256 Zeichen lang sein. Es sind keine Bindestriche (-) oder Leerzeichen erlaubt.

Konfigurieren des Sendens von Daten an Azure Data Lake Storage Gen2 mithilfe des SAS-Tokens

Konfigurieren Sie einen Data Lake Connector für die Verbindung zu einem Azure Data Lake Storage Gen2 (ADLS Gen2) Konto mit einem SAS-Token (Shared Access Signature).

  1. Rufen Sie ein SAS-Token für ein Azure Data Lake Storage Gen2 (ADLS Gen2) Konto ab. Verwenden Sie zum Beispiel das Azure-Portal, um Ihr Speicherkonto aufzurufen. Wählen Sie im Menü unter Sicherheit und Netzwerk die Option Shared Access Signature aus. Verwenden Sie die folgende Tabelle, um die erforderlichen Berechtigungen festzulegen.

    Parameter Wert
    Zulässige Dienste Blob
    Zulässige Ressourcentypen Objekt, Container
    Zugelassene Berechtigungen Lesen, Schreiben, Löschen, Auflisten, Erstellen

    Zur Optimierung für die geringste Berechtigung können Sie auch die SAS für einen einzelnen Container abrufen. Um Authentifizierungsfehler zu vermeiden, stellen Sie sicher, dass der Container dem table.tableName Wert in der Themenzuordnungskonfiguration entspricht.

  2. Erstellen Sie ein Kubernetes-Geheimnis mit dem SAS-Token. Verzichten Sie auf das Fragezeichen ?, das am Anfang des Tokens stehen könnte.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Erstellen Sie eine DataLakeConnector-Ressource, die die Konfigurations- und Endpunkteinstellungen für den Connector definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:

    • endpoint: Den Data Lake Storage-Endpunkt des ADLSv2 Speicherkontos in Form von https://example.blob.core.windows.net. Suchen Sie im Azure-Portal den Endpunkt unter Speicherkonto > Einstellungen > Endpunkte > Data Lake Storage.
    • accessTokenSecretName: Den Namen des Kubernetes-Geheimnisses, das das SAS-Token enthält (my-sas aus dem vorherigen Beispiel).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Erstellen Sie eine DataLakeConnectorTopicMap-Ressource, die die Zuordnung zwischen dem MQTT-Thema und der Delta-Tabelle in Data Lake Storage definiert. Sie können das bereitgestellte YAML als Beispiel verwenden, aber stellen Sie sicher, dass Sie die folgenden Felder ändern:

    • dataLakeConnectorRef: Den Namen der DataLakeConnector-Ressource, die Sie zuvor erstellt haben
    • clientId: Dies ist ein eindeutiger Bezeichner für Ihren MQTT-Client.
    • mqttSourceTopic: Den Namen des MQTT-Topics, von dem die Daten stammen sollen
    • table.tableName: Den Namen des Containers, den Sie in Data Lake Storage ergänzen möchten. Wenn das SAS-Token auf das Konto beschränkt ist, wird der Container automatisch erstellt, falls er noch nicht vorhanden ist.
    • table.schema: Das Schema der Delta-Tabelle, das dem Format und den Feldern der JSON-Nachrichten entsprechen sollte, die Sie an das MQTT-Thema senden.
  5. Wenden Sie die Ressourcen DataLakeConnector und DataLakeConnectorTopicMap mit kubectl apply -f datalake-connector.yaml auf Ihren Kubernetes-Cluster an.

  6. Beginnen Sie mit dem Senden von JSON-Nachrichten an das MQTT-Thema über Ihren MQTT-Publisher. Die Data Lake Connector-Instanz abonniert das Thema und erfasst die Nachrichten in die Delta-Tabelle.

  7. Überprüfen Sie mithilfe des Azure-Portals, ob die Delta-Tabelle erstellt wurde. Die Dateien werden nach Client-ID, Connectorinstanzname, MQTT-Thema und Zeit organisiert. Öffnen Sie in Ihrem Speicherkonto >Container den Container, den Sie in DataLakeConnectorTopicMap angegeben haben. Stellen Sie sicher, dass _delta_log vorhanden ist und die Parquet-Dateien MQTT-Datenverkehr anzeigen. Öffnen Sie eine Parquet-Datei, um zu bestätigen, dass die Payload mit dem gesendeten und definierten Schema übereinstimmt.

Verwenden der „Verwalteten Identität“ zur Authentifizierung bei ADLSv2

Um eine verwaltete Identität zu verwenden, geben Sie sie als einzige Authentifizierungsmethode unter DataLakeConnector authentication an. Verwenden Sie az k8s-extension show, um die Prinzipal-ID für die IoT MQ Arc-Erweiterung zu finden. Weisen Sie dann der verwalteten Identität eine Rolle zu, die Berechtigungen zum Schreiben in das Speicherkonto erteilt, z. B. „Storage Blob Data Contributor“. Weitere Informationen finden Sie unter Autorisieren des Zugriffs auf Blobs mit Microsoft Entra ID.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Konfigurieren, um Daten mithilfe der verwalteten Identität an Azure Data Explorer zu senden

Konfigurieren Sie den Data Lake-Connector, um Daten mithilfe der verwalteten Identität an einen Azure Data Explorer-Endpunkt zu senden.

  1. Führen Sie die Schritte in den Voraussetzungen aus (einschließlich der Schritte für einen vollständigen Azure Data Explorer-Cluster). Die Option „Kostenloser Cluster“ funktioniert nicht.

  2. Nachdem das Cluster erstellt wurde, erstellen Sie eine Datenbank zum Speichern Ihrer Daten.

  3. Sie können eine Tabelle für bestimmte Daten über das Azure-Portal erstellen und Spalten manuell erstellen oder KQL auf der Abfrageregisterkarte verwenden. Beispiel:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Streamingerfassung aktivieren

Aktivieren Sie die Streamingerfassung für Ihre Tabelle und Datenbank. Führen Sie auf der Registerkarte „Abfrage“ den folgenden Befehl aus, und ersetzen Sie <DATABASE_NAME> durch ihren Datenbanknamen:

.alter database <DATABASE_NAME> policy streamingingestion enable

Hinzufügen der verwalteten Identität zum Azure Data Explorer-Cluster

Damit sich der Connector beim Azure-Daten-Explorer authentifizieren kann, müssen Sie die verwaltete Identität dem Azure Data Explorer-Cluster hinzufügen.

  1. Wechseln Sie im Azure-Portal zum Arc-verbundenen Kubernetes-Cluster und wählen Sie Einstellungen>Erweiterungen aus. Suchen Sie in der Erweiterungsliste nach dem Namen Ihrer IoT MQ-Erweiterung. Der Name beginnt mit mq- gefolgt von fünf zufälligen Zeichen. Beispiel: mq-4jgjs. Der Name der IoT MQ-Erweiterung ist identisch mit dem Namen der verwalteten MQ-Identität.
  2. Wählen Sie in Ihrer Azure Data Explorer-Datenbank Berechtigungen>Hinzufügen>Ingestor aus. Suchen Sie nach dem Namen der verwalteten MQ-Identität, und fügen Sie ihn hinzu.

Weitere Informationen zum Hinzufügen von Berechtigungen finden Sie unter Verwalten von Azure Data Explorer-Clusterberechtigungen.

Jetzt können Sie den Connector bereitstellen und Daten an Azure Data Explorer senden.

Beispielbereitstellungsdatei

Beispielbereitstellungsdatei für den Azure Data Explorer-Connector. Kommentare, die mit TODO beginnen, erfordern, dass Sie Platzhaltereinstellungen durch Ihre Angaben ersetzen.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

In diesem Beispiel werden Daten aus dem azure-iot-operations/data/thermostat-Thema mit Nachrichten im JSON-Format akzeptiert, z. B. folgendes:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLakeConnector

Ein DataLakeConnector ist eine benutzerdefinierte Kubernetes-Ressource, die die Konfiguration und Eigenschaften einer Data Lake Connector-Instanz definiert. Ein Data Lake Connector erfasst Daten aus MQTT-Themen in Delta-Tabellen in einem Data Lake Storage-Konto.

Das Feld Spezifikation einer DataLakeConnector-Ressource enthält die folgenden Unterfelder:

  • protocol: Die MQTT-Version Kann entweder v5 oder v3 sein.
  • image: Das Image-Feld gibt das Containerimage des Data Lake Connector-Moduls an. Es hat die folgenden Subfelder:
    • repository: Den Namen der Containerregistrierung und des Repositorys, in dem das Image gespeichert ist.
    • tag: Das Tag des zu verwendenden Images.
    • pullPolicy: Die Pullrichtlinie für das Image. Kann entweder Always, IfNotPresent oder Never sein.
  • instances: Die Anzahl der Replikate des auszuführenden Data Lake-Connectors.
  • logLevel: Die Protokollebene für das Data Lake Connector-Modul. Kann entweder trace, debug, info, warn, error, oder fatal sein.
  • databaseFormat: Das Format der Daten, die in Data Lake Storage erfasst werden sollen. Kann entweder delta oder parquet sein.
  • target: Das Zielfeld gibt das Ziel der Datenerfassung an. Es kann datalakeStorage, fabricOneLake, adx oder localStorage sein.
    • datalakeStorage: Gibt die Konfiguration und die Eigenschaften des ADLSv2-Kontos an. Es hat die folgenden Subfelder:
      • endpoint: Die URL des Data Lake Storage-Kontoendpunkts. Fügen Sie keinen nachgestellten Schrägstrich / ein.
      • authentication: Das Feld Authentifizierung gibt den Typ und die Anmeldeinformationen für den Zugriff auf das Data Lake Storage-Konto an. Folgende Werte sind möglich:
        • accessTokenSecretName: Der Name des Kubernetes-Geheimnisses für die Verwendung der gemeinsamen Zugriffstoken-Authentifizierung für das Data Lake Storage-Konto. Dieses Feld ist ein Pflichtfeld, wenn der Typ accessToken verwendet wird.
        • systemAssignedManagedIdentity: Für die Verwendung der vom System verwalteten Identität zur Authentifizierung. Es gibt ein Subfeld:
          • audience: Eine Zeichenfolge in Form von https://<my-account-name>.blob.core.windows.net für die Zielgruppe der verwalteten Identitätstoken, die auf die Kontoebene beschränkt ist, oder https://storage.azure.com für ein beliebiges Speicherkonto.
    • fabricOneLake: Gibt die Konfiguration und die Eigenschaften von Microsoft Fabric OneLake an. Es hat die folgenden Subfelder:
      • endpoint: Die URL des Microsoft Fabric OneLake-Endpunkts. In der Regel https://onelake.dfs.fabric.microsoft.com, da dies der globale OneLake-Endpunkt ist. Wenn Sie einen regionalen Endpunkt verwenden, hat dieser die Form https://<region>-onelake.dfs.fabric.microsoft.com. Fügen Sie keinen nachgestellten Schrägstrich / ein. Weitere Informationen finden Sie unter Herstellen einer Verbindung mit Microsoft OneLake.
      • names: Gibt die Namen des Arbeitsbereichs und des Lakehouse an. Verwenden Sie entweder dieses Feld oder guids. Sie sollten nicht beides verwenden. Es hat die folgenden Subfelder:
        • workspaceName: Der Name des Arbeitsbereichs
        • lakehouseName: Den Namen des Lakehouse
      • guids: Gibt die GUIDs des Arbeitsbereichs und des Lakehouse an. Verwenden Sie entweder dieses Feld oder names. Sie sollten nicht beides verwenden. Es hat die folgenden Subfelder:
        • workspaceGuid: Die GUID des Arbeitsbereichs.
        • lakehouseGuid: Die GUID des Lakehouse.
      • fabricPath: Den Speicherort der Daten im Fabric-Arbeitsbereich. Es kann entweder tables oder files sein. Wenn es tables ist, werden die Daten in Fabric OneLake als Tabellen gespeichert. Wenn es files ist, werden die Daten in Fabric OneLake als Dateien gespeichert. Wenn es files ist, muss das databaseFormatparquet sein.
      • authentication: Das Feld Authentifizierung gibt den Typ und die Anmeldedaten für den Zugriff auf Microsoft Fabric OneLake an. Kann im Moment nur systemAssignedManagedIdentity sein. Es gibt ein Subfeld:
      • systemAssignedManagedIdentity: Für die Verwendung der vom System verwalteten Identität zur Authentifizierung. Es gibt ein Subfeld:
        • audience: Eine Zeichenkette für die Zielgruppe des verwalteten Identitätstokens und sie muss https://storage.azure.comsein.
    • adx: Gibt die Konfiguration und die Eigenschaften der Azure Data Explorer-Datenbank an. Es hat die folgenden Subfelder:
      • endpoint: Die URL des Azure Data Explorer-Clusterendpunkts, z. B. https://<CLUSTER>.<REGION>.kusto.windows.net. Fügen Sie keinen nachgestellten Schrägstrich / ein.
      • authentication: Das Authentifizierungsfeld gibt den Typ und die Anmeldeinformationen für den Zugriff auf den Azure Data Explorer-Cluster an. Kann im Moment nur systemAssignedManagedIdentity sein. Es gibt ein Subfeld:
        • systemAssignedManagedIdentity: Für die Verwendung der vom System verwalteten Identität zur Authentifizierung. Es gibt ein Subfeld:
          • audience: Eine Zeichenfolge für die Tokenzielgruppe der verwalteten Identität, die https://api.kusto.windows.net lauten muss.
    • localStorage: Gibt die Konfiguration und die Eigenschaften des lokalen Speicherkontos an. Es hat die folgenden Subfelder:
      • volumeName: Den Namen des Volumes, das in die einzelnen Connector-Pods eingebunden ist.
  • localBrokerConnection: Wird verwendet, um die Standardverbindungskonfiguration zum IoT MQ MQTT Vermittler außer Kraft zu setzen. Siehe Verwalten der lokalen Brokerverbindung.

DataLakeConnectorTopicMap

Eine DataLakeConnectorTopicMap ist eine benutzerdefinierte Kubernetes-Ressource, die die Zuordnung zwischen einem MQTT-Thema und einer Delta-Tabelle in einem Data Lake Storage-Konto definiert. Eine DataLakeConnectorTopicMap-Ressource verweist auf eine DataLakeConnector-Ressource, die auf demselben Edge-Gerät läuft und Daten aus dem MQTT-Topic in die Delta-Tabelle erfasst.

Das Feld Spezifikation einer DataLakeConnectorTopicMap-Ressource enthält die folgenden Unterfelder:

  • dataLakeConnectorRef: Den Namen der DataLakeConnector-Ressource, zu der diese Themenzuordnung gehört.
  • mapping: Das Mapping-Feld gibt die Details und Eigenschaften des MQTT-Themas und der Delta-Tabelle an. Es hat die folgenden Subfelder:
    • allowedLatencySecs: Die maximale Wartezeit in Sekunden zwischen dem Empfang einer Nachricht vom MQTT-Topic und ihrer Aufnahme in die Delta-Tabelle. Dieses Feld ist ein Pflichtfeld.
    • clientId: Einen eindeutigen Bezeichner für den MQTT-Client, der das Thema abonniert.
    • maxMessagesPerBatch: Die maximale Anzahl von Nachrichten, die in einem Batch in die Delta-Tabelle erfasst werden. Aufgrund einer vorübergehenden Einschränkung muss dieser Wert kleiner als 16 sein, wenn qos auf 1 festgelegt ist. Dieses Feld ist ein Pflichtfeld.
    • messagePayloadType: Den Typ der Payload, die an das MQTT-Thema gesendet wird. Kann entweder json oder avro (noch nicht unterstützt) sein.
    • mqttSourceTopic: Den Namen der MQTT-Themen, die abonniert werden sollen. Unterstützt die MQTT-Themen-Platzhalternotation.
    • qos: Die Dienstgüte für das Abonnieren des MQTT-Themas. Kann entweder 0 oder 1 sein.
    • table: Das Feld Tabelle gibt die Konfiguration und die Eigenschaften der Delta-Tabelle im Data Lake Storage-Konto an. Es hat die folgenden Subfelder:
      • tableName: Den Namen der Delta-Tabelle, die im Data Lake Storage-Konto erstellt oder ergänzt werden soll. Dieses Feld wird auch als Containername bezeichnet, wenn es mit Azure Data Lake Storage Gen2 verwendet wird. Er kann jeden englischen Kleinbuchstaben und den Unterstrich (_) enthalten und eine Länge von bis zu 256 Zeichen haben. Es sind keine Bindestriche (-) oder Leerzeichen erlaubt.
      • tablePath: Der Name der Azure Data Explorer-Datenbank, wenn ein Connector vom Typ adx verwendet wird.
      • schema: Das Schema der Delta-Tabelle, das mit dem Format und den Feldern der Payload der Nachricht übereinstimmen sollte. Es handelt sich um ein Array von Objekten, die jeweils die folgenden Unterfelder enthalten:
        • name: Den Namen der Spalte in der Delta-Tabelle.
        • format: Den Datentyp der Spalte in der Delta-Tabelle. Kann entweder boolean, int8, int16, int32, int64, uInt8, uInt16, uInt32, uInt64, float16, float32, float64, date32, timestamp, binary, oder utf8 sein. Typen ohne Vorzeichen, wie uInt8, werden nicht vollständig unterstützt und werden wie Typen mit Vorzeichen behandelt, wenn sie hier angegeben werden.
        • optional: Ein boolescher Wert, der angibt, ob die Spalte optional ist oder nicht. Dieses Feld ist optional und hat den Standardwert false.
        • mapping: JSON-Pfadausdruck, der definiert, wie der Wert der Spalte aus der Payload der MQTT-Nachricht extrahiert werden soll. Die integrierten Zuordnungen $client_id, $topic, $property und $received_time stehen zur Verfügung, um den JSON-Code im MQTT-Nachrichtentext zu erweitern. Dieses Feld ist ein Pflichtfeld. Verwenden Sie $property für MQTT-Benutzereigenschaften. Beispielsweise stellt $property.assetId den Wert der assetId-Eigenschaft aus der MQTT-Nachricht dar.

Hier ist ein Beispiel für eine DataLakeConnectorTopicMap-Ressource:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Stringified mit Escapezeichen wie "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" wird nicht unterstützt und führt dazu, dass der Connector den Fehler convertor found a null value (Konverter hat einen Nullwert gefunden) ausgibt.

Eine Beispielnachricht für das Thema azure-iot-operations/data/thermostat, das mit diesem Schema funktioniert:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Dies entspricht:

externalAssetId assetName CurrentTemperature Druck mqttTopic Zeitstempel
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx thermostat-de 5506 5506 dlc 2024-04-02T22:36:03.1827681Z

Wichtig

Wenn das Datenschema aktualisiert wird, z. B. wenn ein Datentyp oder ein Name geändert wird, funktioniert die Transformation der eingehenden Daten möglicherweise nicht mehr. Sie müssen den Namen der Datentabelle ändern, wenn sich das Schema ändert.

Delta oder Parquet

Es werden sowohl Delta- als auch Parquetformate unterstützt.

Verwalten der lokalen Brokerverbindung

Wie die MQTT-Brücke fungiert der Data Lake Connector als Client für den IoT MQ MQTT Vermittler. Wenn Sie den Listener-Port oder die Authentifizierung Ihres IoT MQ MQTT Vermittlers angepasst haben, überschreiben Sie auch die lokale MQTT-Verbindungskonfiguration für den Data Lake Connector. Weitere Informationen finden Sie unter MQTT-Bridge lokale Broker-Verbindung.

Veröffentlichen und Abonnieren von MQTT-Nachrichten mit Azure IoT MQ (Vorschau)