Enviar dados da Versão preliminar do MQ da Internet das Coisas do Azure para o Data Lake Storage

Importante

O recurso Pré-visualização de Operações do Azure IoT — habilitado pelo Azure Arc — está atualmente em VERSÃO PRÉVIA. Você não deve usar esse software em versão prévia em ambientes de produção.

Veja os Termos de Uso Complementares para Versões Prévias do Microsoft Azure para obter termos legais que se aplicam aos recursos do Azure que estão em versão beta, versão prévia ou que, de outra forma, ainda não foram lançados em disponibilidade geral.

Você pode usar o conector do data lake para enviar dados do agente da versão prévia do MQ da Internet das Coisas do Azure para um data lake, como o Azure Data Lake Storage Gen2 (ADLSv2), o Microsoft Fabric OneLake e o Azure Data Explorer. O conector assina os tópicos do MQTT e ingere as mensagens em tabelas Delta na conta do Data Lake Storage.

Pré-requisitos

Configurar para enviar dados ao Microsoft Fabric OneLake usando a identidade gerenciada

Configure um conector data lake para se conectar ao Microsoft Fabric OneLake usando a identidade gerenciada.

  1. Verifique se as etapas nos pré-requisitos são atendidas, incluindo um workspace do Microsoft Fabric e um lakehouse. O meu workspace padrão não pode ser usado.

  2. Verifique se a extensão do IoT MQ Arc está instalada e configurada com a identidade gerenciada.

  3. No portal do Azure, acesse o cluster do Kubernetes conectado ao Azure Arc e selecione Configurações>Extensões. Na lista de extensões, procure o nome da extensão do IoT MQ. O nome começa com mq- seguido por cinco caracteres aleatórios. Por exemplo, mq-4jgjs.

  4. Obtenha a ID do aplicativo associada à identidade gerenciada da extensão do IoT MQ Arc e anote o valor do GUID. A ID do aplicativo é diferente da ID do objeto ou da entidade de segurança. Você pode usar a CLI do Azure localizando a ID do objeto da identidade gerenciada e consultando a ID do aplicativo da entidade de serviço associada à identidade gerenciada. Por exemplo:

    OBJECT_ID=$(az k8s-extension show --name <IOT_MQ_EXTENSION_NAME> --cluster-name <ARC_CLUSTER_NAME> --resource-group <RESOURCE_GROUP_NAME> --cluster-type connectedClusters --query identity.principalId -o tsv)
    az ad sp show --query appId --id $OBJECT_ID --output tsv
    

    Você deve obter uma saída com um valor GUID:

    xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    

    Esse GUID é a ID do aplicativo que você precisa usar na próxima etapa.

  5. No workspace do Microsoft Fabric, use Gerenciar acesso e selecione + Adicionar pessoas ou grupos.

  6. Pesquise a extensão do Arc MQ da Internet das Coisas pelo nome "mq" e selecione o valor GUID da ID do aplicativo encontrado na etapa anterior.

  7. Selecione Colaborador como a função e, em seguida, selecione Adicionar.

  8. Crie um recurso DataLakeConnector que defina as configurações e as configurações de ponto de extremidade para o conector. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:

    • target.fabricOneLake.endpoint: o ponto de extremidade da conta do Microsoft Fabric OneLake. Você pode obter a URL do ponto de extremidade do Microsoft Fabric lakehouse em Propriedadesde Arquivos>. A URL deve ter a seguinte aparência: https://onelake.dfs.fabric.microsoft.com.
    • target.fabricOneLake.names: Os nomes do workspace e da lakehouse. Use este campo ou guids. Não use ambos.
      • workspaceName: nome do workspace.
      • lakehouseName: O nome da lakehouse.
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: info
      databaseFormat: delta
      target:
        fabricOneLake:
          # Example: https://onelake.dfs.fabric.microsoft.com
          endpoint: <example-endpoint-url>
          names:
            workspaceName: <example-workspace-name>
            lakehouseName: <example-lakehouse-name>
          ## OR
          # guids:
          #   workspaceGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          #   lakehouseGuid: xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
          fabricPath: tables
          authentication:
            systemAssignedManagedIdentity:
              audience: https://storage.azure.com/
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  9. Crie um recurso DataLakeConnectorTopicMap que define o mapeamento entre o tópico MQTT e a tabela Delta no Data Lake Storage. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:

    • dataLakeConnectorRef: O nome do recurso DataLakeConnector que você criou anteriormente.
    • clientId: Um identificador exclusivo para seu cliente MQTT.
    • mqttSourceTopic: O nome do tópico MQTT do qual você deseja que os dados sejam provenientes.
    • table.tableName: O nome da tabela à qual você deseja acrescentar na lakehouse. A tabela será criada automaticamente se não existir.
    • table.schema: O esquema da tabela Delta que deve corresponder ao formato e campos das mensagens JSON que você envia para o tópico MQTT.
  10. Aplique os recursos DataLakeConnector e DataLakeConnectorTopicMap ao cluster do Kubernetes usando kubectl apply -f datalake-connector.yaml.

  11. Comece a enviar mensagens JSON para o tópico MQTT usando seu editor MQTT. A instância do conector do data lake assina o tópico e ingere as mensagens na tabela Delta.

  12. Usando um navegador, verifique se os dados são importados para o lakehouse. No workspace do Microsoft Fabric, selecione seu lakehouse e, em seguida, Tabelas. Você deve ver os dados na tabela.

