教學課程:使用數據表更新原則路由數據
當您的源數據牽涉到簡單且快速的轉換時,最好使用事件數據流在管線中執行它們。 不過,這種方法可能不適用於其他複雜或需要特殊功能運作的轉換。
在本教學課程中,您會了解如何:
本教學課程中的範例示範如何使用 數據路由 的更新原則來執行複雜的轉換,以在擷取時擴充、清理和轉換數據。 如需其他常見使用案例的清單,請參閱 數據表更新原則的常見使用案例。
必要條件
- Microsoft 帳戶或 Microsoft Entra 使用者身分識別。 不需要 Azure 訂用帳戶。
- Azure 資料總管叢集和資料庫。 在 Microsoft Fabric 的 Real-Time Analytics 中建立叢集和資料庫或 KQL 資料庫。
1 - 建立數據表和更新原則
下列步驟會引導您建立源數據表、轉換函式、目的地數據表和更新原則。 本教學課程示範如何使用數據表更新原則來執行複雜的轉換,並將結果儲存至一或多個目的地數據表。 此範例使用名為 Raw_Table 的單一源數據表,以及名為Device_Telemetry、Device_Alarms 和 Error_Log 的三個目的地數據表。
執行下列命令以建立名為 Raw_Table 的數據表。
.create table Raw_Table (RawData: dynamic)
源數據表是儲存內嵌數據的位置。 數據表具有名為 Dynamic 類型的 RawData 單一數據行。 動態類型可用來依原樣儲存原始數據,而不需要任何架構。 如需詳細資訊,請參閱 .create table 命令。
執行下列命令來建立名為 Get_Telemetry、 Get_Alarms 和 Log_Error 函式的函式。
.execute database script <| .create-or-alter function Get_Telemetry() { Raw_Table | where todynamic(RawData).MessageType == 'Telemetry' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceType), SensorName = tostring(RawData.SensorName), SensorValue = toreal(RawData.SensorValue), SensorUnit = tostring(RawData.SensorUnit) | project-away RawData } .create-or-alter function Get_Alarms() { Raw_Table | where RawData.MessageType == 'Alarms' | extend Timestamp = unixtime_seconds_todatetime(tolong(RawData.Timestamp)), DeviceId = tostring(RawData.DeviceId), DeviceType = tostring(RawData.DeviceTpe) , AlarmType = tostring(RawData.AlarmType) | project-away RawData } .create-or-alter function Log_Error() { Raw_Table | where RawData.MessageType !in ('Telemetry', 'Alarms') | extend TimeStamp = datetime(now), ErrorType = 'Unknown MessageType' | project TimeStamp, RawData, ErrorType }
建立更新原則時,您可以指定執行內嵌腳本。 不過,我們建議將轉換邏輯封裝成函式。 使用函式可改善程式代碼維護。 當新數據送達時,會執行 函式來轉換數據。 函式可以跨多個更新原則重複使用。 如需詳細資訊,請參閱 .create 函式命令。
執行下列命令以建立目的地數據表。
.execute database script <| .create table Device_Telemetry (Timestamp: datetime, DeviceId: string, DeviceType: string, SensorName: string, SensorValue: real, SensorUnit: string) .set-or-append Device_Alarms <| Get_Alarms | take 0 .set-or-append Error_Log <| Log_Error | take 0
目的地數據表的架構必須與轉換函式的輸出具有相同的架構。 您可以透過下列方式建立目的地資料表:
.create table
使用 命令並手動指定架構,如建立Device_Telemetry數據表所示。 不過,這種方法可能會容易出錯且耗時。.set-or-append
如果您已經建立函式來轉換數據,請使用 命令。 這個方法會建立新的數據表,其架構與函式的輸出相同,使用take 0
以確保函式只會傳回架構。 如需詳細資訊,請參閱 .set-or-append 命令。
執行下列命令以建立目的地數據表的更新原則
.execute database script <| .alter table Device_Telemetry policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Telemetry\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Device_Alarms policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Get_Alarms\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]" .alter table Error_Log policy update "[{\"IsEnabled\":true,\"Source\":\"Raw_Table\",\"Query\":\"Log_Error\",\"IsTransactional\":false,\"PropagateIngestionProperties\":true,\"ManagedIdentity\":null}]"
命令
.alter table policy update
可用來連結源數據表、轉換函式和目的地數據表。 更新原則會在目的地數據表上建立,並指定源數據表和轉換函式。 如需詳細資訊,請參閱 .alter table policy update 命令。
2 - 擷取範例數據
若要測試更新原則,您可以使用 命令將範例數據內嵌至源數據表 .set-or-append
。 如需詳細資訊,請參閱 從查詢擷取數據。
.set-or-append Raw_Table <|
let Raw_Stream = datatable(RawData: dynamic)
[
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Telemetry", "DeviceType": "Laminator", "SensorName": "Temperature", "SensorValue": 78.3, "SensorUnit": "Celcius"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Alarms", "DeviceType": "Laminator", "AlarmType": "Temperature threshold breached"}),
dynamic({"TimeStamp": 1691757932, "DeviceId": "Sensor01", "MessageType": "Foo", "ErrorType": "Unknown"})
];
Raw_Stream
3 - 驗證結果
若要驗證結果,您可以執行查詢,以確認數據已轉換並路由傳送至目的地數據表。 在下列範例中 union
,運算符可用來將來源和目的地數據表的結果合併成單一結果集。
Raw_Table | summarize Rows=count() by TableName = "Raw_Table"
| union (Device_Telemetry | summarize Rows=count() by TableName = "Device_Telemetry")
| union (Device_Alarms | summarize Rows=count() by TableName = "Device_Alarms")
| union (Error_Log | summarize Rows=count() by TableName = "Error_Log")
| sort by Rows desc
輸出
您應該會看到下列輸出,其中Raw_Table有三個數據列,而目的地數據表各有一個數據列。
TableName | 資料列 |
---|---|
Raw_Table | 3 |
Error_Log | 1 |
Device_Alarms | 1 |
Device_Telemetry | 1 |
清除資源
在資料庫中執行下列命令,以清除在本教學課程中建立的數據表和函式。
.execute database script <|
.drop table Raw_Table
.drop table Device_Telemetry
.drop table Device_Alarms
.drop table Error_Log
.drop function Get_Telemetry
.drop function Get_Alarms
.drop function Log_Error
相關內容
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應