CREATE EXTERNAL STREAM (Transact-SQL)

重要

Azure SQL Edge 不再支援 ARM64 平台。

EXTERNAL STREAM 物件具有輸入和輸出串流的雙重用途。 其既可作為輸入以查詢來自事件擷取服務 (例如 Azure 事件中樞、Azure IoT 中樞 (或 Edge 中樞) 或 Kafka) 的串流資料,也可作為輸出以指定串流查詢所得結果的儲存位置和方式。

您也可以指定和建立 EXTERNAL STREAM 來作為事件中樞或 Blob 儲存體等服務的輸出和輸入。 這有助於鏈結案例,也就是某個串流查詢將結果保存到外部串流作為輸出,而從相同外部串流讀取來的另一個串流查詢則作為輸入。

Azure SQL Edge 目前僅支援使用下列資料來源作為串流的輸入和輸出。

資料來源類型 輸入 輸出 描述
Azure IoT Edge 中樞 Y Y 用來讀取和寫入串流資料至 Azure IoT Edge 中樞的資料來源。 如需詳細資訊,請參閱 IoT Edge 中樞
SQL Database 用來將串流資料寫入到 SQL Database 的資料來源連線。 資料庫可以是 Azure SQL Edge 中的本機資料庫,也可以是 SQL Server 或 Azure SQL Database 中的遠端資料庫。
Kafka 用來從 Kafka 主題讀取串流資料的資料來源。

語法

CREATE EXTERNAL STREAM { external_stream_name }
( <column_definition> [ , <column_definition> ] * ) -- Used for Inputs - optional
WITH  ( <with_options> )

<column_definition> ::=
  column_name <column_data_type>

<data_type> ::=
[ type_schema_name . ] type_name
    [ ( precision [ , scale ] | max ) ]

<with_options> ::=
  DATA_SOURCE = data_source_name ,
  LOCATION = location_name ,
  [ FILE_FORMAT = external_file_format_name ] , --Used for Inputs - optional
  [ <optional_input_options> ] ,
  [ <optional_output_options> ] ,
  TAGS = <tag_column_value>

<optional_input_options> ::=
  INPUT_OPTIONS = ' [ <input_options_data> ] '

<Input_option_data> ::=
      <input_option_values> [ , <input_option_values> ]

<input_option_values> ::=
  PARTITIONS: [ number_of_partitions ]
  | CONSUMER_GROUP: [ consumer_group_name ]
  | TIME_POLICY: [ time_policy ]
  | LATE_EVENT_TOLERANCE: [ late_event_tolerance_value ]
  | OUT_OF_ORDER_EVENT_TOLERANCE: [ out_of_order_tolerance_value ]

<optional_output_options> ::=
  OUTPUT_OPTIONS = ' [ <output_option_data> ] '

<output_option_data> ::=
      <output_option_values> [ , <output_option_values> ]

<output_option_values> ::=
   REJECT_POLICY: [ reject_policy ]
   | MINIMUM_ROWS: [ row_value ]
   | MAXIMUM_TIME: [ time_value_minutes ]
   | PARTITION_KEY_COLUMN: [ partition_key_column_name ]
   | PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | SYSTEM_PROPERTY_COLUMNS: [ ( [ output_col_name ] ) ]
   | PARTITION_KEY: [ partition_key_name ]
   | ROW_KEY: [ row_key_name ]
   | BATCH_SIZE: [ batch_size_value ]
   | MAXIMUM_BATCH_COUNT: [ batch_value ]
   | STAGING_AREA: [ blob_data_source ]

<tag_column_value> ::= -- Reserved for Future Usage
);

引數

DATA_SOURCE

如需詳細資訊,請參閱 DATA_SOURCE

FILE_FORMAT

如需詳細資訊,請參閱 FILE_FORMAT

LOCATION

指定資料來源中實際資料或位置的名稱。

  • 對於 Edge 中樞或 Kafka 串流物件,位置會指定要從中讀取或寫入的 Edge 中樞或 Kafka 主題的名稱。
  • 對於 SQL 串流物件 (SQL Server、Azure SQL Database 或 Azure SQL Edge),位置會指定資料表的名稱。 如果在與目的地資料表相同的資料庫和結構描述中建立串流,則只有資料表名稱就已足夠。 否則,您必須完整限定的資料表名稱 (<database_name>.<schema_name>.<table_name>)。
  • 對於 Azure Blob 儲存體串流物件,位置是指 Blob 容器內要使用的路徑模式。 如需詳細資訊,請參閱來自 Azure 串流分析的輸出

INPUT_OPTIONS

針對作為串流查詢輸入的服務 (例如 Kafka 和 IoT Edge 中樞),將選項指定為索引鍵/值組。

  • PARTITIONS:

    針對主題所定義的分割區數目。 可以使用的分割區數目上限限制為 32 個 (適用 Kafka 輸入資料流)。

    • CONSUMER_GROUP:

      事件中樞和 IoT 中樞會限制一個取用者群組可擁有的讀者數量 (最多 5 個)。 將此欄位保留空白則會使用 '$Default' 取用者群組。

      • 保留供日後使用。 不適用 Azure SQL Edge。
    • TIME_POLICY:

      描述當延遲的事件或順序不正確的事件超過其容錯值時,是要捨棄事件還是調整事件的時間。

      • 保留供日後使用。 不適用 Azure SQL Edge。
    • LATE_EVENT_TOLERANCE:

      可接受的最大時間延遲。 延遲代表事件的時間戳記與系統時鐘之間的差異。

      • 保留供日後使用。 不適用 Azure SQL Edge。
    • OUT_OF_ORDER_EVENT_TOLERANCE:

      事件在從輸入前往串流查詢時,可能不會按順序抵達。 您可以依現狀接受這些事件,或者也可以選擇暫停一定時間來將其重新排序。

      • 保留供日後使用。 不適用 Azure SQL Edge。

