Share via


Tutoriel : Acheminer des données à l’aide de stratégies de mise à jour de table

Lorsque vos données sources impliquent des transformations simples et rapides, il est préférable de les exécuter amont dans le pipeline à l’aide d’un flux d’événements. Toutefois, cette approche peut ne pas fonctionner correctement pour d’autres transformations qui sont complexes ou qui nécessitent des fonctionnalités spécialisées pour fonctionner.

Dans ce tutoriel, vous allez apprendre à :

L’exemple de ce tutoriel montre comment utiliser des stratégies de mise à jour pour le routage des données afin d’effectuer des transformations complexes afin d’enrichir, de nettoyer et de transformer des données au moment de l’ingestion. Pour obtenir la liste des autres cas d’usage courants, consultez Cas d’usage courants pour les stratégies de mise à jour de table.

Prérequis

1 - Créer des tables et mettre à jour des stratégies

Les étapes suivantes vous guident dans la création d’une table source, de fonctions de transformation, de tables de destination et de stratégies de mise à jour. Ce tutoriel montre comment utiliser des stratégies de mise à jour de table pour effectuer des transformations complexes et enregistrer les résultats dans une ou plusieurs tables de destination. L’exemple utilise une table source unique nommée Raw_Table et trois tables de destination nommées Device_Telemetry, Device_Alarms et Error_Log.

  1. Exécutez la commande suivante pour créer une table nommée Raw_Table.

    .create table Raw_Table (RawData: dynamic)
    

    La table source est l’endroit où les données ingérées sont enregistrées. La table comporte une seule colonne nommée RawData de type dynamique. Le type dynamique est utilisé pour stocker les données brutes telles quelles, sans schéma. Pour plus d’informations, consultez commande .create table.

  2. Exécutez la commande suivante pour créer une fonction nommée Get_Telemetry, Get_Alarms et Log_Error fonctions.

    .execute database script <|
      .create-or-alter function Get_Telemetry() {
        Raw_Table
        | where todynamic(RawData).MessageType == 'Telemetry'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceType),
          SensorName = tostring(RawData.SensorName),
          SensorValue = toreal(RawData.SensorValue),
          SensorUnit = tostring(RawData.SensorUnit)
        | project-away RawData
      }
      .create-or-alter function Get_Alarms() {
        Raw_Table
        | where RawData.MessageType == 'Alarms'
        | extend
          Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)),
          DeviceId = tostring(RawData.DeviceId),
          DeviceType = tostring(RawData.DeviceTpe) ,
          AlarmType = tostring(RawData.AlarmType)
        | project-away RawData
      }
      .create-or-alter function Log_Error() {
        Raw_Table
        | where RawData.MessageType !in ('Telemetry', 'Alarms')
        | extend
          TimeStamp = datetime(now),
          ErrorType = 'Unknown MessageType'
        | project TimeStamp, RawData, ErrorType
      }
    

    Lors de la création d’une stratégie de mise à jour, vous pouvez spécifier un script en ligne pour l’exécution. Toutefois, nous vous recommandons d’encapsuler la logique de transformation dans une fonction. L’utilisation d’une fonction améliore la maintenance du code. Lorsque de nouvelles données arrivent, la fonction est exécutée pour transformer les données. La fonction peut être réutilisée dans plusieurs stratégies de mise à jour. Pour plus d’informations, consultez commande .create function.

  3. Exécutez la commande suivante pour créer les tables de destination.

    .execute database script <|
      .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string)
      .set-or-append Device_Alarms <| Get_Alarms | take 0
      .set-or-append Error_Log <| Log_Error | take 0
    

    La table de destination doit avoir le même schéma que la sortie de la fonction de transformation. Vous pouvez créer des tables de destination des manières suivantes :

    • Utilisation de la .create table commande et spécification manuelle du schéma, comme illustré lors de la création de la table Device_Telemetry . Toutefois, cette approche peut être sujette aux erreurs et prendre du temps.
    • Utilisez la .set-or-append commande si vous avez déjà créé une fonction pour transformer les données. Cette méthode crée une table avec le même schéma que la sortie de la fonction, en utilisant take 0 pour vous assurer que la fonction retourne uniquement le schéma. Pour plus d’informations, consultez commande .set-or-append.
  4. Exécutez la commande suivante pour créer les stratégies de mise à jour pour les tables de destination

    .execute database script <|
      .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
      .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
    

    La .alter table policy update commande est utilisée pour lier la table source, la fonction de transformation et la table de destination. La stratégie de mise à jour est créée sur la table de destination et spécifie la table source et la fonction de transformation. Pour plus d’informations, consultez commande .alter table policy update.

2 - Ingérer des exemples de données

Pour tester les stratégies de mise à jour, vous pouvez ingérer des exemples de données dans la table source à l’aide de la .set-or-append commande . Pour plus d’informations, consultez Ingérer des données à partir d’une requête.

.set-or-append Raw_Table <|
  let Raw_Stream = datatable(RawData: dynamic)
    [
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
    dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
  ];
  Raw_Stream

3 - Vérifier les résultats

Pour valider les résultats, vous pouvez exécuter une requête pour vérifier que les données ont été transformées et routées vers les tables de destination. Dans l’exemple suivant, l’opérateur union est utilisé pour combiner la source et les résultats des tables de destination dans un jeu de résultats unique.

Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc

Sortie

Vous devriez voir la sortie suivante où le Raw_Table a trois lignes et les tables de destination ont une ligne chacune.

TableName Lignes
Raw_Table 3
Error_log 1
Device_Alarms 1
Device_Telemetry 1

Nettoyer les ressources

Exécutez la commande suivante dans votre base de données pour propre les tables et fonctions créées dans ce didacticiel.

.execute database script <|
  .drop table Raw_Table
  .drop table Device_Telemetry
  .drop table Device_Alarms
  .drop table Error_Log
  .drop function Get_Telemetry
  .drop function Get_Alarms
  .drop function Log_Error