Vytvoření úlohy streamování dat v Azure SQL Edge

Důležité

Azure SQL Edge už nepodporuje platformu ARM64.

Tento článek vysvětluje, jak vytvořit úlohu streamování T-SQL v Azure SQL Edge. Vytvoříte externí vstupní a výstupní objekty datového proudu a pak definujete dotaz úlohy streamování jako součást vytváření úlohy streamování.

Konfigurace vstupních a výstupních objektů externího datového proudu

Streamování T-SQL používá funkci externího zdroje dat SQL Serveru k definování zdrojů dat přidružených ke vstupům externího datového proudu a výstupům úlohy streamování. K vytvoření externího vstupního nebo výstupního objektu datového proudu použijte následující příkazy T-SQL:

Pokud se navíc Azure SQL Edge, SQL Server nebo Azure SQL Database používají jako výstupní datový proud, potřebujete PŘIHLAŠOVACÍ ÚDAJE CREATE DATABASE SCOPED (Transact-SQL). Tento příkaz T-SQL definuje přihlašovací údaje pro přístup k databázi.

Podporované zdroje dat vstupního a výstupního streamu

Azure SQL Edge v současné době podporuje jako vstupy a výstupy datových proudů pouze následující zdroje dat.

Typ zdroje dat Vstup Výstup Popis
Centrum Azure IoT Edge Y Y Zdroj dat pro čtení a zápis streamovaných dat do centra Azure IoT Edge. Další informace najdete v tématu IoT Edge Hub.
Databáze SQL N Y Připojení ke zdroji dat pro zápis streamovaných dat do služby SQL Database Databáze může být místní databáze v Azure SQL Edge nebo vzdálená databáze v SQL Serveru nebo Azure SQL Database.
Kafka Y N Zdroj dat ke čtení streamovaných dat z tématu Kafka

Příklad: Vytvoření externího vstupního/výstupního objektu streamu pro centrum Azure IoT Edge

Následující příklad vytvoří objekt externího streamu pro centrum Azure IoT Edge. Pokud chcete vytvořit externí zdroj vstupních a výstupních dat datového proudu pro centrum Azure IoT Edge, musíte nejprve vytvořit formát externího souboru pro rozložení dat, která se čtou nebo zapisují.

  1. Vytvořte formát externího souboru typu JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. Vytvořte externí zdroj dat pro centrum Azure IoT Edge. Následující skript T-SQL vytvoří připojení zdroje dat k centru IoT Edge, které běží na stejném hostiteli Dockeru jako Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. Vytvořte objekt externího streamu pro centrum Azure IoT Edge. Následující skript T-SQL vytvoří objekt streamu pro centrum IoT Edge. V případě objektu streamu centra IoT Edge je parametr LOCATION názvem tématu centra IoT Edge nebo kanálu, do kterého se čte nebo zapisuje.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

Příklad: Vytvoření externího streamového objektu do azure SQL Database

Následující příklad vytvoří objekt externího datového proudu pro místní databázi v Azure SQL Edge.

  1. Vytvořte v databázi hlavní klíč. To se vyžaduje k šifrování tajného klíče přihlašovacích údajů.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Vytvořte přihlašovací údaje s oborem databáze pro přístup ke zdroji SQL Serveru. Následující příklad vytvoří přihlašovací údaje pro externí zdroj dat s IDENTITOU = uživatelské jméno a SECRET = heslo.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. Vytvořte externí zdroj dat pomocí příkazu CREATE EXTERNAL DATA SOURCE. Následující příklad:

    • Vytvoří externí zdroj dat s názvem LocalSQLOutput.
    • Identifikuje externí zdroj dat (LOCATION = '<vendor>://<server>[:<port>]'). V tomto příkladu odkazuje na místní instanci Azure SQL Edge.
    • Používá přihlašovací údaje vytvořené dříve.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. Vytvořte objekt externího datového proudu. Následující příklad vytvoří objekt externího datového proudu odkazující na tabulku dbo. TemperatureMeasurements v databázi MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Příklad: Vytvoření objektu externího streamu pro Kafka

