Een gegevensstreaming-taak maken in Azure SQL Edge
In dit artikel wordt uitgelegd hoe u een T-SQL-streaming-taak maakt in Azure SQL Edge. U maakt de externe stroominvoer- en uitvoerobjecten en vervolgens definieert u de streaming-taakquery als onderdeel van het maken van de streaming-taak.
De invoer- en uitvoerobjecten van de externe stroom configureren
T-SQL streaming maakt gebruik van de functionaliteit van de externe gegevensbron van SQL Server om de gegevensbronnen te definiëren die zijn gekoppeld aan de externe stroominvoer en -uitvoer van de streaming-taak. Gebruik de volgende T-SQL om een externe stroominvoer of uitvoerobject te maken:
Als Azure SQL Edge, SQL Server of Azure SQL Database wordt gebruikt als uitvoerstroom, hebt u bovendien DE REFERENTIE VOOR CREATE DATABASE SCOPED (Transact-SQL) nodig. Met deze T-SQL definieert u de referenties voor toegang tot de database.
Ondersteunde gegevensbronnen voor invoer- en uitvoerstroom
Azure SQL Edge ondersteunt momenteel alleen de volgende gegevensbronnen als stroominvoer en -uitvoer.
| Gegevensbrontype | Invoer | Uitvoer | Description |
|---|---|---|---|
| Azure IoT Edge hub | J | J | Gegevensbron voor het lezen en schrijven van streaminggegevens naar Azure IoT Edge hub. Zie voor meer informatie IoT Edge Hub. |
| SQL Database | N | J | Gegevensbronverbinding voor het schrijven van streaminggegevens naar SQL Database. De database kan een lokale database zijn in Azure SQL Edge of een externe database in SQL Server of Azure SQL Database. |
| Kafka | J | N | Gegevensbron voor het lezen van streaminggegevens uit een Kafka-onderwerp. Deze adapter is momenteel alleen beschikbaar voor Intel- of AMD-versies van Azure SQL Edge. Het is niet beschikbaar voor de ARM64-versie van Azure SQL Edge. |
Voorbeeld: Een invoer-/uitvoerobject voor een externe stroom maken voor Azure IoT Edge hub
In het volgende voorbeeld wordt een extern streamobject gemaakt voor Azure IoT Edge hub. Als u een gegevensbron voor invoer/uitvoer van een externe stroom wilt maken voor Azure IoT Edge Hub, moet u eerst een externe bestandsindeling maken voor de indeling van de gegevens die worden gelezen of geschreven.
Maak een externe bestandsindeling van het type JSON.
Create External file format InputFileFormat WITH ( format_type = JSON, ) goMaak een externe gegevensbron voor Azure IoT Edge hub. Met het volgende T-SQL-script maakt u een gegevensbronverbinding met een IoT Edge-hub die wordt uitgevoerd op dezelfde Docker-host als Azure SQL Edge.
CREATE EXTERNAL DATA SOURCE EdgeHubInput WITH ( LOCATION = 'edgehub://' ) goMaak het externe streamobject voor Azure IoT Edge hub. Met het volgende T-SQL script maakt u een streamobject voor de IoT Edge hub. In het geval van IoT Edge hub stream-object, is de parameter LOCATION de naam van het IoT Edge hub-onderwerp of -kanaal dat wordt gelezen of geschreven.
CREATE EXTERNAL STREAM MyTempSensors WITH ( DATA_SOURCE = EdgeHubInput, FILE_FORMAT = InputFileFormat, LOCATION = N'TemperatureSensors', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' ); go
Voorbeeld: Een extern streamobject maken om een Azure SQL Database
In het volgende voorbeeld wordt een extern streamobject gemaakt naar de lokale database in Azure SQL Edge.
Maak een hoofdsleutel voor de database. Dit is vereist om het referentiegeheim te versleutelen.
CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';Maak een databasereferentie voor toegang tot de SQL Server bron. In het volgende voorbeeld wordt een referentie gemaakt voor de externe gegevensbron, met IDENTITY = username en SECRET = password.
CREATE DATABASE SCOPED CREDENTIAL SQLCredential WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>' goMaak een externe gegevensbron met CREATE EXTERNAL DATA SOURCE. Het volgende voorbeeld:
- Hiermee maakt u een externe gegevensbron met de naam LocalSQLOutput.
- Identificeert de externe gegevensbron (
LOCATION = '<vendor>://<server>[:<port>]'). In het voorbeeld wordt naar een lokaal exemplaar van Azure SQL Edge. - Maakt gebruik van de referentie die u eerder hebt gemaakt.
CREATE EXTERNAL DATA SOURCE LocalSQLOutput WITH ( LOCATION = 'sqlserver://tcp:.,1433', CREDENTIAL = SQLCredential ) goMaak het externe streamobject. In het volgende voorbeeld wordt een extern streamobject gemaakt dat verwijst naar een tabel-dbo. TemperatureMeasurements in de database MySQLDatabase.
CREATE EXTERNAL STREAM TemperatureMeasurements WITH ( DATA_SOURCE = LocalSQLOutput, LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements', INPUT_OPTIONS = N'', OUTPUT_OPTIONS = N'' );
Voorbeeld: Een extern streamobject maken voor Kafka
In het volgende voorbeeld wordt een extern streamobject gemaakt naar de lokale database in Azure SQL Edge. In dit voorbeeld wordt ervan uitgenomen dat de Kafka-server is geconfigureerd voor anonieme toegang.
Maak een externe gegevensbron met CREATE EXTERNAL DATA SOURCE. Het volgende voorbeeld:
Create EXTERNAL DATA SOURCE [KafkaInput] With ( LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>' ) GOMaak een externe bestandsindeling voor de kafka-invoer. In het volgende voorbeeld is een JSON-bestandsindeling gemaakt met GZipped-compressie.
CREATE EXTERNAL FILE FORMAT JsonGzipped WITH ( FORMAT_TYPE = JSON , DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' )Maak het externe streamobject. In het volgende voorbeeld wordt een extern streamobject gemaakt dat verwijst naar het Kafka-onderwerp
*TemperatureMeasurement*.CREATE EXTERNAL STREAM TemperatureMeasurement WITH ( DATA_SOURCE = KafkaInput, FILE_FORMAT = JsonGzipped, LOCATION = 'TemperatureMeasurement', INPUT_OPTIONS = 'PARTITIONS: 10' );
De streaming-taak en de streamingquery's maken
Gebruik de sys.sp_create_streaming_job opgeslagen systeemprocedure om de streamingquery's te definiëren en de streaming-taak te maken. De sp_create_streaming_job opgeslagen procedure heeft de volgende parameters:
job_name: de naam van de streaming-taak. Namen van streaming-taak zijn uniek binnen het exemplaar.statement: Stream Analytics querytaal op basisvan streamingquery-instructies.
In het volgende voorbeeld wordt een eenvoudige streaming-taak gemaakt met één streamingquery. Deze query leest de invoer van de IoT Edge hub en schrijft naar dbo.TemperatureMeasurements in de database.
EXEC sys.sp_create_streaming_job @name=N'StreamingJob1',
@statement= N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'
In het volgende voorbeeld wordt een complexere streaming-taak gemaakt met meerdere verschillende query's. Deze query's bevatten een query die gebruikmaakt van de ingebouwde functie om afwijkingen in de AnomalyDetection_ChangePoint temperatuurgegevens te identificeren.
EXEC sys.sp_create_streaming_job @name=N'StreamingJob2', @statement=
N' Select * INTO TemperatureMeasurements1 from MyEdgeHubInput1
Select * Into TemperatureMeasurements2 from MyEdgeHubInput2
Select * Into TemperatureMeasurements3 from MyEdgeHubInput3
SELECT
Timestamp as [Time],
[Temperature] As [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''Score'') as ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''IsAnomaly'') as IsChangePointAnomaly
INTO TemperatureAnomalies FROM MyEdgeHubInput2;
'
go
Streamingtaken starten, stoppen, neerzetten en bewaken
Als u een streaming-taak wilt starten in Azure SQL Edge, moet u de sys.sp_start_streaming_job opgeslagen procedure uitvoeren. Voor de opgeslagen procedure moet de naam van de streaming-taak worden gestart als invoer.
exec sys.sp_start_streaming_job @name=N'StreamingJob1'
go
Voer de opgeslagen procedure uit om een sys.sp_stop_streaming_job streaming-taak te stoppen. Voor de opgeslagen procedure moet de naam van de streaming-taak worden gestopt als invoer.
exec sys.sp_stop_streaming_job @name=N'StreamingJob1'
go
Als u een streaming-taak wilt verwijderen (of verwijderen), moet u de sys.sp_drop_streaming_job opgeslagen procedure uitvoeren. Voor de opgeslagen procedure moet de naam van de streaming-taak als invoer worden verwijderd.
exec sys.sp_drop_streaming_job @name=N'StreamingJob1'
go
Voer de opgeslagen procedure uit om de huidige status van een streaming-taak sys.sp_get_streaming_job op te halen. Voor de opgeslagen procedure moet de naam van de streaming-taak als invoer worden verwijderd. De uitvoer is de naam en de huidige status van de streaming-taak.
exec sys.sp_get_streaming_job @name=N'StreamingJob1'
WITH RESULT SETS
(
(
name nvarchar(256),
status nvarchar(256),
error nvarchar(256)
)
)
De streaming-taak kan een van de volgende statussen hebben:
| Status | Beschrijving |
|---|---|
| Gemaakt | De streaming-taak is gemaakt, maar is nog niet gestart. |
| Starten | De streaming-taak is in de beginfase. |
| Niet-actief | De streaming-taak wordt uitgevoerd, maar er is geen invoer om te verwerken. |
| Wordt verwerkt | De streaming-taak wordt uitgevoerd en verwerkt invoer. Deze status geeft een goede status aan voor de streaming-taak. |
| Verminderd beschikbaar | De streaming-taak wordt uitgevoerd, maar er zijn enkele niet-fatale fouten tijdens de invoerverwerking. De invoer-taak blijft worden uitgevoerd, maar invoer die fouten ondervindt, wordt wegvallen. |
| Gestopt | De streaming-taak is gestopt. |
| Mislukt | De streaming-taak is mislukt. Dit is in het algemeen een indicatie van een fatale fout tijdens de verwerking. |