在 Azure SQL Edge 中建立資料串流工作

重要

Azure SQL Edge 不再支援 ARM64 平台。

本文說明如何在 Azure SQL Edge 中建立 T-SQL 串流工作。 您可以建立外部串流輸入和輸出物件,然後將串流工作查詢定義為串流工作建立的一部分。

設定外部串流的輸入和輸出物件

T-SQL 串流會使用 SQL Server 的外部資料來源功能,來定義與外部串流的串流工作輸入和輸出相關聯的資料來源。 使用下列 T-SQL 命令來建立外部串流的輸入或輸出物件:

此外,如果將 Azure SQL Edge、SQL Server 或 Azure SQL Database 用作輸出串流,您需要 CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL)。 此 T-SQL 命令會定義存取資料庫的認證。

所支援的輸入和輸出串流資料來源

Azure SQL Edge 目前僅支援使用下列資料來源作為串流的輸入和輸出。

資料來源類型 輸入 輸出 描述
Azure IoT Edge 中樞 Y Y 用來讀取和寫入串流資料至 Azure IoT Edge 中樞的資料來源。 如需詳細資訊,請參閱 IoT Edge 中樞
SQL Database 用來將串流資料寫入到 SQL Database 的資料來源連線。 資料庫可以是 Azure SQL Edge 中的本機資料庫,也可以是 SQL Server 或 Azure SQL Database 中的遠端資料庫。
Kafka 用來從 Kafka 主題讀取串流資料的資料來源。

範例:為 Azure IoT Edge 中樞建立外部串流的輸入/輸出物件

下列範例會為 Azure IoT Edge 中樞建立外部串流物件。 若要為 Azure IoT Edge 中樞建立外部串流的輸入/輸出資料來源,您必須先為要讀取/寫入資料的配置建立外部檔案格式。

  1. 建立 JSON 類型的外部檔案格式。

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. 建立 Azure IoT Edge 中樞的外部資料來源。 下列 T-SQL 指令碼會建立對 IoT Edge 中樞 (在與 Azure SQL Edge 相同的Docker 主機上執行) 的資料來源連線。

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. 建立 Azure IoT Edge 中樞的外部串流物件。 下列 T-SQL 指令碼會為 IoT Edge 中樞建立串流物件。 如果是 IoT Edge 中樞串流物件,LOCATION 參數會是所要讀取或寫入的 IoT Edge 中樞主題或通道名稱。

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

範例:建立對 Azure SQL Database 的外部串流物件

下列範例會在 Azure SQL Edge 中建立本機資料庫的外部串流物件。

  1. 在資料庫上建立主要金鑰。 這是加密認證祕密的必要項目。

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. 建立資料庫範圍認證以存取 SQL Server 來源。 下列範例會針對 IDENTITY = 'username' 和 SECRET = 'password' 的外部資料來源建立認證。

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. 使用 CREATE EXTERNAL DATA SOURCE 建立外部資料來源。 下列範例將:

    • 建立名為 LocalSQLOutput 的外部資料來源。
    • 指定外部資料來源 (LOCATION = '<vendor>://<server>[:<port>]')。 在此範例中,其會指向 Azure SQL Edge 的本機執行個體。
    • 使用先前建立的認證。
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. 建立外部串流物件。 下列範例會建立指向資料庫 MySQLDatabase 中資料表 dbo.TemperatureMeasurements 的外部串流物件。

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

範例:建立 Kafka 的外部串流物件

下列範例會在 Azure SQL Edge 中建立本機資料庫的外部串流物件。 此範例假設 kafka 伺服器已針對匿名存取設定。

  1. 使用 CREATE EXTERNAL DATA SOURCE 建立外部資料來源。 下列範例將:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. 建立 Kafka 輸入的外部檔案格式。 下列範例會使用 GZipped 壓縮建立 JSON 檔案格式。

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. 建立外部串流物件。 下列範例會建立指向 Kafka 主題 TemperatureMeasurement 的外部串流物件。

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

建立串流工作和串流查詢

使用 sys.sp_create_streaming_job 系統預存程序來定義串流查詢,並建立串流作業。 sp_create_streaming_job 預存程序會採用下列參數:

下列範例會建立具有一個串流查詢的簡易串流工作。 此查詢會從 IoT Edge 中樞讀取輸入,並寫入到資料庫中的 dbo.TemperatureMeasurements

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

下列範例會建立具有多個不同查詢的較複雜的串流工作。 這些查詢包含一個使用內建 AnomalyDetection_ChangePoint 函式來識別溫度資料異常的查詢。

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

開始、停止、捨棄和監視串流工作

若要在 Azure SQL Edge 中開始串流工作,請執行 sys.sp_start_streaming_job 預存程序。 預存程序需要要開始的串流工作的名稱 (作為輸入)。

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

若要停止串流工作,請執行 sys.sp_stop_streaming_job 預存程序。 預存程序需要要停止的串流工作的名稱 (作為輸入)。

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

若要捨棄 (或刪除) 串流工作,請執行 sys.sp_drop_streaming_job 預存程序。 預存程序需要要捨棄的串流工作的名稱 (作為輸入)。

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

若要取得串流工作的目前狀態,請執行 sys.sp_get_streaming_job 預存程序。 預存程序需要要捨棄的串流工作的名稱 (作為輸入)。 其會輸出串流工作的名稱和目前狀態。

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

串流工作可以具有下列任何一種狀態:

狀態 描述
已建立 已建立串流工作,但尚未開始。
啟動中 串流作業處於正在啟動的階段。
閒置 串流工作正在執行,但沒有任何要處理的輸入。
正在處理 串流作業正在執行,且正在處理輸入。 此狀態表示串流作業處於健全狀態。
已降級 串流工作正在執行,但在輸入處理期間有一些非嚴重錯誤。 輸入作業會繼續執行,但會捨棄發生錯誤的輸入。
已停止 串流作業已停止。
失敗 串流工作失敗。 這通常表示處理期間發生嚴重錯誤。

注意

由於串流工作是非同步執行,因此工作可能會在執行階段遇到錯誤。 為針對串流工作失敗進行疑難排解,請使用 sys.sp_get_streaming_job 預存程序,或檢閱來自 Azure SQL Edge 容器的 Docker 記錄,其可提供串流工作的錯誤詳細資料。

下一步