Tabela não identificada

Se os dados forem exibidos na tabela Não Identificada:

A causa pode ser caracteres sem suporte no nome da tabela. O nome da tabela deve ser um nome de contêiner válido do Armazenamento do Microsoft Azure, o que significa que ele pode conter qualquer letra em inglês, maiúsculas ou minúsculas e barra inferior _, com comprimento de até 256 caracteres. Não são permitidos traços - ou caracteres de espaço.

Configurar para enviar dados ao Azure Data Lake Storage Gen2 usando o token SAS

Configure um conector data lake para se conectar a uma conta do ADLS Gen2 (Azure Data Lake Storage Gen2) usando um token SAS (assinatura de acesso compartilhado).

  1. Obtenha um token SAS para uma conta do Azure Data Lake Storage Gen2 (ADLS Gen2). Por exemplo, use o portal do Azure para navegar até sua conta de armazenamento. No menu em Segurança + rede, escolha Assinatura de acesso compartilhado. Use a tabela a seguir para definir as permissões necessárias.

    Parâmetro Valor
    Serviços permitidos Blob
    Tipos de recursos permitidos Objeto, Contêiner
    Permissões aceitas Ler, Gravar, Excluir, Listar, Criar

    Para otimizar para privilégios mínimos, você também pode optar por obter a SAS para um contêiner individual. Para evitar erros de autenticação, verifique se o contêiner corresponde ao valor table.tableName na configuração do mapa de tópicos.

  2. Crie um segredo do Kubernetes com o token SAS. Não inclua o ponto de interrogação ? que pode estar no início do token.

    kubectl create secret generic my-sas \
    --from-literal=accessToken='sv=2022-11-02&ss=b&srt=c&sp=rwdlax&se=2023-07-22T05:47:40Z&st=2023-07-21T21:47:40Z&spr=https&sig=xDkwJUO....' \
    -n azure-iot-operations
    
  3. Crie um recurso DataLakeConnector que defina as configurações e as configurações de ponto de extremidade para o conector. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:

    • endpoint: O ponto de extremidade do Data Lake Storage da conta de armazenamento do ADLSv2 na forma de https://example.blob.core.windows.net. No portal do Azure, localize o ponto de extremidade em Conta de Armazenamento > Configurações > Pontos de extremidade > Data Lake Storage.
    • accessTokenSecretName: Nome do segredo do Kubernetes que contém o token SAS (my-sas do exemplo anterior).
    apiVersion: mq.iotoperations.azure.com/v1beta1
    kind: DataLakeConnector
    metadata:
      name: my-datalake-connector
      namespace: azure-iot-operations
    spec:
      protocol: v5
      image:
        repository: mcr.microsoft.com/azureiotoperations/datalake
        tag: 0.4.0-preview
        pullPolicy: IfNotPresent
      instances: 2
      logLevel: "debug"
      databaseFormat: "delta"
      target:
        datalakeStorage:
          endpoint: "https://example.blob.core.windows.net"
          authentication:
            accessTokenSecretName: "my-sas"
      localBrokerConnection:
        endpoint: aio-mq-dmqtt-frontend:8883
        tls:
          tlsEnabled: true
          trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
        authentication:
          kubernetes: {}
    
  4. Crie um recurso DataLakeConnectorTopicMap que define o mapeamento entre o tópico MQTT e a tabela Delta no Data Lake Storage. Você pode usar o YAML fornecido como exemplo, mas certifique-se de alterar os seguintes campos:

    • dataLakeConnectorRef: O nome do recurso DataLakeConnector que você criou anteriormente.
    • clientId: Um identificador exclusivo para seu cliente MQTT.
    • mqttSourceTopic: O nome do tópico MQTT do qual você deseja que os dados sejam provenientes.
    • table.tableName: O nome do contêiner ao qual você deseja acrescentar no Data Lake Storage. Se o token SAS estiver no escopo da conta, o contêiner será criado automaticamente se estiver ausente.
    • table.schema: O esquema da tabela Delta, que deve corresponder ao formato e campos das mensagens JSON que você envia para o tópico MQTT.
  5. Aplique os recursos DataLakeConnector e DataLakeConnectorTopicMap ao cluster de Kubernetes usando kubectl apply -f datalake-connector.yaml.

  6. Comece a enviar mensagens JSON para o tópico MQTT usando seu editor MQTT. A instância do conector do data lake assina o tópico e ingere as mensagens na tabela Delta.

  7. Usando o portal do Azure, verifique se a tabela Delta foi criada. Os arquivos são organizados por ID do cliente, nome da instância do conector, tópico MQTT e hora. Em >Contêineres da sua conta de armazenamento, abra o contêiner especificado no DataLakeConnectorTopicMap. Verifique se _delta_log existe e os arquivos parque mostram o tráfego MQTT. Abra um arquivo parque para confirmar se a carga corresponde ao que foi enviado e definido no esquema.

