Share via


Envío de datos desde la versión preliminar de Azure IoT MQ a Data Lake Storage

Importante

Operaciones de IoT de Azure, habilitado por Azure Arc, está actualmente en VERSIÓN PRELIMINAR. No se debería usar este software en versión preliminar en entornos de producción.

Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.

Puede usar el conector de lago de datos para enviar datos del agente de versión preliminar de Azure IoT MQ a un lago de datos, como Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake y Azure Data Explorer. El conector se suscribe a temas MQTT e ingiere los mensajes en tablas Delta en la cuenta de Data Lake Storage.

Requisitos previos

Configuración para enviar datos a Microsoft Fabric OneLake mediante la identidad administrada

Configure un conector de Data Lake para conectarse a Microsoft Fabric OneLake mediante la identidad administrada.

  1. Asegúrese de que se cumplen los pasos descritos en los requisitos previos, incluida un área de trabajo de Microsoft Fabric y un almacén de lago de datos. No se puede usar el valor predeterminado mi área de trabajo.

  2. Asegúrese de que la extensión de Arc de IoT MQ está instalada y configurada con identidad administrada.

  3. En Azure Portal, ve al clúster de Kubernetes conectado a Arc y seleccione Configuración>Extensiones. En la lista de extensiones, busque el nombre de la extensión de IoT MQ. El nombre comienza con mq- seguido de cinco caracteres aleatorios. Por ejemplo, mq-4jgjs.

  4. Obtenga el id. de la aplicación asociado a la identidad administrada de la extensión de Arc IoT MQ y anote el valor del GUID. El id. de la aplicación es diferente del objeto o el id. de entidad de seguridad. Para usar la CLI de Azure, busque el id. de objeto de la identidad administrada y, a continuación, consulte el id. de la aplicación de la entidad de servicio asociada a la identidad administrada. Por ejemplo:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Debe obtener una salida con un valor GUID:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Este GUID es el id. de la aplicación que debe usar en el paso siguiente.

  5. En el área de trabajo de Microsoft Fabric, use Administrar acceso y seleccione +Agregar personas o grupos.

  6. Busque la extensión de Arc IoT MQ por su nombre "mq" y asegúrese de seleccionar el valor GUID del identificador de aplicación que encontró en el paso anterior.

  7. Seleccione Colaborador como rol y, a continuación, seleccione Agregar.

  8. Cree un recurso DataLakeConnector que defina la configuración y la configuración del punto de conexión para el conector. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:

    • target.fabricOneLake.endpoint: el punto de conexión de la cuenta de OneLake de Microsoft Fabric. Puede obtener la dirección URL del punto de conexión de Microsoft Fabric Lakehouse en Archivos>Propiedades. La dirección URL debe ser similar a https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: los nombres del área de trabajo y el almacén de lago de datos. Use este campo o guids. No use ambos.
      • workspaceName: nombre del área de trabajo.
      • lakehouseName: el nombre del almacén de lago de datos.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Cree un recurso DataLakeConnectorTopicMap que defina la asignación entre el tema MQTT y la tabla Delta en Data Lake Storage. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:

    • dataLakeConnectorRef: el nombre del recurso DataLakeConnector que creó anteriormente.
    • clientId: un identificador único para el cliente MQTT.
    • mqttSourceTopic: el nombre del tema MQTT del que desea que provengan los datos.
    • table.tableName: el nombre de la tabla a la que desea anexar en el almacén de lago de datos. La tabla se crea automáticamente si no existe.
    • table.schema: el esquema de la tabla Delta que debe coincidir con el formato y los campos de los mensajes JSON que se envían al tema MQTT.
  10. Aplique los recursos DataLakeConnector y DataLakeConnectorTopicMap al clúster de Kubernetes mediante kubectl apply -f datalake-connector.yaml.

  11. Empiece a enviar mensajes JSON al tema MQTT mediante el publicador MQTT. La instancia del conector de lago de datos se suscribe al tema e ingiere los mensajes en la tabla Delta.

  12. Con un explorador, compruebe que los datos se importan en el almacén de lago de datos. En el área de trabajo de Microsoft Fabric, seleccione su instancia de almacén de lago de datos y, después, Tablas. Debería ver los datos en la tabla.

