Envío de datos desde la versión preliminar de Azure IoT MQ a Data Lake Storage
Importante
Operaciones de IoT de Azure, habilitado por Azure Arc, está actualmente en VERSIÓN PRELIMINAR. No se debería usar este software en versión preliminar en entornos de producción.
Consulte Términos de uso complementarios para las versiones preliminares de Microsoft Azure para conocer los términos legales que se aplican a las características de Azure que se encuentran en la versión beta, en versión preliminar o que todavía no se han publicado para que estén disponibles con carácter general.
Puede usar el conector de lago de datos para enviar datos del agente de versión preliminar de Azure IoT MQ a un lago de datos, como Azure Data Lake Storage Gen2 (ADLSv2), Microsoft Fabric OneLake y Azure Data Explorer. El conector se suscribe a temas MQTT e ingiere los mensajes en tablas Delta en la cuenta de Data Lake Storage.
Requisitos previos
Una cuenta de Data Lake Storage en Azure con un contenedor y una carpeta para los datos. Para obtener más información sobre cómo crear una instancia de Data Lake Storage, use una de las siguientes opciones de inicio rápido:
- Inicio rápido de Microsoft Fabric OneLake:
- Cree un área de trabajo, ya que no se admite mi área de trabajo.
- Cree un almacén de lago.
- Inicio rápido de Azure Data Lake Storage Gen2:
- Crea una cuenta de almacenamiento para usar con Azure Data Lake Storage Gen2.
- Clúster de Azure Data Explorer:
- Siga los pasos del Clúster completo en el inicio rápido: Crear un clúster y una base de datos de Azure Data Explorer.
- Inicio rápido de Microsoft Fabric OneLake:
Un MQTT broker de IoT MQ. Para más información sobre cómo implementar un agente MQTT de IoT MQ, consulte Inicio rápido: Implementación de Azure IoT Operations Preview en un clúster de Kubernetes habilitado para Arc.
Configuración para enviar datos a Microsoft Fabric OneLake mediante la identidad administrada
Configure un conector de Data Lake para conectarse a Microsoft Fabric OneLake mediante la identidad administrada.
Asegúrese de que se cumplen los pasos descritos en los requisitos previos, incluida un área de trabajo de Microsoft Fabric y un almacén de lago de datos. No se puede usar el valor predeterminado mi área de trabajo.
Asegúrese de que la extensión de Arc de IoT MQ está instalada y configurada con identidad administrada.
En Azure Portal, ve al clúster de Kubernetes conectado a Arc y seleccione Configuración>Extensiones. En la lista de extensiones, busque el nombre de la extensión de IoT MQ. El nombre comienza con
mq-
seguido de cinco caracteres aleatorios. Por ejemplo, mq-4jgjs.Obtenga el id. de la aplicación asociado a la identidad administrada de la extensión de Arc IoT MQ y anote el valor del GUID. El id. de la aplicación es diferente del objeto o el id. de entidad de seguridad. Para usar la CLI de Azure, busque el id. de objeto de la identidad administrada y, a continuación, consulte el id. de la aplicación de la entidad de servicio asociada a la identidad administrada. Por ejemplo:
OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv) az ad sp show --query appId --id $OBJECT_ID --output tsv
Debe obtener una salida con un valor GUID:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Este GUID es el id. de la aplicación que debe usar en el paso siguiente.
En el área de trabajo de Microsoft Fabric, use Administrar acceso y seleccione +Agregar personas o grupos.
Busque la extensión de Arc IoT MQ por su nombre "mq" y asegúrese de seleccionar el valor GUID del identificador de aplicación que encontró en el paso anterior.
Seleccione Colaborador como rol y, a continuación, seleccione Agregar.
Cree un recurso DataLakeConnector que defina la configuración y la configuración del punto de conexión para el conector. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:
target.fabricOneLake.endpoint
: el punto de conexión de la cuenta de OneLake de Microsoft Fabric. Puede obtener la dirección URL del punto de conexión de Microsoft Fabric Lakehouse en Archivos>Propiedades. La dirección URL debe ser similar ahttps://onelake.dfs.fabric.microsoft.com
.target.fabricOneLake.names
: los nombres del área de trabajo y el almacén de lago de datos. Use este campo oguids
. No use ambos.workspaceName
: nombre del área de trabajo.lakehouseName
: el nombre del almacén de lago de datos.
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: info databaseFormat: delta target: fabricOneLake: # Example: https://onelake.dfs.fabric.microsoft.com endpoint: <example-endpoint-url> names: workspaceName: <example-workspace-name> lakehouseName: <example-lakehouse-name> ## OR # guids: # workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx fabricPath: tables authentication: systemAssignedManagedIdentity: audience: https://storage.azure.com/ localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Cree un recurso DataLakeConnectorTopicMap que defina la asignación entre el tema MQTT y la tabla Delta en Data Lake Storage. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:
dataLakeConnectorRef
: el nombre del recurso DataLakeConnector que creó anteriormente.clientId
: un identificador único para el cliente MQTT.mqttSourceTopic
: el nombre del tema MQTT del que desea que provengan los datos.table.tableName
: el nombre de la tabla a la que desea anexar en el almacén de lago de datos. La tabla se crea automáticamente si no existe.table.schema
: el esquema de la tabla Delta que debe coincidir con el formato y los campos de los mensajes JSON que se envían al tema MQTT.
Aplique los recursos DataLakeConnector y DataLakeConnectorTopicMap al clúster de Kubernetes mediante
kubectl apply -f datalake-connector.yaml
.Empiece a enviar mensajes JSON al tema MQTT mediante el publicador MQTT. La instancia del conector de lago de datos se suscribe al tema e ingiere los mensajes en la tabla Delta.
Con un explorador, compruebe que los datos se importan en el almacén de lago de datos. En el área de trabajo de Microsoft Fabric, seleccione su instancia de almacén de lago de datos y, después, Tablas. Debería ver los datos en la tabla.
Tablo no identificado
Si los datos se muestran en la tabla no identificado:
La causa podría ser caracteres no admitidos en el nombre de la tabla. El nombre de la tabla debe ser un nombre de contenedor de Azure Storage válido, lo que significa que puede contener cualquier letra en inglés, mayúscula o minúscula, y la barra inferior _
, con una longitud de hasta 256 caracteres. No se permiten guiones -
ni caracteres de espacio.
Configuración para enviar datos a Azure Data Lake Storage Gen2 mediante el token de SAS
Configure un conector de lago de datos para conectarse a una cuenta de Azure Data Lake Storage Gen2 (ADLS Gen2) mediante un token de firma de acceso compartido (SAS).
Obtenga un token de SAS para una cuenta de Azure Data Lake Storage Gen2 (ADLS Gen2). Por ejemplo, use Azure Portal par la cuenta de almacenamiento. En el menú, en Seguridad y redes, seleccione Firma de acceso compartido. Use la tabla siguiente para establecer los permisos necesarios.
Parámetro Value Servicios permitidos Blob Tipos de recursos permitidos Objeto, contenedor Permisos permitidos Leer, escribir, eliminar, enumerar, agregar, crear Para optimizar los privilegios mínimos, también puede optar por obtener la SAS para un contenedor individual. Para evitar errores de autenticación, asegúrese de que el contenedor coincide con el valor
table.tableName
en la configuración del mapa del tema.Cree un secreto de Kubernetes con el token de SAS. No incluya el signo de interrogación
?
que podría estar al principio del token.kubectl create secret generic my-sas \ --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \ -n azure-iot-operations
Cree un recurso DataLakeConnector que defina la configuración y la configuración del punto de conexión para el conector. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:
endpoint
: el punto de conexión de Data Lake Storage de la cuenta de almacenamiento de ADLSv2 en forma dehttps://example.blob.core.windows.net
. En Azure Portal, busque el punto de conexión en cuenta de Storage > Configuración > Puntos de conexión > Data Lake Storage.accessTokenSecretName
: nombre del secreto de Kubernetes que contiene el token de SAS (my-sas
del ejemplo anterior).
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: "debug" databaseFormat: "delta" target: datalakeStorage: endpoint: "https://example.blob.core.windows.net" authentication: accessTokenSecretName: "my-sas" localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Cree un recurso DataLakeConnectorTopicMap que defina la asignación entre el tema MQTT y la tabla Delta en Data Lake Storage. Puede usar YAML proporcionado como ejemplo, pero asegúrese de cambiar los campos siguientes:
dataLakeConnectorRef
: el nombre del recurso DataLakeConnector que creó anteriormente.clientId
: un identificador único para el cliente MQTT.mqttSourceTopic
: el nombre del tema MQTT del que desea que provengan los datos.table.tableName
: el nombre del contenedor al que desea anexar en Data Lake Storage. Si el token de SAS está en el ámbito de la cuenta, el contenedor se crea automáticamente si falta.table.schema
: el esquema de la tabla Delta, que debe coincidir con el formato y los campos de los mensajes JSON que se envían al tema MQTT.
Aplique los recursos de DataLakeConnector y DataLakeConnectorTopicMap al clúster de Kubernetes mediante
kubectl apply -f datalake-connector.yaml
.Empiece a enviar mensajes JSON al tema MQTT mediante el publicador MQTT. La instancia del conector de lago de datos se suscribe al tema e ingiere los mensajes en la tabla Delta.
Con Azure Portal, compruebe que se crea la tabla Delta. Los archivos se organizan por identificador de cliente, nombre de instancia del conector, tema MQTT y hora. En la cuenta de almacenamiento >Contenedores, abra el contenedor especificado en el DataLakeConnectorTopicMap. Compruebe que _delta_log existe y los archivos de Parquet muestran el tráfico MQTT. Abra un archivo de Parquet para confirmar que la carga coincide con lo que se envió y definió en el esquema.
Uso de la identidad administrada para la autenticación en ADLSv2
Para usar la identidad administrada, especifíquela como el único método en DataLakeConnector authentication
. Use az k8s-extension show
para buscar el identificador de entidad de seguridad de la extensión de Arc IoT MQ y, a continuación, asigne un rol a la identidad administrada que conceda permiso para escribir en la cuenta de almacenamiento, como colaborador de datos de Storage Blob. Para obtener más información, consulte Autorizar el acceso a blobs mediante Microsoft Entra ID.
authentication:
systemAssignedManagedIdentity:
audience: https://my-account.blob.core.windows.net
Configuración para enviar datos a Azure Data Explorer mediante la identidad administrada
Configure el conector de Data Lake para enviar datos a un punto de conexión de Azure Data Explorer mediante la identidad administrada.
Asegúrese de que se cumplen los pasos descritos en los requisitos previos, incluido un clúster completo de Azure Data Explorer. La opción "clúster libre" no funciona.
Una vez creado el clúster, cree una base de datos para almacenar los datos.
Puede crear una tabla para los datos especificados a través de Azure Portal y crear columnas manualmente, o puede usar KQL en la pestaña de consulta. Por ejemplo:
.create table thermostat ( externalAssetId: string, assetName: string, CurrentTemperature: real, Pressure: real, MqttTopic: string, Timestamp: datetime )
Habilitar ingesta con streaming
Habilite la ingesta con streaming en la tabla y la base de datos. En la pestaña de consulta, ejecute el comando siguiente, sustituyendo <DATABASE_NAME>
por el nombre de la base de datos:
.alter database <DATABASE_NAME> policy streamingingestion enable
Incorporación de la identidad administrada al clúster de Azure Data Explorer
Para que el conector se autentique en Azure Data Explorer, debe agregar la identidad administrada al clúster de Azure Data Explorer.
- En Azure Portal, ve al clúster de Kubernetes conectado a Arc y seleccione Configuración>Extensiones. En la lista de extensiones, busque el nombre de la extensión de IoT MQ. El nombre comienza con
mq-
seguido de cinco caracteres aleatorios. Por ejemplo, mq-4jgjs. El nombre de la extensión de IoT MQ es el mismo que el nombre de la identidad administrada de MQ. - En la base de datos de Azure Data Explorer, seleccione Permisos>Agregar>Agente de ingesta. Busque el nombre de la identidad administrada de MQ y agréguelo.
Para obtener más información sobre cómo agregar permisos, consulte Administración de permisos de clúster de Azure Data Explorer.
Ahora está listo para implementar el conector y enviar datos a Azure Data Explorer.
Archivo de implementación de ejemplo
Archivo de implementación de ejemplo para el conector de Azure Data Explorer. Los comentarios que comienzan con TODO
requieren que reemplace la configuración del marcador de posición por su información.
apiVersion: mq.iotoperations.azure.com/v1beta1
name: my-adx-connector
namespace: azure-iot-operations
spec:
repository: mcr.microsoft.com/azureiotoperations/datalake
tag: 0.4.0-preview
pullPolicy: Always
databaseFormat: adx
target:
# TODO: insert the ADX cluster endpoint
endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
authentication:
systemAssignedManagedIdentity:
audience: https://api.kusto.windows.net
localBrokerConnection:
endpoint: aio-mq-dmqtt-frontend:8883
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
authentication:
kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: adx-topicmap
namespace: azure-iot-operations
spec:
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
# TODO: add DB and table name
tablePath: <DATABASE_NAME>
tableName: <TABLE_NAME>
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: MqttTopic
format: utf8
optional: false
mapping: $topic
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
En este ejemplo, se aceptan datos del tema de azure-iot-operations/data/thermostat
con mensajes en formato JSON, como los siguientes:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
DataLakeConnector
Un DataLakeConnector es un recurso personalizado de Kubernetes que define la configuración y las propiedades de una instancia del conector de lago de datos. Un conector de lago de datos ingiere datos de temas MQTT en tablas Delta en una cuenta de Data Lake Storage.
El campo de especificación de un recurso de DataLakeConnector contiene los siguientes subcampos:
protocol
: la versión MQTT. Puede serv5
ov3
.image
: el campo de imagen especifica la imagen de contenedor del módulo del conector de lago de datos. Tiene los siguientes subcampos:repository
: el nombre del registro de contenedor y el repositorio donde se almacena la imagen.tag
: etiqueta de la imagen que se va a usar.pullPolicy
: directiva de extracción de la imagen. Puede serAlways
,IfNotPresent
oNever
.
instances
: número de réplicas del conector de lago de datos que se va a ejecutar.logLevel
: nivel de registro del módulo del conector de lago de datos. Puede sertrace
,debug
,info
,warn
,error
ofatal
.databaseFormat
: el formato de los datos que se van a ingerir en Data Lake Storage. Puede serdelta
oparquet
.target
: el campo de destino especifica el destino de la ingesta de datos. Puede serdatalakeStorage
,fabricOneLake
,adx
olocalStorage
.datalakeStorage
: especifica la configuración y las propiedades de la cuenta de ADLSv2. Tiene los siguientes subcampos:endpoint
: la dirección URL del punto de conexión de la cuenta de Data Lake Storage. No incluya ninguna barra diagonal final/
.authentication
: el campo de autenticación especifica el tipo y las credenciales para acceder a la cuenta de Data Lake Storage. Puede tener uno de los valores siguientes.accessTokenSecretName
: el nombre del secreto de Kubernetes para usar la autenticación de token de acceso compartido para la cuenta de Data Lake Storage. Este campo es necesario si el tipo esaccessToken
.systemAssignedManagedIdentity
: para usar la identidad administrada del sistema para la autenticación. Tiene un subcampoaudience
: una cadena en forma dehttps://<my-account-name>.blob.core.windows.net
para la audiencia de tokens de identidad administrada con ámbito del nivel de cuenta ohttps://storage.azure.com
para cualquier cuenta de almacenamiento.
fabricOneLake
: especifica la configuración y las propiedades de Microsoft Fabric OneLake. Tiene los siguientes subcampos:endpoint
: la dirección URL del punto de conexión de OneLake de Microsoft Fabric. Por lo general, eshttps://onelake.dfs.fabric.microsoft.com
porque es el punto de conexión global de OneLake. Si usa un punto de conexión regional, se encuentra en forma dehttps://<region>-onelake.dfs.fabric.microsoft.com
. No incluya ninguna barra diagonal final/
. Para obtener más información, consulte Conexión a Microsoft OneLake.names
: especifica los nombres del área de trabajo y el almacén de lago de datos. Use este campo oguids
. No use ambos. Tiene los siguientes subcampos:workspaceName
: nombre del área de trabajo.lakehouseName
: el nombre del almacén de lago de datos.
guids
: especifica los GUID del área de trabajo y el almacén de lago de datos. Use este campo onames
. No use ambos. Tiene los siguientes subcampos:workspaceGuid
: GUID del área de trabajo.lakehouseGuid
: el GUID del almacén de lago de datos.
fabricPath
: la ubicación de los datos en el área de trabajo de Fabric. Puede sertables
ofiles
. Si estables
, los datos se almacenan en Fabric OneLake como tablas. Si esfiles
, los datos se almacenan en Fabric OneLake como archivos. Si esfiles
, eldatabaseFormat
debe serparquet
.authentication
: el campo de autenticación especifica el tipo y las credenciales para acceder a Microsoft Fabric OneLake. Solo puede sersystemAssignedManagedIdentity
por ahora. Tiene un subcampo:systemAssignedManagedIdentity
: para usar la identidad administrada del sistema para la autenticación. Tiene un subcampoaudience
: una cadena para la audiencia de tokens de identidad administrada y debe serhttps://storage.azure.com
.
adx
: especifica la configuración y las propiedades de la base de datos de Azure Data Explorer. Tiene los siguientes subcampos:endpoint
: la dirección URL del punto de conexión del clúster de Azure Data Explorer, comohttps://<CLUSTER>.<REGION>.kusto.windows.net
. No incluya ninguna barra diagonal final/
.authentication
: el campo de autenticación especifica el tipo y las credenciales para acceder al clúster de Azure Data Explorer. Solo puede sersystemAssignedManagedIdentity
por ahora. Tiene un subcampo:systemAssignedManagedIdentity
: para usar la identidad administrada del sistema para la autenticación. Tiene un subcampoaudience
: una cadena para la audiencia de tokens de identidad administrada y debe serhttps://api.kusto.windows.net
.
localStorage
: especifica la configuración y las propiedades de la cuenta de almacenamiento local. Tiene los siguientes subcampos:volumeName
: el nombre del volumen montado en cada uno de los pods del conector.
localBrokerConnection
: se usa para invalidar la configuración de conexión predeterminada al MQTT broker de IoT MQ. Consulte Administrar la conexión de agente local.
DataLakeConnectorTopicMap
DataLakeConnectorTopicMap es un recurso personalizado de Kubernetes que define la asignación entre un tema MQTT y una tabla Delta en una cuenta de Data Lake Storage. Un recurso DataLakeConnectorTopicMap hace referencia a un recurso DataLakeConnector que se ejecuta en el mismo dispositivo perimetral e ingiere datos del tema MQTT en la tabla Delta.
El campo de especificación de un recurso DataLakeConnectorTopicMap contiene los siguientes subcampos:
dataLakeConnectorRef
: el nombre del recurso DataLakeConnector al que pertenece este tema.mapping
: el campo de asignación especifica los detalles y las propiedades del tema MQTT y la tabla Delta. Tiene los siguientes subcampos:allowedLatencySecs
: la latencia máxima en segundos entre recibir un mensaje del tema MQTT e ingerirlo en la tabla Delta. Este campo es obligatorio.clientId
: un identificador único para el cliente MQTT que se suscribe al tema.maxMessagesPerBatch
: el número máximo de mensajes que se van a ingerir en un lote en la tabla Delta. Debido a una restricción temporal, este valor debe ser menor que 16 siqos
está establecido en 1. Este campo es obligatorio.messagePayloadType
: tipo de carga que se envía al tema MQTT. Puede ser uno dejson
oavro
(aún no se admite).mqttSourceTopic
: el nombre de los temas MQTT a los que suscribirse. Admite notación comodín del tema MQTT.qos
: la calidad del nivel de servicio para suscribirse al tema MQTT. Puede ser 0 o 1.table
: el campo de tabla especifica la configuración y las propiedades de la tabla Delta en la cuenta de Data Lake Storage. Tiene los siguientes subcampos:tableName
: nombre de la tabla Delta a la que se va a crear o anexar en la cuenta de Data Lake Storage. Este campo también se conoce como el nombre del contenedor cuando se usa con Azure Data Lake Storage Gen2. Puede contener cualquier minúscula letra en inglés y_
de barra inferior, con una longitud de hasta 256 caracteres. No se permiten guiones-
ni caracteres de espacio.tablePath
: el nombre de la base de datos de Azure Data Explorer cuando se utiliza el conector de tipoadx
.schema
: el esquema de la tabla Delta, que debe coincidir con el formato y los campos de la carga del mensaje. Se trata de una matriz de objetos, cada uno con los subcampos siguientes:name
: el nombre de la columna de la tabla Delta.format
: tipo de datos de la columna de la tabla Delta. Puede serboolean
,int8
,int16
,int32
,int64
,uInt8
,uInt16
,uInt32
,uInt64
,float16
,float32
,float64
,date32
,timestamp
,binary
outf8
. Los tipos sin firmar, comouInt8
, no son totalmente compatibles y se tratan como tipos firmados si se especifican aquí.optional
: un valor booleano que indica si la columna es opcional o necesaria. Este campo es opcional y el valor predeterminado es false.mapping
: expresión de ruta de acceso JSON que define cómo extraer el valor de la columna de la carga del mensaje MQTT. Las asignaciones integradas$client_id
,$topic
,$property
y$received_time
están disponibles para su uso como columnas a fin de enriquecer el JSON en el cuerpo del mensaje MQTT. Este campo es obligatorio. Use $property para las propiedades de usuario de MQTT. Por ejemplo, $properties.assetId representa el valor de la propiedad assetId del mensaje MQTT.
Este es un ejemplo de un recurso de DataLakeConnectorTopicMap:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: datalake-topicmap
namespace: azure-iot-operations
spec:
dataLakeConnectorRef: my-datalake-connector
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
tableName: thermostat
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
No se admite un código JSON con cadenas como "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}"
y hace que el conector inicie el error El convertidor encontró un valor nulo.
Un mensaje de ejemplo para el tema de azure-iot-operations/data/thermostat
que funciona con este esquema:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
Que se asigna a:
externalAssetId | assetName | CurrentTemperature | Presión | mqttTopic | timestamp |
---|---|---|---|---|---|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx | thermostat-de | 5506 | 5506 | dlc | 2024-04-02T22:36:03.1827681Z |
Importante
Si se actualiza el esquema de datos, por ejemplo, se cambia un tipo de datos o se cambia un nombre, la transformación de los datos entrantes podría dejar de funcionar. Debe cambiar el nombre de la tabla de datos si se produce un cambio de esquema.
Delta o Parquet
Se admiten los formatos Delta y Parquet.
Administración de la conexión de agente local
Al igual que el puente MQTT, el conector de lago de datos actúa como un cliente para el MQTT broker de IoT MQ. Si ha personalizado el puerto de escucha o la autenticación del MQTT broker de IoT MQ, reemplace también la configuración de conexión MQTT local para el conector de lago de datos. Para más información, consulte conexión de agente local del puente MQTT.
Contenido relacionado
Publicar y suscribir mensajes MQTT mediante Azure IoT MQ Preview