Usar a identidade gerenciada para autenticação no ADLSv2

Para usar a identidade gerenciada, especifique-a como o único método em DataLakeConnector authentication. Use az k8s-extension show para localizar a ID principal da extensão do Arc do MQ da Internet das Coisas e, em seguida, atribua uma função à identidade gerenciada que concede permissão para gravar na conta de armazenamento, como Colaborador de Dados de Blobs de Armazenamento. Para saber mais, confira Autorizar o acesso a blobs usando o Microsoft Entra ID.

authentication:
  systemAssignedManagedIdentity:
    audience: https://my-account.blob.core.windows.net

Configurar para enviar dados para o Azure Data Explorer usando a identidade gerenciada

Configure o conector do data lake para enviar dados ao ponto de extremidade do Azure Data Explorer usando a identidade gerenciada.

  1. Verifique se as etapas nos pré-requisitos são atendidas, incluindo um cluster completo do Azure Data Explorer. A opção "cluster gratuito" não funciona.

  2. Depois que o cluster for criado, crie um banco de dados para armazenar seus dados.

  3. Você pode criar uma tabela para determinados dados por meio do portal do Azure e criar colunas manualmente ou pode usar KQL na guia de consulta. Por exemplo:

    .create table thermostat (
        externalAssetId: string,
        assetName: string,
        CurrentTemperature: real,
        Pressure: real,
        MqttTopic: string,
        Timestamp: datetime
    )
    

Habilitar a ingestão de streaming

Habilite a ingestão de streaming em sua tabela e banco de dados. Na guia de consulta, execute o seguinte comando, substituindo <DATABASE_NAME> pelo nome do banco de dados:

.alter database <DATABASE_NAME> policy streamingingestion enable

Adicionar a identidade gerenciada ao cluster do Azure Data Explorer

Para que o conector se autentique no Azure Data Explorer, você deve adicionar a identidade gerenciada ao cluster do Azure Data Explorer.

  1. No portal do Azure, acesse o cluster do Kubernetes conectado ao Azure Arc e selecione Configurações>Extensões. Na lista de extensões, procure o nome da extensão do IoT MQ. O nome começa com mq- seguido por cinco caracteres aleatórios. Por exemplo, mq-4jgjs. O nome da extensão do IoT MQ é o mesmo que o nome da identidade gerenciada do MQ.
  2. No banco de dados do Azure Data Explorer, selecione Permissões>Adicionar>Ingestor. Pesquise o nome da identidade gerenciada do MQ e adicione-o.

Para obter mais informações sobre como adicionar permissões, consulte Gerenciar permissões de cluster do Azure Data Explorer.

Agora, você está pronto para implantar o conector e enviar dados para o Azure Data Explorer.

Exemplo de arquivo de implantação