Tablo no identificado

Si los datos se muestran en la tabla no identificado:

La causa podría ser caracteres no admitidos en el nombre de la tabla. El nombre de la tabla debe ser un nombre de contenedor de Azure Storage válido, lo que significa que puede contener cualquier letra en inglés, mayúscula o minúscula, y la barra inferior _, con una longitud de hasta 256 caracteres. No se permiten guiones - ni caracteres de espacio.

Configuración para enviar datos a Azure Data Lake Storage Gen2 mediante el token de SAS

Configure un conector de lago de datos para conectarse a una cuenta de Azure Data Lake Storage Gen2 (ADLS Gen2) mediante un token de firma de acceso compartido (SAS).

  1. Obtenga un token de SAS para una cuenta de Azure Data Lake Storage Gen2 (ADLS Gen2). Por ejemplo, use Azure Portal par la cuenta de almacenamiento. En el menú, en Seguridad y redes, seleccione Firma de acceso compartido. Use la tabla siguiente para establecer los permisos necesarios.

    Parámetro Value
    Servicios permitidos Blob
    Tipos de recursos permitidos Objeto, contenedor
    Permisos permitidos Leer, escribir, eliminar, enumerar, agregar, crear

    Para optimizar los privilegios mínimos, también puede optar por obtener la SAS para un contenedor individual. Para evitar errores de autenticación, asegúrese de que el contenedor coincide con el valor table.tableName en la configuración del mapa del tema.

  2. Cree un secreto de Kubernetes con el token de SAS. No incluya el signo de interrogación ? que podría estar al principio del token.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Cree un recurso DataLakeConnector que defina la configuración y la configuración del punto de conexión para el conector. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:

    • endpoint: el punto de conexión de Data Lake Storage de la cuenta de almacenamiento de ADLSv2 en forma de https://example.blob.core.windows.net. En Azure Portal, busque el punto de conexión en cuenta de Storage > Configuración > Puntos de conexión > Data Lake Storage.
    • accessTokenSecretName: nombre del secreto de Kubernetes que contiene el token de SAS (my-sas del ejemplo anterior).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Cree un recurso DataLakeConnectorTopicMap que defina la asignación entre el tema MQTT y la tabla Delta en Data Lake Storage. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:

    • dataLakeConnectorRef: el nombre del recurso DataLakeConnector que creó anteriormente.
    • clientId: un identificador único para el cliente MQTT.
    • mqttSourceTopic: el nombre del tema MQTT del que desea que provengan los datos.
    • table.tableName: el nombre del contenedor al que desea anexar en Data Lake Storage. Si el token de SAS está en el ámbito de la cuenta, el contenedor se crea automáticamente si falta.
    • table.schema: el esquema de la tabla Delta, que debe coincidir con el formato y los campos de los mensajes JSON que se envían al tema MQTT.
  5. Aplique los recursos de DataLakeConnector y DataLakeConnectorTopicMap al clúster de Kubernetes mediante kubectl apply -f datalake-connector.yaml.

  6. Empiece a enviar mensajes JSON al tema MQTT mediante el publicador MQTT. La instancia del conector de lago de datos se suscribe al tema e ingiere los mensajes en la tabla Delta.

  7. Con Azure Portal, compruebe que se crea la tabla Delta. Los archivos se organizan por identificador de cliente, nombre de instancia del conector, tema MQTT y hora. En la cuenta de almacenamiento >Contenedores, abra el contenedor especificado en el DataLakeConnectorTopicMap. Compruebe que _delta_log existe y los archivos de Parquet muestran el tráfico MQTT. Abra un archivo de Parquet para confirmar que la carga coincide con lo que se envió y definió en el esquema.

Uso de la identidad administrada para la autenticación en ADLSv2

Para usar la identidad administrada, especifíquela como el único método en DataLakeConnector authentication. Use az k8s-extension show para buscar el identificador de entidad de seguridad de la extensión de Arc IoT MQ y, a continuación, asigne un rol a la identidad administrada que conceda permiso para escribir en la cuenta de almacenamiento, como colaborador de datos de Storage Blob. Para obtener más información, consulte Autorizar el acceso a blobs mediante Microsoft Entra ID.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Configuración para enviar datos a Azure Data Explorer mediante la identidad administrada

