vytvoření úlohy streamování dat v Azure SQL Edge
tento článek vysvětluje, jak vytvořit úlohu streamování T-SQL v Azure SQL Edge. Vytvoříte vstupní a výstupní objekty externího datového proudu a potom v rámci vytváření úlohy streamování definujete dotaz na úlohu streamování.
Konfigurace vstupních a výstupních objektů externího datového proudu
T-SQL streamování využívá funkci externích zdrojů dat SQL Server k definování zdrojů dat přidružených k vstupům externích datových proudů a výstupům úlohy streamování. pomocí následujících příkazů T-SQL vytvořte vstupní nebo výstupní objekt externího datového proudu:
pokud se navíc jako výstupní datový proud používá Azure SQL Edge, SQL Server nebo Azure SQL Database, budete potřebovat vytvořit databázi s rozsahem pověření (Transact-SQL). tento příkaz T-SQL definuje přihlašovací údaje pro přístup k databázi.
Podporované vstupní a výstupní datové proudy zdrojů dat
Azure SQL Edge aktuálně podporuje jenom následující zdroje dat jako vstupy a výstupy streamu.
| Typ zdroje dat | Vstup | Výstup | Description |
|---|---|---|---|
| Centrum Azure IoT Edge | Y | Y | Zdroj dat pro čtení a zápis streamovaná data do centra Azure IoT Edge. Další informace najdete v tématu IoT Edge hub. |
| SQL Database | N | Y | Připojení zdroje dat, které zapisuje streamovaná data do SQL Database. databáze může být místní databáze v Azure SQL Edge nebo vzdálená databáze v SQL Server nebo Azure SQL Database. |
| Kafka | Y | N | Zdroj dat pro čtení dat streamování z tématu Kafka. tento adaptér je v tuto chvíli dostupný jenom pro verze Intel nebo AMD služby Azure SQL Edge. není k dispozici pro ARM64 verze Azure SQL Edge. |
Příklad: vytvoření objektu vstupu/výstupu externího datového proudu pro Azure IoT Edge hub
Následující příklad vytvoří objekt externího datového proudu pro Azure IoT Edge hub. Pokud chcete vytvořit zdroj dat vstupu a výstupu externího datového proudu pro Azure IoT Edge centrum, musíte nejprve vytvořit externí formát souboru pro rozložení dat, která se čtou nebo zapisují i vy.
Vytvořte formát souboru JSON typu external.
Create External file format InputFileFormat WITH ( format_type = JSON, ) goVytvořte externí zdroj dat pro centrum Azure IoT Edge. následující skript T-SQL vytvoří připojení zdroje dat k rozbočovači IoT Edge, který běží na stejném hostiteli docker jako Azure SQL Edge.
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH ( LOCATION = 'edgehub://' ) goVytvořte objekt externího datového proudu pro Azure IoT Edge hub. následující skript T-SQL vytvoří pro centrum IoT Edge objekt datového proudu. V případě objektu datového proudu centra IoT Edge je parametr LOCATION názvem tématu IoT Edge centra nebo kanálu, do kterého se čte nebo zapisuje.
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); go
Příklad: vytvoření objektu externího datového proudu pro Azure SQL Database
následující příklad vytvoří objekt externího datového proudu pro místní databázi v Azure SQL Edge.
Vytvořte v databázi hlavní klíč. Tento klíč je nutný k šifrování tajného kódu přihlašovacích údajů.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';vytvořte pověření v oboru databáze pro přístup ke zdroji SQL Server. Následující příklad vytvoří přihlašovací údaje k externímu zdroji dat s identitou = username a tajné = Password.
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>' goVytvořte externí zdroj dat pomocí vytvoření externího zdroje dat. Následující příklad:
- Vytvoří externí zdroj dat s názvem LocalSQLOutput.
- Určuje externí zdroj dat (
LOCATION = '<vendor>://<server>[:<port>]'). v tomto příkladu odkazuje na místní instanci Azure SQL Edge. - Používá dříve vytvořené přihlašovací údaje.
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ) goVytvořte objekt externího streamu. Následující příklad vytvoří objekt externího datového proudu odkazující na tabulku dbo. TemperatureMeasurements v MySQLDatabase databáze.
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
Příklad: vytvoření objektu externího datového proudu pro Kafka
následující příklad vytvoří objekt externího datového proudu pro místní databázi v Azure SQL Edge. V tomto příkladu se předpokládá, že je server Kafka nakonfigurovaný pro anonymní přístup.
Vytvořte externí zdroj dat pomocí vytvoření externího zdroje dat. Následující příklad:
Create EXTERNAL DATA SOURCE [KafkaInput] With ( LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>' ) GOPro vstup Kafka vytvořte externí formát souboru. Následující příklad vytvořil formát souboru JSON s komprimovaný jako gzip kompresí.
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON , DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' )Vytvořte objekt externího streamu. Následující příklad vytvoří objekt externího streamu, který odkazuje na Kafka téma
*TemperatureMeasurement*.CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' );
Vytvořit úlohu streamování a dotazy streamování
Pomocí sys.sp_create_streaming_job systémové uložené procedury definujte dotazy streamování a vytvořte úlohu streamování. sp_create_streaming_jobUložená procedura používá následující parametry:
job_name: Název úlohy streamování. Názvy úloh streamování jsou v rámci instance jedinečné.statement: Stream Analytics dotazovánípříkazů dotazu streamování založeného na jazyce.
Následující příklad vytvoří jednoduchou úlohu streamování s jedním dotazem pro streamování. Tento dotaz přečte vstupy z centra IoT Edge a zapisuje do dbo.TemperatureMeasurements databáze.
EXEC sys.sp_create_streaming_job @name=N'StreamingJob1',
@statement= N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
Následující příklad vytvoří složitější úlohu streamování s více různými dotazy. Tyto dotazy obsahují jednu z vestavěných AnomalyDetection_ChangePoint funkcí k identifikaci anomálií v datech o teplotě.
EXEC sys.sp_create_streaming_job @name=N'StreamingJob2', @statement=
N' Select * INTO TemperatureMeasurements1 from MyEdgeHubInput1
Select * Into TemperatureMeasurements2 from MyEdgeHubInput2
Select * Into TemperatureMeasurements3 from MyEdgeHubInput3
SELECT
Timestamp as [Time],
[Temperature] As [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''Score'') as ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''IsAnomaly'') as IsChangePointAnomaly
INTO TemperatureAnomalies FROM MyEdgeHubInput2;
'
go
Spuštění, zastavení, zrušení a monitorování úloh streamování
pokud chcete spustit úlohu streamování ve službě Azure SQL Edge, spusťte sys.sp_start_streaming_job uloženou proceduru. Uložená procedura vyžaduje jako vstup název úlohy streamování, která se má spustit.
exec sys.sp_start_streaming_job @name=N'StreamingJob1'
go
Pokud chcete zastavit úlohu streamování, spusťte sys.sp_stop_streaming_job uloženou proceduru. Uložená procedura vyžaduje, aby se jako vstup zastavil název úlohy streamování.
exec sys.sp_stop_streaming_job @name=N'StreamingJob1'
go
Pokud chcete odpojit úlohu streamování (nebo ji odstranit), spusťte sys.sp_drop_streaming_job uloženou proceduru. Uložená procedura vyžaduje jako vstup název úlohy streamování, která se má vyřadit.
exec sys.sp_drop_streaming_job @name=N'StreamingJob1'
go
Chcete-li získat aktuální stav úlohy streamování, spusťte sys.sp_get_streaming_job uloženou proceduru. Uložená procedura vyžaduje jako vstup název úlohy streamování, která se má vyřadit. Vypíše název a aktuální stav úlohy streamování.
exec sys.sp_get_streaming_job @name=N'StreamingJob1'
WITH RESULT SETS
(
(
name nvarchar(256),
status nvarchar(256),
error nvarchar(256)
)
)
Úloha streamování může mít některý z následujících stavů:
| Status | Popis |
|---|---|
| Vytvořeno | Úloha streamování se vytvořila, ale ještě se nespustila. |
| Spouštění | Úloha streamování je ve fázi zahájení. |
| Období | Úloha streamování je spuštěná, ale není k dispozici žádný vstup ke zpracování. |
| Zpracování | Úloha streamování je spuštěná a zpracovává vstupy. Tento stav označuje stav v pořádku pro úlohu streamování. |
| Snížený výkon | Úloha streamování je spuštěná, ale během zpracování vstupu došlo k nějakým nezá závažná chyba. Vstupní úloha se bude dál spouštět, ale zahodí vstupy, u které dojde k chybám. |
| Zastaveno | Úloha streamování se zastavila. |
| Neúspěšný | Úloha streamování selhala. Obvykle se jedná o indikaci závažné chyby během zpracování. |