Arquivo de implantação de exemplo para o conector do Azure Data Explorer. Comentários que começam com TODO exigem que você substitua as configurações de espaço reservado por suas informações.

apiVersion: mq.iotoperations.azure.com/v1beta1
  name: my-adx-connector
  namespace: azure-iot-operations
spec:
    repository: mcr.microsoft.com/azureiotoperations/datalake
    tag: 0.4.0-preview
    pullPolicy: Always
  databaseFormat: adx
  target:
      # TODO: insert the ADX cluster endpoint
      endpoint: https://<CLUSTER>.<REGION>.kusto.windows.net
      authentication:
        systemAssignedManagedIdentity:
          audience: https://api.kusto.windows.net
  localBrokerConnection:
    endpoint: aio-mq-dmqtt-frontend:8883
    tls:
      tlsEnabled: true
      trustedCaCertificateConfigMap: aio-ca-trust-bundle-test-only
    authentication:
      kubernetes: {}
---
apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: adx-topicmap
  namespace: azure-iot-operations
spec:
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      # TODO: add DB and table name
      tablePath: <DATABASE_NAME>
      tableName: <TABLE_NAME>
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: MqttTopic
        format: utf8
        optional: false
        mapping: $topic
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Este exemplo aceita dados do tópico azure-iot-operations/data/thermostat com mensagens no formato JSON, como o seguinte:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

DataLakeConnector

Um DataLakeConnector é um recurso personalizado do Kubernetes que define a configuração e as propriedades de uma instância do conector data lake. Um conector data lake ingere dados de tópicos MQTT em tabelas Delta em uma conta do Data Lake Storage.

O campo de especificação de um recurso DataLakeConnector contém os seguintes subcampos:

  • protocol: A versão do MQTT. Pode ser v5 ou v3.
  • image: O campo de imagem especifica a imagem de contêiner do módulo do conector do data lake. Ele tem os seguintes subcampos:
    • repository: O nome do registro de contêiner e do repositório em que a imagem é armazenada.
    • tag: A marca da imagem a ser usada.
    • pullPolicy: A política de pull para a imagem. Pode ser Always, IfNotPresent ou Never.
  • instances: O número de réplicas do conector data lake a ser executado.
  • logLevel: O nível de log do módulo do conector do data lake. Pode ser trace, debug, info, warn, error ou fatal.
  • databaseFormat: O formato dos dados a serem ingeridos no Data Lake Storage. Pode ser delta ou parquet.
  • target: O campo de destino especifica o destino da ingestão de dados. Pode ser datalakeStorage, fabricOneLake, adx ou localStorage.
    • datalakeStorage: especifica a configuração e as propriedades da conta do ADLSv2. Ele tem os seguintes subcampos:
      • endpoint: A URL do ponto de extremidade da conta do Data Lake Storage. Não inclua nenhuma barra à direita /.
      • authentication: O campo de autenticação especifica o tipo e as credenciais para acessar a conta do Data Lake Storage. Pode ser um dos seguintes.
        • accessTokenSecretName: O nome do segredo do Kubernetes para usar a autenticação de token de acesso compartilhado para a conta do Data Lake Storage. Esse campo será necessário se o tipo for accessToken.
        • systemAssignedManagedIdentity: Para usar a identidade gerenciada do sistema para autenticação. Ele tem um subcampo
          • audience: Uma cadeia de caracteres na forma de https://<my-account-name>.blob.core.windows.net para o público de token de identidade gerenciada com escopo para o nível da conta ou https://storage.azure.com para qualquer conta de armazenamento.
    • fabricOneLake: Especifica a configuração e as propriedades do Microsoft Fabric OneLake. Ele tem os seguintes subcampos:
      • endpoint: A URL do ponto de extremidade do Microsoft Fabric OneLake. Geralmente é https://onelake.dfs.fabric.microsoft.com porque esse é o ponto de extremidade global do OneLake. Se você estiver usando um ponto de extremidade regional, ele estará na forma de https://<region>-onelake.dfs.fabric.microsoft.com. Não inclua nenhuma barra à direita /. Para saber mais, confira Conectar-se ao Microsoft OneLake.
      • names: Especifica os nomes do workspace e da lakehouse. Use este campo ou guids. Não use ambos. Ele tem os seguintes subcampos:
        • workspaceName: nome do workspace.
        • lakehouseName: O nome da lakehouse.
      • guids: Especifica os GUIDs do workspace e da lakehouse. Use este campo ou names. Não use ambos. Ele tem os seguintes subcampos:
        • workspaceGuid: O GUID do workspace.
        • lakehouseGuid: O GUID da lakehouse.
      • fabricPath: O local dos dados no workspace do Fabric. Pode ser tables ou files. Se for tables, os dados serão armazenados no Fabric OneLake como tabelas. Se for files, os dados serão armazenados no Fabric OneLake como arquivos. Se for files, o databaseFormat deve ser parquet.
      • authentication: O campo de autenticação especifica o tipo e as credenciais para acessar o Microsoft Fabric OneLake. Só pode ser systemAssignedManagedIdentity por enquanto. Ele tem um subcampo:
      • systemAssignedManagedIdentity: Para usar a identidade gerenciada do sistema para autenticação. Ele tem um subcampo
        • audience: Uma cadeia de caracteres para o público-alvo do token de identidade gerenciada e deve ser https://storage.azure.com.
    • adx: especifica a configuração e as propriedades do banco de dados do Azure Data Explorer. Ele tem os seguintes subcampos:
      • endpoint: a URL do ponto de extremidade do cluster do Azure Data Explorer, como https://<CLUSTER>.<REGION>.kusto.windows.net. Não inclua nenhuma barra à direita /.
      • authentication: o campo de autenticação especifica o tipo e as credenciais para acessar o cluster do Azure Data Explorer. Só pode ser systemAssignedManagedIdentity por enquanto. Ele tem um subcampo:
        • systemAssignedManagedIdentity: Para usar a identidade gerenciada do sistema para autenticação. Ele tem um subcampo
          • audience: uma cadeia de caracteres para o público-alvo do token de identidade gerenciada e deve ser https://api.kusto.windows.net.
    • localStorage: Especifica a configuração e as propriedades da conta de armazenamento local. Ele tem os seguintes subcampos:
      • volumeName: O nome do volume montado em cada um dos pods do conector.
  • localBrokerConnection: Usado para substituir a configuração de conexão padrão para o agente MQTT do IoT MQ. Consulte Gerenciar conexão do agente local.