Configure el conector de Data Lake para enviar datos a un punto de conexión de Azure Data Explorer mediante la identidad administrada.

  1. Asegúrese de que se cumplen los pasos descritos en los requisitos previos, incluido un clúster completo de Azure Data Explorer. La opción "clúster libre" no funciona.

  2. Una vez creado el clúster, cree una base de datos para almacenar los datos.

  3. Puede crear una tabla para los datos especificados a través de Azure Portal y crear columnas manualmente, o puede usar KQL en la pestaña de consulta. Por ejemplo:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Habilitar ingesta con streaming

Habilite la ingesta con streaming en la tabla y la base de datos. En la pestaña de consulta, ejecute el comando siguiente, sustituyendo <DATABASE_NAME> por el nombre de la base de datos:

.alter database <DATABASE_NAME> policy streamingingestion enable

Incorporación de la identidad administrada al clúster de Azure Data Explorer

Para que el conector se autentique en Azure Data Explorer, debe agregar la identidad administrada al clúster de Azure Data Explorer.

  1. En Azure Portal, ve al clúster de Kubernetes conectado a Arc y seleccione Configuración>Extensiones. En la lista de extensiones, busque el nombre de la extensión de IoT MQ. El nombre comienza con mq- seguido de cinco caracteres aleatorios. Por ejemplo, mq-4jgjs. El nombre de la extensión de IoT MQ es el mismo que el nombre de la identidad administrada de MQ.
  2. En la base de datos de Azure Data Explorer, seleccione Permisos>Agregar>Agente de ingesta. Busque el nombre de la identidad administrada de MQ y agréguelo.

Para obtener más información sobre cómo agregar permisos, consulte Administración de permisos de clúster de Azure Data Explorer.

Ahora está listo para implementar el conector y enviar datos a Azure Data Explorer.

Archivo de implementación de ejemplo

Archivo de implementación de ejemplo para el conector de Azure Data Explorer. Los comentarios que comienzan con TODO requieren que reemplace la configuración del marcador de posición por su información.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

En este ejemplo, se aceptan datos del tema de azure-iot-operations/data/thermostat con mensajes en formato JSON, como los siguientes:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLakeConnector

Un DataLakeConnector es un recurso personalizado de Kubernetes que define la configuración y las propiedades de una instancia del conector de lago de datos. Un conector de lago de datos ingiere datos de temas MQTT en tablas Delta en una cuenta de Data Lake Storage.