Následující příklad vytvoří objekt externího datového proudu pro místní databázi v Azure SQL Edge. V tomto příkladu se předpokládá, že je server Kafka nakonfigurovaný pro anonymní přístup.

  1. Vytvořte externí zdroj dat pomocí příkazu CREATE EXTERNAL DATA SOURCE. Následující příklad:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. Vytvořte pro vstup Kafka formát externího souboru. Následující příklad vytvořil formát souboru JSON s GZipped Compression.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. Vytvořte objekt externího datového proudu. Následující příklad vytvoří objekt externího datového proudu odkazující na téma TemperatureMeasurementKafka .

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

Vytvoření úlohy streamování a dotazů streamování

sys.sp_create_streaming_job Pomocí systémové uložené procedury definujte dotazy streamování a vytvořte úlohu streamování. Uložená procedura sp_create_streaming_job přebírá následující parametry:

  • @job_name: Název úlohy streamování. Názvy úloh streamování jsou v celé instanci jedinečné.
  • @statement: Stream Analytics Query Language – příkazy dotazů založené na streamovacím jazyce

Následující příklad vytvoří jednoduchou úlohu streamování s jedním dotazem streamování. Tento dotaz načte vstupy z centra IoT Edge a zapisuje do dbo.TemperatureMeasurements databáze.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

Následující příklad vytvoří složitější úlohu streamování s několika různými dotazy. Mezi tyto dotazy patří jedna, která používá integrovanou AnomalyDetection_ChangePoint funkci k identifikaci anomálií v datech o teplotě.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

Spuštění, zastavení, vyřazení a monitorování úloh streamování

Pokud chcete spustit úlohu streamování v Azure SQL Edge, spusťte uloženou proceduru sys.sp_start_streaming_job . Uložená procedura vyžaduje, aby se jako vstup spustil název úlohy streamování.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

Pokud chcete zastavit úlohu streamování, spusťte uloženou proceduru sys.sp_stop_streaming_job . Uložená procedura vyžaduje, aby se jako vstup zastavil název úlohy streamování.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

Pokud chcete odstranit (nebo odstranit) úlohu streamování, spusťte uloženou proceduru sys.sp_drop_streaming_job . Uložená procedura vyžaduje, aby se jako vstup zobrazoval název úlohy streamování.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

Pokud chcete získat aktuální stav úlohy streamování, spusťte uloženou proceduru sys.sp_get_streaming_job . Uložená procedura vyžaduje, aby se jako vstup zobrazoval název úlohy streamování. Vypíše název a aktuální stav úlohy streamování.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

Úloha streamování může mít některý z následujících stavů:

Průběh Popis
Vytvořeno Úloha streamování byla vytvořena, ale ještě nebyla spuštěna.
Spouštění Úloha streamování je ve počáteční fázi.
Období Úloha streamování je spuštěná, ale neexistuje žádný vstup ke zpracování.
Zpracování Úloha streamování je spuštěná a zpracovává vstupy. Tento stav označuje stav, který je v pořádku pro úlohu streamování.
Snížený výkon Úloha streamování je spuštěná, ale během zpracování vstupu došlo k některým nefaktálním chybám. Vstupní úloha se bude dál spouštět, ale zahodí vstupy, u kterých dochází k chybám.
Zastaveno Úloha streamování byla zastavena.
Nezdařilo se Úloha streamování selhala. Obvykle to značí závažnou chybu během zpracování.

Poznámka:

Vzhledem k tomu, že úloha streamování je spuštěna asynchronně, může úloha narazit na chyby za běhu. K řešení potíží se selháním úlohy streamování použijte sys.sp_get_streaming_job uloženou proceduru nebo zkontrolujte protokol Dockeru z kontejneru Azure SQL Edge, který může poskytnout podrobnosti o chybě z úlohy streamování.

Další kroky