DataLakeConnectorTopicMap

Um DataLakeConnectorTopicMap é um recurso personalizado do Kubernetes que define o mapeamento entre um tópico MQTT e uma tabela Delta em uma conta do Data Lake Storage. Um recurso DataLakeConnectorTopicMap faz referência a um recurso DataLakeConnector que é executado no mesmo dispositivo de borda e ingere dados do tópico MQTT na tabela Delta.

O campo de especificação de um recurso DataLakeConnectorTopicMap contém os seguintes subcampos:

  • dataLakeConnectorRef: O nome do recurso DataLakeConnector ao qual este mapa de tópico pertence.
  • mapping: O campo de mapeamento especifica os detalhes e as propriedades do tópico MQTT e da tabela Delta. Ele tem os seguintes subcampos:
    • allowedLatencySecs: A latência máxima em segundos entre o recebimento de uma mensagem do tópico MQTT e a ingestão na tabela Delta. Este campo é necessário.
    • clientId: Um identificador exclusivo para o cliente MQTT que assina o tópico.
    • maxMessagesPerBatch: O número máximo de mensagens a serem ingeridas em um lote na tabela Delta. Devido a uma restrição temporária, esse valor deverá ser menor que 16 se qos for definido como 1. Este campo é necessário.
    • messagePayloadType: O tipo de conteúdo que é enviado para o tópico MQTT. Ele pode ser json ou avro (ainda não há suporte).
    • mqttSourceTopic: O nome dos tópicos do MQTT aos quais assinar. Dá suporte à notação curinga do tópico MQTT.
    • qos: A qualidade do nível de serviço para assinar o tópico MQTT. Pode ser um de 0 ou 1.
    • table: O campo de tabela especifica a configuração e as propriedades da tabela Delta na conta do Data Lake Storage. Ele tem os seguintes subcampos:
      • tableName: O nome da tabela Delta à qual criar ou acrescentar na conta do Data Lake Storage. Esse campo também é conhecido como o nome do contêiner quando usado com o Azure Data Lake Storage Gen2. Ele pode conter qualquer letra minúscula em Inglês e underline (_), com comprimento de até 256 caracteres. Não são permitidos traços - ou caracteres de espaço.
      • tablePath: o nome do banco de dados do Azure Data Explorer ao usar o conector de tipo adx.
      • schema: O esquema da tabela Delta, que deve corresponder ao formato e aos campos do conteúdo da mensagem. É uma matriz de objetos, cada um com os seguintes subcampos:
        • name: O nome da coluna na tabela Delta.
        • format: O tipo de dados da coluna na tabela Delta. Pode ser boolean, int8, int16, int32, int64, uInt8, uInt16, uInt32, uInt64, float16, float32, float64, date32, timestamp, binary ou utf8. Tipos não assinados, como uInt8, não têm suporte total e são tratados como tipos assinados, se especificados aqui.
        • optional: Um valor booliano que indica se a coluna é opcional ou necessária. Esse campo é opcional e usa como padrão false.
        • mapping: Expressão de caminho JSON que define como extrair o valor da coluna do conteúdo da mensagem MQTT. Mapeamentos internos $client_id, $topic, $properties e $received_time estão disponíveis para uso como colunas para enriquecer o JSON no corpo da mensagem MQTT. Este campo é necessário. Use $properties para propriedades de usuário do MQTT. Por exemplo, $properties.assetId representa o valor da propriedade assetId da mensagem MQTT.