El campo de especificación de un recurso de DataLakeConnector contiene los siguientes subcampos:

  • protocol: la versión MQTT. Puede ser v5 o v3.
  • image: el campo de imagen especifica la imagen de contenedor del módulo del conector de lago de datos. Tiene los siguientes subcampos:
    • repository: el nombre del registro de contenedor y el repositorio donde se almacena la imagen.
    • tag: etiqueta de la imagen que se va a usar.
    • pullPolicy: directiva de extracción de la imagen. Puede ser Always, IfNotPresent o Never.
  • instances: número de réplicas del conector de lago de datos que se va a ejecutar.
  • logLevel: nivel de registro del módulo del conector de lago de datos. Puede ser trace, debug, info, warn, error o fatal.
  • databaseFormat: el formato de los datos que se van a ingerir en Data Lake Storage. Puede ser delta o parquet.
  • target: el campo de destino especifica el destino de la ingesta de datos. Puede ser datalakeStorage, fabricOneLake, adx o localStorage.
    • datalakeStorage: especifica la configuración y las propiedades de la cuenta de ADLSv2. Tiene los siguientes subcampos:
      • endpoint: la dirección URL del punto de conexión de la cuenta de Data Lake Storage. No incluya ninguna barra diagonal final /.
      • authentication: el campo de autenticación especifica el tipo y las credenciales para acceder a la cuenta de Data Lake Storage. Puede tener uno de los valores siguientes.
        • accessTokenSecretName: el nombre del secreto de Kubernetes para usar la autenticación de token de acceso compartido para la cuenta de Data Lake Storage. Este campo es necesario si el tipo es accessToken.
        • systemAssignedManagedIdentity: para usar la identidad administrada del sistema para la autenticación. Tiene un subcampo
          • audience: una cadena en forma de https://<my-account-name>.blob.core.windows.net para la audiencia de tokens de identidad administrada con ámbito del nivel de cuenta o https://storage.azure.com para cualquier cuenta de almacenamiento.
    • fabricOneLake: especifica la configuración y las propiedades de Microsoft Fabric OneLake. Tiene los siguientes subcampos:
      • endpoint: la dirección URL del punto de conexión de OneLake de Microsoft Fabric. Por lo general, es https://onelake.dfs.fabric.microsoft.com porque es el punto de conexión global de OneLake. Si usa un punto de conexión regional, se encuentra en forma de https://<region>-onelake.dfs.fabric.microsoft.com. No incluya ninguna barra diagonal final /. Para obtener más información, consulte Conexión a Microsoft OneLake.
      • names: especifica los nombres del área de trabajo y el almacén de lago de datos. Use este campo o guids. No use ambos. Tiene los siguientes subcampos:
        • workspaceName: nombre del área de trabajo.
        • lakehouseName: el nombre del almacén de lago de datos.
      • guids: especifica los GUID del área de trabajo y el almacén de lago de datos. Use este campo o names. No use ambos. Tiene los siguientes subcampos:
        • workspaceGuid: GUID del área de trabajo.
        • lakehouseGuid: el GUID del almacén de lago de datos.
      • fabricPath: la ubicación de los datos en el área de trabajo de Fabric. Puede ser tables o files. Si es tables, los datos se almacenan en Fabric OneLake como tablas. Si es files, los datos se almacenan en Fabric OneLake como archivos. Si es files, el databaseFormat debe ser parquet.
      • authentication: el campo de autenticación especifica el tipo y las credenciales para acceder a Microsoft Fabric OneLake. Solo puede ser systemAssignedManagedIdentity por ahora. Tiene un subcampo:
      • systemAssignedManagedIdentity: para usar la identidad administrada del sistema para la autenticación. Tiene un subcampo
        • audience: una cadena para la audiencia de tokens de identidad administrada y debe ser https://storage.azure.com.
    • adx: especifica la configuración y las propiedades de la base de datos de Azure Data Explorer. Tiene los siguientes subcampos:
      • endpoint: la dirección URL del punto de conexión del clúster de Azure Data Explorer, como https://<CLUSTER>.<REGION>.kusto.windows.net. No incluya ninguna barra diagonal final /.
      • authentication: el campo de autenticación especifica el tipo y las credenciales para acceder al clúster de Azure Data Explorer. Solo puede ser systemAssignedManagedIdentity por ahora. Tiene un subcampo:
        • systemAssignedManagedIdentity: para usar la identidad administrada del sistema para la autenticación. Tiene un subcampo
          • audience: una cadena para la audiencia de tokens de identidad administrada y debe ser https://api.kusto.windows.net.
    • localStorage: especifica la configuración y las propiedades de la cuenta de almacenamiento local. Tiene los siguientes subcampos:
      • volumeName: el nombre del volumen montado en cada uno de los pods del conector.
  • localBrokerConnection: se usa para invalidar la configuración de conexión predeterminada al MQTT broker de IoT MQ. Consulte Administrar la conexión de agente local.

DataLakeConnectorTopicMap

DataLakeConnectorTopicMap es un recurso personalizado de Kubernetes que define la asignación entre un tema MQTT y una tabla Delta en una cuenta de Data Lake Storage. Un recurso DataLakeConnectorTopicMap hace referencia a un recurso DataLakeConnector que se ejecuta en el mismo dispositivo perimetral e ingiere datos del tema MQTT en la tabla Delta.

