Enviar dados da Versão preliminar do MQ da Internet das Coisas do Azure para o Data Lake Storage
Importante
O recurso Pré-visualização de Operações do Azure IoT — habilitado pelo Azure Arc — está atualmente em VERSÃO PRÉVIA. Você não deve usar esse software em versão prévia em ambientes de produção.
Veja os Termos de Uso Complementares para Versões Prévias do Microsoft Azure para obter termos legais que se aplicam aos recursos do Azure que estão em versão beta, versão prévia ou que, de outra forma, ainda não foram lançados em disponibilidade geral.
Você pode usar o conector do data lake para enviar dados do agente da versão prévia do MQ da Internet das Coisas do Azure para um data lake, como o Azure Data Lake Storage Gen2 (ADLSv2), o Microsoft Fabric OneLake e o Azure Data Explorer. O conector assina os tópicos do MQTT e ingere as mensagens em tabelas Delta na conta do Data Lake Storage.
Pré-requisitos
Uma conta do Data Lake Storage no Azure com um contêiner e uma pasta para seus dados. Para obter mais informações sobre como criar um Data Lake Storage, use uma das seguintes opções de início rápido:
- Início rápido do Microsoft Fabric OneLake:
- Crie um workspace, pois o meu workspace padrão não tem suporte.
- Criar um lakehouse.
- Início rápido do Azure Data Lake Storage Gen2:
- Crie uma conta de armazenamento para usar com o Azure Data Lake Storage Gen2.
- Cluster do Azure Data Explorer:
- Siga as etapas de Cluster completo em Início Rápido: Criar um cluster do Azure Data Explorer e um banco de dados.
- Início rápido do Microsoft Fabric OneLake:
Um agente MQTT do MQ da Internet das Coisas. Para obter mais informações sobre como implantar um agente MQTT do MQ da Internet das Coisas, consulte Início Rápido: Implantar a versão prévia das Operações do Azure IoT em um cluster do Kubernetes habilitado para Arc.
Configurar para enviar dados ao Microsoft Fabric OneLake usando a identidade gerenciada
Configure um conector data lake para se conectar ao Microsoft Fabric OneLake usando a identidade gerenciada.
Verifique se as etapas nos pré-requisitos são atendidas, incluindo um workspace do Microsoft Fabric e um lakehouse. O meu workspace padrão não pode ser usado.
Verifique se a extensão do IoT MQ Arc está instalada e configurada com a identidade gerenciada.
No portal do Azure, acesse o cluster do Kubernetes conectado ao Azure Arc e selecione Configurações>Extensões. Na lista de extensões, procure o nome da extensão do IoT MQ. O nome começa com
mq-
seguido por cinco caracteres aleatórios. Por exemplo, mq-4jgjs.Obtenha a ID do aplicativo associada à identidade gerenciada da extensão do IoT MQ Arc e anote o valor do GUID. A ID do aplicativo é diferente da ID do objeto ou da entidade de segurança. Você pode usar a CLI do Azure localizando a ID do objeto da identidade gerenciada e consultando a ID do aplicativo da entidade de serviço associada à identidade gerenciada. Por exemplo:
OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv) az ad sp show --query appId --id $OBJECT_ID --output tsv
Você deve obter uma saída com um valor GUID:
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
Esse GUID é a ID do aplicativo que você precisa usar na próxima etapa.
No workspace do Microsoft Fabric, use Gerenciar acesso e selecione + Adicionar pessoas ou grupos.
Pesquise a extensão do Arc MQ da Internet das Coisas pelo nome "mq" e selecione o valor GUID da ID do aplicativo encontrado na etapa anterior.
Selecione Colaborador como a função e, em seguida, selecione Adicionar.
Crie um recurso DataLakeConnector que defina as configurações e as configurações de ponto de extremidade para o conector. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:
target.fabricOneLake.endpoint
: o ponto de extremidade da conta do Microsoft Fabric OneLake. Você pode obter a URL do ponto de extremidade do Microsoft Fabric lakehouse em Propriedadesde Arquivos>. A URL deve ter a seguinte aparência:https://onelake.dfs.fabric.microsoft.com
.target.fabricOneLake.names
: Os nomes do workspace e da lakehouse. Use este campo ouguids
. Não use ambos.workspaceName
: nome do workspace.lakehouseName
: O nome da lakehouse.
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: info databaseFormat: delta target: fabricOneLake: # Example: https://onelake.dfs.fabric.microsoft.com endpoint: <example-endpoint-url> names: workspaceName: <example-workspace-name> lakehouseName: <example-lakehouse-name> ## OR # guids: # workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx # lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx fabricPath: tables authentication: systemAssignedManagedIdentity: audience: https://storage.azure.com/ localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Crie um recurso DataLakeConnectorTopicMap que define o mapeamento entre o tópico MQTT e a tabela Delta no Data Lake Storage. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:
dataLakeConnectorRef
: O nome do recurso DataLakeConnector que você criou anteriormente.clientId
: Um identificador exclusivo para seu cliente MQTT.mqttSourceTopic
: O nome do tópico MQTT do qual você deseja que os dados sejam provenientes.table.tableName
: O nome da tabela à qual você deseja acrescentar na lakehouse. A tabela será criada automaticamente se não existir.table.schema
: O esquema da tabela Delta que deve corresponder ao formato e campos das mensagens JSON que você envia para o tópico MQTT.
Aplique os recursos DataLakeConnector e DataLakeConnectorTopicMap ao cluster do Kubernetes usando
kubectl apply -f datalake-connector.yaml
.Comece a enviar mensagens JSON para o tópico MQTT usando seu editor MQTT. A instância do conector do data lake assina o tópico e ingere as mensagens na tabela Delta.
Usando um navegador, verifique se os dados são importados para o lakehouse. No workspace do Microsoft Fabric, selecione seu lakehouse e, em seguida, Tabelas. Você deve ver os dados na tabela.
Tabela não identificada
Se os dados forem exibidos na tabela Não Identificada:
A causa pode ser caracteres sem suporte no nome da tabela. O nome da tabela deve ser um nome de contêiner válido do Armazenamento do Microsoft Azure, o que significa que ele pode conter qualquer letra em inglês, maiúsculas ou minúsculas e barra inferior _
, com comprimento de até 256 caracteres. Não são permitidos traços -
ou caracteres de espaço.
Configurar para enviar dados ao Azure Data Lake Storage Gen2 usando o token SAS
Configure um conector data lake para se conectar a uma conta do ADLS Gen2 (Azure Data Lake Storage Gen2) usando um token SAS (assinatura de acesso compartilhado).
Obtenha um token SAS para uma conta do Azure Data Lake Storage Gen2 (ADLS Gen2). Por exemplo, use o portal do Azure para navegar até sua conta de armazenamento. No menu em Segurança + rede, escolha Assinatura de acesso compartilhado. Use a tabela a seguir para definir as permissões necessárias.
Parâmetro Valor Serviços permitidos Blob Tipos de recursos permitidos Objeto, Contêiner Permissões aceitas Ler, Gravar, Excluir, Listar, Criar Para otimizar para privilégios mínimos, você também pode optar por obter a SAS para um contêiner individual. Para evitar erros de autenticação, verifique se o contêiner corresponde ao valor
table.tableName
na configuração do mapa de tópicos.Crie um segredo do Kubernetes com o token SAS. Não inclua o ponto de interrogação
?
que pode estar no início do token.kubectl create secret generic my-sas \ --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \ -n azure-iot-operations
Crie um recurso DataLakeConnector que defina as configurações e as configurações de ponto de extremidade para o conector. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:
endpoint
: O ponto de extremidade do Data Lake Storage da conta de armazenamento do ADLSv2 na forma dehttps://example.blob.core.windows.net
. No portal do Azure, localize o ponto de extremidade em Conta de Armazenamento > Configurações > Pontos de extremidade > Data Lake Storage.accessTokenSecretName
: Nome do segredo do Kubernetes que contém o token SAS (my-sas
do exemplo anterior).
apiVersion: mq.iotoperations.azure.com/v1beta1 kind: DataLakeConnector metadata: name: my-datalake-connector namespace: azure-iot-operations spec: protocol: v5 image: repository: mcr.microsoft.com/azureiotoperations/datalake tag: 0.4.0-preview pullPolicy: IfNotPresent instances: 2 logLevel: "debug" databaseFormat: "delta" target: datalakeStorage: endpoint: "https://example.blob.core.windows.net" authentication: accessTokenSecretName: "my-sas" localBrokerConnection: endpoint: aio-mq-dmqtt-frontend:8883 tls: tlsEnabled: true trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only authentication: kubernetes: {}
Crie um recurso DataLakeConnectorTopicMap que define o mapeamento entre o tópico MQTT e a tabela Delta no Data Lake Storage. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:
dataLakeConnectorRef
: O nome do recurso DataLakeConnector que você criou anteriormente.clientId
: Um identificador exclusivo para seu cliente MQTT.mqttSourceTopic
: O nome do tópico MQTT do qual você deseja que os dados sejam provenientes.table.tableName
: O nome do contêiner ao qual você deseja acrescentar no Data Lake Storage. Se o token SAS estiver no escopo da conta, o contêiner será criado automaticamente se estiver ausente.table.schema
: O esquema da tabela Delta, que deve corresponder ao formato e campos das mensagens JSON que você envia para o tópico MQTT.
Aplique os recursos DataLakeConnector e DataLakeConnectorTopicMap ao cluster de Kubernetes usando
kubectl apply -f datalake-connector.yaml
.Comece a enviar mensagens JSON para o tópico MQTT usando seu editor MQTT. A instância do conector do data lake assina o tópico e ingere as mensagens na tabela Delta.
Usando o portal do Azure, verifique se a tabela Delta foi criada. Os arquivos são organizados por ID do cliente, nome da instância do conector, tópico MQTT e hora. Em >Contêineres da sua conta de armazenamento, abra o contêiner especificado no DataLakeConnectorTopicMap. Verifique se _delta_log existe e os arquivos parque mostram o tráfego MQTT. Abra um arquivo parque para confirmar se a carga corresponde ao que foi enviado e definido no esquema.
Usar a identidade gerenciada para autenticação no ADLSv2
Para usar a identidade gerenciada, especifique-a como o único método em DataLakeConnector authentication
. Use az k8s-extension show
para localizar a ID principal da extensão do Arc do MQ da Internet das Coisas e, em seguida, atribua uma função à identidade gerenciada que concede permissão para gravar na conta de armazenamento, como Colaborador de Dados de Blobs de Armazenamento. Para saber mais, confira Autorizar o acesso a blobs usando o Microsoft Entra ID.
authentication:
systemAssignedManagedIdentity:
audience: https://my-account.blob.core.windows.net
Configurar para enviar dados para o Azure Data Explorer usando a identidade gerenciada
Configure o conector do data lake para enviar dados ao ponto de extremidade do Azure Data Explorer usando a identidade gerenciada.
Verifique se as etapas nos pré-requisitos são atendidas, incluindo um cluster completo do Azure Data Explorer. A opção "cluster gratuito" não funciona.
Depois que o cluster for criado, crie um banco de dados para armazenar seus dados.
Você pode criar uma tabela para determinados dados por meio do portal do Azure e criar colunas manualmente ou pode usar KQL na guia de consulta. Por exemplo:
.create table thermostat ( externalAssetId: string, assetName: string, CurrentTemperature: real, Pressure: real, MqttTopic: string, Timestamp: datetime )
Habilitar a ingestão de streaming
Habilite a ingestão de streaming em sua tabela e banco de dados. Na guia de consulta, execute o seguinte comando, substituindo <DATABASE_NAME>
pelo nome do banco de dados:
.alter database <DATABASE_NAME> policy streamingingestion enable
Adicionar a identidade gerenciada ao cluster do Azure Data Explorer
Para que o conector se autentique no Azure Data Explorer, você deve adicionar a identidade gerenciada ao cluster do Azure Data Explorer.
- No portal do Azure, acesse o cluster do Kubernetes conectado ao Azure Arc e selecione Configurações>Extensões. Na lista de extensões, procure o nome da extensão do IoT MQ. O nome começa com
mq-
seguido por cinco caracteres aleatórios. Por exemplo, mq-4jgjs. O nome da extensão do IoT MQ é o mesmo que o nome da identidade gerenciada do MQ. - No banco de dados do Azure Data Explorer, selecione Permissões>Adicionar>Ingestor. Pesquise o nome da identidade gerenciada do MQ e adicione-o.
Para obter mais informações sobre como adicionar permissões, consulte Gerenciar permissões de cluster do Azure Data Explorer.
Agora, você está pronto para implantar o conector e enviar dados para o Azure Data Explorer.
Exemplo de arquivo de implantação
Arquivo de implantação de exemplo para o conector do Azure Data Explorer. Comentários que começam com TODO
exigem que você substitua as configurações de espaço reservado por suas informações.
apiVersion: mq.iotoperations.azure.com/v1beta1
name: my-adx-connector
namespace: azure-iot-operations
spec:
repository: mcr.microsoft.com/azureiotoperations/datalake
tag: 0.4.0-preview
pullPolicy: Always
databaseFormat: adx
target:
# TODO: insert the ADX cluster endpoint
endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
authentication:
systemAssignedManagedIdentity:
audience: https://api.kusto.windows.net
localBrokerConnection:
endpoint: aio-mq-dmqtt-frontend:8883
tls:
tlsEnabled: true
trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
authentication:
kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: adx-topicmap
namespace: azure-iot-operations
spec:
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
# TODO: add DB and table name
tablePath: <DATABASE_NAME>
tableName: <TABLE_NAME>
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: MqttTopic
format: utf8
optional: false
mapping: $topic
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
Este exemplo aceita dados do tópico azure-iot-operations/data/thermostat
com mensagens no formato JSON, como o seguinte:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
DataLakeConnector
Um DataLakeConnector é um recurso personalizado do Kubernetes que define a configuração e as propriedades de uma instância do conector data lake. Um conector data lake ingere dados de tópicos MQTT em tabelas Delta em uma conta do Data Lake Storage.
O campo de especificação de um recurso DataLakeConnector contém os seguintes subcampos:
protocol
: A versão do MQTT. Pode serv5
ouv3
.image
: O campo de imagem especifica a imagem de contêiner do módulo do conector do data lake. Ele tem os seguintes subcampos:repository
: O nome do registro de contêiner e do repositório em que a imagem é armazenada.tag
: A marca da imagem a ser usada.pullPolicy
: A política de pull para a imagem. Pode serAlways
,IfNotPresent
ouNever
.
instances
: O número de réplicas do conector data lake a ser executado.logLevel
: O nível de log do módulo do conector do data lake. Pode sertrace
,debug
,info
,warn
,error
oufatal
.databaseFormat
: O formato dos dados a serem ingeridos no Data Lake Storage. Pode serdelta
ouparquet
.target
: O campo de destino especifica o destino da ingestão de dados. Pode serdatalakeStorage
,fabricOneLake
,adx
oulocalStorage
.datalakeStorage
: especifica a configuração e as propriedades da conta do ADLSv2. Ele tem os seguintes subcampos:endpoint
: A URL do ponto de extremidade da conta do Data Lake Storage. Não inclua nenhuma barra à direita/
.authentication
: O campo de autenticação especifica o tipo e as credenciais para acessar a conta do Data Lake Storage. Pode ser um dos seguintes.accessTokenSecretName
: O nome do segredo do Kubernetes para usar a autenticação de token de acesso compartilhado para a conta do Data Lake Storage. Esse campo será necessário se o tipo foraccessToken
.systemAssignedManagedIdentity
: Para usar a identidade gerenciada do sistema para autenticação. Ele tem um subcampoaudience
: Uma cadeia de caracteres na forma dehttps://<my-account-name>.blob.core.windows.net
para o público de token de identidade gerenciada com escopo para o nível da conta ouhttps://storage.azure.com
para qualquer conta de armazenamento.
fabricOneLake
: Especifica a configuração e as propriedades do Microsoft Fabric OneLake. Ele tem os seguintes subcampos:endpoint
: A URL do ponto de extremidade do Microsoft Fabric OneLake. Geralmente éhttps://onelake.dfs.fabric.microsoft.com
porque esse é o ponto de extremidade global do OneLake. Se você estiver usando um ponto de extremidade regional, ele estará na forma dehttps://<region>-onelake.dfs.fabric.microsoft.com
. Não inclua nenhuma barra à direita/
. Para saber mais, confira Conectar-se ao Microsoft OneLake.names
: Especifica os nomes do workspace e da lakehouse. Use este campo ouguids
. Não use ambos. Ele tem os seguintes subcampos:workspaceName
: nome do workspace.lakehouseName
: O nome da lakehouse.
guids
: Especifica os GUIDs do workspace e da lakehouse. Use este campo ounames
. Não use ambos. Ele tem os seguintes subcampos:workspaceGuid
: O GUID do workspace.lakehouseGuid
: O GUID da lakehouse.
fabricPath
: O local dos dados no workspace do Fabric. Pode sertables
oufiles
. Se fortables
, os dados serão armazenados no Fabric OneLake como tabelas. Se forfiles
, os dados serão armazenados no Fabric OneLake como arquivos. Se forfiles
, odatabaseFormat
deve serparquet
.authentication
: O campo de autenticação especifica o tipo e as credenciais para acessar o Microsoft Fabric OneLake. Só pode sersystemAssignedManagedIdentity
por enquanto. Ele tem um subcampo:systemAssignedManagedIdentity
: Para usar a identidade gerenciada do sistema para autenticação. Ele tem um subcampoaudience
: Uma cadeia de caracteres para o público-alvo do token de identidade gerenciada e deve serhttps://storage.azure.com
.
adx
: especifica a configuração e as propriedades do banco de dados do Azure Data Explorer. Ele tem os seguintes subcampos:endpoint
: a URL do ponto de extremidade do cluster do Azure Data Explorer, comohttps://<CLUSTER>.<REGION>.kusto.windows.net
. Não inclua nenhuma barra à direita/
.authentication
: o campo de autenticação especifica o tipo e as credenciais para acessar o cluster do Azure Data Explorer. Só pode sersystemAssignedManagedIdentity
por enquanto. Ele tem um subcampo:systemAssignedManagedIdentity
: Para usar a identidade gerenciada do sistema para autenticação. Ele tem um subcampoaudience
: uma cadeia de caracteres para o público-alvo do token de identidade gerenciada e deve serhttps://api.kusto.windows.net
.
localStorage
: Especifica a configuração e as propriedades da conta de armazenamento local. Ele tem os seguintes subcampos:volumeName
: O nome do volume montado em cada um dos pods do conector.
localBrokerConnection
: Usado para substituir a configuração de conexão padrão para o agente MQTT do IoT MQ. Consulte Gerenciar conexão do agente local.
DataLakeConnectorTopicMap
Um DataLakeConnectorTopicMap é um recurso personalizado do Kubernetes que define o mapeamento entre um tópico MQTT e uma tabela Delta em uma conta do Data Lake Storage. Um recurso DataLakeConnectorTopicMap faz referência a um recurso DataLakeConnector que é executado no mesmo dispositivo de borda e ingere dados do tópico MQTT na tabela Delta.
O campo de especificação de um recurso DataLakeConnectorTopicMap contém os seguintes subcampos:
dataLakeConnectorRef
: O nome do recurso DataLakeConnector ao qual este mapa de tópico pertence.mapping
: O campo de mapeamento especifica os detalhes e as propriedades do tópico MQTT e da tabela Delta. Ele tem os seguintes subcampos:allowedLatencySecs
: A latência máxima em segundos entre o recebimento de uma mensagem do tópico MQTT e a ingestão na tabela Delta. Este campo é necessário.clientId
: Um identificador exclusivo para o cliente MQTT que assina o tópico.maxMessagesPerBatch
: O número máximo de mensagens a serem ingeridas em um lote na tabela Delta. Devido a uma restrição temporária, esse valor deverá ser menor que 16 seqos
for definido como 1. Este campo é necessário.messagePayloadType
: O tipo de conteúdo que é enviado para o tópico MQTT. Ele pode serjson
ouavro
(ainda não há suporte).mqttSourceTopic
: O nome dos tópicos do MQTT aos quais assinar. Dá suporte à notação curinga do tópico MQTT.qos
: A qualidade do nível de serviço para assinar o tópico MQTT. Pode ser um de 0 ou 1.table
: O campo de tabela especifica a configuração e as propriedades da tabela Delta na conta do Data Lake Storage. Ele tem os seguintes subcampos:tableName
: O nome da tabela Delta à qual criar ou acrescentar na conta do Data Lake Storage. Esse campo também é conhecido como o nome do contêiner quando usado com o Azure Data Lake Storage Gen2. Ele pode conter qualquer letra minúscula em Inglês e underline (_
), com comprimento de até 256 caracteres. Não são permitidos traços-
ou caracteres de espaço.tablePath
: o nome do banco de dados do Azure Data Explorer ao usar o conector de tipoadx
.schema
: O esquema da tabela Delta, que deve corresponder ao formato e aos campos do conteúdo da mensagem. É uma matriz de objetos, cada um com os seguintes subcampos:name
: O nome da coluna na tabela Delta.format
: O tipo de dados da coluna na tabela Delta. Pode serboolean
,int8
,int16
,int32
,int64
,uInt8
,uInt16
,uInt32
,uInt64
,float16
,float32
,float64
,date32
,timestamp
,binary
ouutf8
. Tipos não assinados, comouInt8
, não têm suporte total e são tratados como tipos assinados, se especificados aqui.optional
: Um valor booliano que indica se a coluna é opcional ou necessária. Esse campo é opcional e usa como padrão false.mapping
: Expressão de caminho JSON que define como extrair o valor da coluna do conteúdo da mensagem MQTT. Mapeamentos internos$client_id
,$topic
,$properties
e$received_time
estão disponíveis para uso como colunas para enriquecer o JSON no corpo da mensagem MQTT. Este campo é necessário. Use $properties para propriedades de usuário do MQTT. Por exemplo, $properties.assetId representa o valor da propriedade assetId da mensagem MQTT.
Aqui está um exemplo de um recurso DataLakeConnectorTopicMap:
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
name: datalake-topicmap
namespace: azure-iot-operations
spec:
dataLakeConnectorRef: my-datalake-connector
mapping:
allowedLatencySecs: 1
messagePayloadType: json
maxMessagesPerBatch: 10
clientId: id
mqttSourceTopic: azure-iot-operations/data/thermostat
qos: 1
table:
tableName: thermostat
schema:
- name: externalAssetId
format: utf8
optional: false
mapping: $property.externalAssetId
- name: assetName
format: utf8
optional: false
mapping: DataSetWriterName
- name: CurrentTemperature
format: float32
optional: false
mapping: Payload.temperature.Value
- name: Pressure
format: float32
optional: true
mapping: "Payload.Tag 10.Value"
- name: Timestamp
format: timestamp
optional: false
mapping: $received_time
Não há suporte para JSON com cadeia de caracteres como "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}"
e faz com que o conector gere um erro conversor encontrou um valor nulo.
Uma mensagem de exemplo para o tópico azure-iot-operations/data/thermostat
que funciona com este esquema:
{
"SequenceNumber": 4697,
"Timestamp": "2024-04-02T22:36:03.1827681Z",
"DataSetWriterName": "thermostat",
"MessageType": "ua-deltaframe",
"Payload": {
"temperature": {
"SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
"Value": 5506
},
"Tag 10": {
"SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
"Value": 5506
}
}
}
O que mapeia para:
externalAssetId | assetName | CurrentTemperature | Pressão | mqttTopic | timestamp |
---|---|---|---|---|---|
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx | termostat-de | 5506 | 5506 | DLC | 2024-04-02T22:36:03.1827681Z |
Importante
Se o esquema de dados for atualizado, por exemplo, um tipo de dados será alterado ou um nome será alterado, a transformação dos dados de entrada poderá parar de funcionar. Você precisará alterar o nome da tabela de dados se ocorrer uma alteração de esquema.
Delta ou parquet
Há suporte para formatos delta e parquet.
Gerenciar a conexão do agente local
Assim como a ponte MQTT, o conector do data lake atua como um cliente para o agente MQTT do MQ da Internet das Coisas. Se você personalizou a porta do ouvinte ou a autenticação do agente MQTT do MQ da Internet das Coisas, substitua a configuração de conexão MQTT local para o conector do data lake também. Para saber mais, consulte Conexão do agente local da ponte MQTT.
Conteúdo relacionado
Publicar e assinar mensagens MQTT usando a versão prévia do Azure IoT MQ