Aqui está um exemplo de um recurso DataLakeConnectorTopicMap:

apiVersion: mq.iotoperations.azure.com/v1beta1
kind: DataLakeConnectorTopicMap
metadata:
  name: datalake-topicmap
  namespace: azure-iot-operations
spec:
  dataLakeConnectorRef: my-datalake-connector
  mapping:
    allowedLatencySecs: 1
    messagePayloadType: json
    maxMessagesPerBatch: 10
    clientId: id
    mqttSourceTopic: azure-iot-operations/data/thermostat
    qos: 1
    table:
      tableName: thermostat
      schema:
      - name: externalAssetId
        format: utf8
        optional: false
        mapping: $property.externalAssetId
      - name: assetName
        format: utf8
        optional: false
        mapping: DataSetWriterName
      - name: CurrentTemperature
        format: float32
        optional: false
        mapping: Payload.temperature.Value
      - name: Pressure
        format: float32
        optional: true
        mapping: "Payload.Tag 10.Value"
      - name: Timestamp
        format: timestamp
        optional: false
        mapping: $received_time

Não há suporte para JSON com cadeia de caracteres como "{\"SequenceNumber\": 4697, \"Timestamp\": \"2024-04-02T22:36:03.1827681Z\", \"DataSetWriterName\": \"thermostat-de\", \"MessageType\": \"ua-deltaframe\", \"Payload\": {\"temperature\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949717Z\", \"Value\": 5506}, \"Tag 10\": {\"SourceTimestamp\": \"2024-04-02T22:36:02.6949888Z\", \"Value\": 5506}}}" e faz com que o conector gere um erro conversor encontrou um valor nulo.

Uma mensagem de exemplo para o tópico azure-iot-operations/data/thermostat que funciona com este esquema:

{
  "SequenceNumber": 4697,
  "Timestamp": "2024-04-02T22:36:03.1827681Z",
  "DataSetWriterName": "thermostat",
  "MessageType": "ua-deltaframe",
  "Payload": {
    "temperature": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949717Z",
      "Value": 5506
    },
    "Tag 10": {
      "SourceTimestamp": "2024-04-02T22:36:02.6949888Z",
      "Value": 5506
    }
  }
}

O que mapeia para:

externalAssetId assetName CurrentTemperature Pressão mqttTopic timestamp
xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx termostat-de 5506 5506 DLC 2024-04-02T22:36:03.1827681Z

Importante

Se o esquema de dados for atualizado, por exemplo, um tipo de dados será alterado ou um nome será alterado, a transformação dos dados de entrada poderá parar de funcionar. Você precisará alterar o nome da tabela de dados se ocorrer uma alteração de esquema.

Delta ou parquet

Há suporte para formatos delta e parquet.

Gerenciar a conexão do agente local

Assim como a ponte MQTT, o conector do data lake atua como um cliente para o agente MQTT do MQ da Internet das Coisas. Se você personalizou a porta do ouvinte ou a autenticação do agente MQTT do MQ da Internet das Coisas, substitua a configuração de conexão MQTT local para o conector do data lake também. Para saber mais, consulte Conexão do agente local da ponte MQTT.

Publicar e assinar mensagens MQTT usando a versão prévia do Azure IoT MQ