El campo de especificación de un recurso DataLakeConnectorTopicMap contiene los siguientes subcampos:

  • dataLakeConnectorRef: el nombre del recurso DataLakeConnector al que pertenece este tema.
  • mapping: el campo de asignación especifica los detalles y las propiedades del tema MQTT y la tabla Delta. Tiene los siguientes subcampos:
    • allowedLatencySecs: la latencia máxima en segundos entre recibir un mensaje del tema MQTT e ingerirlo en la tabla Delta. Este campo es obligatorio.
    • clientId: un identificador único para el cliente MQTT que se suscribe al tema.
    • maxMessagesPerBatch: el número máximo de mensajes que se van a ingerir en un lote en la tabla Delta. Debido a una restricción temporal, este valor debe ser menor que 16 si qos está establecido en 1. Este campo es obligatorio.
    • messagePayloadType: tipo de carga que se envía al tema MQTT. Puede ser uno de json o avro (aún no se admite).
    • mqttSourceTopic: el nombre de los temas MQTT a los que suscribirse. Admite notación comodín del tema MQTT.
    • qos: la calidad del nivel de servicio para suscribirse al tema MQTT. Puede ser 0 o 1.
    • table: el campo de tabla especifica la configuración y las propiedades de la tabla Delta en la cuenta de Data Lake Storage. Tiene los siguientes subcampos:
      • tableName: nombre de la tabla Delta a la que se va a crear o anexar en la cuenta de Data Lake Storage. Este campo también se conoce como el nombre del contenedor cuando se usa con Azure Data Lake Storage Gen2. Puede contener cualquier minúscula letra en inglés y _de barra inferior, con una longitud de hasta 256 caracteres. No se permiten guiones - ni caracteres de espacio.
      • tablePath: el nombre de la base de datos de Azure Data Explorer cuando se utiliza el conector de tipo adx.
      • schema: el esquema de la tabla Delta, que debe coincidir con el formato y los campos de la carga del mensaje. Se trata de una matriz de objetos, cada uno con los subcampos siguientes:
        • name: el nombre de la columna de la tabla Delta.
        • format: tipo de datos de la columna de la tabla Delta. Puede ser boolean, int8, int16, int32, int64, uInt8, uInt16, uInt32, uInt64, float16, float32, float64, date32, timestamp, binary o utf8. Los tipos sin firmar, como uInt8, no son totalmente compatibles y se tratan como tipos firmados si se especifican aquí.
        • optional: un valor booleano que indica si la columna es opcional o necesaria. Este campo es opcional y el valor predeterminado es false.
        • mapping: expresión de ruta de acceso JSON que define cómo extraer el valor de la columna de la carga del mensaje MQTT. Las asignaciones integradas $client_id, $topic, $property y $received_time están disponibles para su uso como columnas a fin de enriquecer el JSON en el cuerpo del mensaje MQTT. Este campo es obligatorio. Use $property para las propiedades de usuario de MQTT. Por ejemplo, $properties.assetId representa el valor de la propiedad assetId del mensaje MQTT.

Este es un ejemplo de un recurso de DataLakeConnectorTopicMap:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

No se admite un código JSON con cadenas como "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" y hace que el conector inicie el error El convertidor encontró un valor nulo.

Un mensaje de ejemplo para el tema de azure-iot-operations/data/thermostat que funciona con este esquema:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

Que se asigna a:

externalAssetId assetName CurrentTemperature Presión mqttTopic timestamp
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx thermostat-de 5506 5506 dlc 2024-04-02T22:36:03.1827681Z

Importante

Si se actualiza el esquema de datos, por ejemplo, se cambia un tipo de datos o se cambia un nombre, la transformación de los datos entrantes podría dejar de funcionar. Debe cambiar el nombre de la tabla de datos si se produce un cambio de esquema.

Delta o Parquet

Se admiten los formatos Delta y Parquet.

Administración de la conexión de agente local

Al igual que el puente MQTT, el conector de lago de datos actúa como un cliente para el MQTT broker de IoT MQ. Si ha personalizado el puerto de escucha o la autenticación del MQTT broker de IoT MQ, reemplace también la configuración de conexión MQTT local para el conector de lago de datos. Para más información, consulte conexión de agente local del puente MQTT.

Publicar y suscribir mensajes MQTT mediante Azure IoT MQ Preview