OUTPUT_OPTIONS

針對輸出至串流查詢的受支援服務,將選項指定為索引鍵/值組

  • REJECT_POLICY: DROP | RETRY

    指定在發生資料轉換錯誤時的資料錯誤處理原則。

    • 適用所有支援的輸出。
  • MINIMUM_ROWS:

    寫入到輸出的每個批次所需包含的資料列數量下限。 針對 Parquet,每個批次都會建立新的檔案。

    • 適用所有支援的輸出。
  • MAXIMUM_TIME:

    每個批次的等候時間上限 (以分鐘為單位)。 在此時間後,即使未符合資料列下限需求,也會將批次寫入輸出。

    • 適用所有支援的輸出。
  • PARTITION_KEY_COLUMN:

    用於分割區索引鍵的資料行。

    • 保留供日後使用。 不適用 Azure SQL Edge。
  • PROPERTY_COLUMNS:

    要附加到訊息作為自訂屬性的輸出資料行名稱逗點分隔清單 (如有提供)。

    • 保留供日後使用。 不適用 Azure SQL Edge。
  • SYSTEM_PROPERTY_COLUMNS:

    要在服務匯流排訊息上填入之系統屬性名稱和輸出資料行的 JSON 格式名稱/值對集合。 例如: { "MessageId": "column1", "PartitionKey": "column2" }

    • 保留供日後使用。 不適用 Azure SQL Edge。
  • PARTITION_KEY:

    包含資料分割索引鍵的輸出資料行名稱。 在構成實體主索引鍵第一個部分的指定資料表內,資料分割索引鍵是資料分割的唯一識別碼。 大小最高為 1 KB 的字串值。

    • 保留供日後使用。 不適用 Azure SQL Edge。
  • ROW_KEY:

    包含資料列索引鍵的輸出資料行名稱。 資料列索引鍵是指定資料分割內實體的唯一識別碼。 其可構成實體主索引鍵的第二個部分。 資料列索引鍵是大小可能高達 1 KB 的字串值。

    • 保留供日後使用。 不適用 Azure SQL Edge。
  • BATCH_SIZE:

    這代表資料表儲存體的交易數目,最多可達 100 筆記錄。 對於 Azure Functions,這代表每個呼叫傳送至函式的批次大小 (以位元組為單位) - 預設為 256 kB。

    • 保留供日後使用。 不適用 Azure SQL Edge。
  • MAXIMUM_BATCH_COUNT:

    針對 Azure 函式,每個呼叫傳送給函式的最大事件數目 - 預設為 100。 對於 SQL Database,這代表每個大量插入交易所傳送的記錄數目上限 - 預設為 10,000。

    • 適用於所有 SQL 型輸出
  • STAGING_AREA:Blob 儲存體的外部資料來源物件

    高輸送量資料擷取至 Azure Synapse Analytics 的暫存區域

    • 保留供日後使用。 不適用 Azure SQL Edge。

如需對應至資料來源類型的支援輸入和輸出選項的詳細資訊,請分別參閱 Azure 串流分析 - 輸入概觀Azure 串流分析 - 輸出概觀

範例

範例 A:EdgeHub

類型:輸入或輸出。

CREATE EXTERNAL DATA SOURCE MyEdgeHub
    WITH (LOCATION = 'edgehub://');

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (FORMAT_TYPE = JSON);

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyEdgeHub,
            FILE_FORMAT = myFileFormat,
            LOCATION = '<mytopicname>',
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

範例 B:Azure SQL Database、Azure SQL Edge、SQL Server

類型:輸出

CREATE DATABASE SCOPED CREDENTIAL SQLCredName
    WITH IDENTITY = '<user>',
        SECRET = '<password>';

-- Azure SQL Database
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = '<my_server_name>.database.windows.net',
            CREDENTIAL = SQLCredName
            );

--SQL Server or Azure SQL Edge
CREATE EXTERNAL DATA SOURCE MyTargetSQLTabl
    WITH (
            LOCATION = ' <sqlserver://<ipaddress>,<port>',
            CREDENTIAL = SQLCredName
            );

CREATE EXTERNAL STREAM Stream_A
    WITH (
            DATA_SOURCE = MyTargetSQLTable,
            LOCATION = '<DatabaseName>.<SchemaName>.<TableName>',
            --Note: If table is contained in the database, <TableName> should be sufficient
            OUTPUT_OPTIONS = 'REJECT_TYPE: Drop'
            );

範例 C:Kafka

類型:輸入

CREATE EXTERNAL DATA SOURCE MyKafka_tweets
    WITH (
            --The location maps to KafkaBootstrapServer
            LOCATION = 'kafka://<kafkaserver>:<ipaddress>',
            CREDENTIAL = kafkaCredName
            );

CREATE EXTERNAL FILE FORMAT myFileFormat
    WITH (
            FORMAT_TYPE = JSON,
            DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
            );

CREATE EXTERNAL STREAM Stream_A (
    user_id VARCHAR,
    tweet VARCHAR
    )
    WITH (
            DATA_SOURCE = MyKafka_tweets,
            LOCATION = '<KafkaTopicName>',
            FILE_FORMAT = myFileFormat,
            INPUT_OPTIONS = 'PARTITIONS: 5'
            );

另請參閱