Een gegevensstreaming-taak maken in Azure SQL Edge

In dit artikel wordt uitgelegd hoe u een T-SQL-streaming-taak maakt in Azure SQL Edge. U maakt de externe stroominvoer- en uitvoerobjecten en vervolgens definieert u de streaming-taakquery als onderdeel van het maken van de streaming-taak.

De invoer- en uitvoerobjecten van de externe stroom configureren

T-SQL streaming maakt gebruik van de functionaliteit van de externe gegevensbron van SQL Server om de gegevensbronnen te definiëren die zijn gekoppeld aan de externe stroominvoer en -uitvoer van de streaming-taak. Gebruik de volgende T-SQL om een externe stroominvoer of uitvoerobject te maken:

Als Azure SQL Edge, SQL Server of Azure SQL Database wordt gebruikt als uitvoerstroom, hebt u bovendien DE REFERENTIE VOOR CREATE DATABASE SCOPED (Transact-SQL) nodig. Met deze T-SQL definieert u de referenties voor toegang tot de database.

Ondersteunde gegevensbronnen voor invoer- en uitvoerstroom

Azure SQL Edge ondersteunt momenteel alleen de volgende gegevensbronnen als stroominvoer en -uitvoer.

Gegevensbrontype Invoer Uitvoer Description
Azure IoT Edge hub J J Gegevensbron voor het lezen en schrijven van streaminggegevens naar Azure IoT Edge hub. Zie voor meer informatie IoT Edge Hub.
SQL Database N J Gegevensbronverbinding voor het schrijven van streaminggegevens naar SQL Database. De database kan een lokale database zijn in Azure SQL Edge of een externe database in SQL Server of Azure SQL Database.
Kafka J N Gegevensbron voor het lezen van streaminggegevens uit een Kafka-onderwerp. Deze adapter is momenteel alleen beschikbaar voor Intel- of AMD-versies van Azure SQL Edge. Het is niet beschikbaar voor de ARM64-versie van Azure SQL Edge.

Voorbeeld: Een invoer-/uitvoerobject voor een externe stroom maken voor Azure IoT Edge hub

In het volgende voorbeeld wordt een extern streamobject gemaakt voor Azure IoT Edge hub. Als u een gegevensbron voor invoer/uitvoer van een externe stroom wilt maken voor Azure IoT Edge Hub, moet u eerst een externe bestandsindeling maken voor de indeling van de gegevens die worden gelezen of geschreven.

  1. Maak een externe bestandsindeling van het type JSON.

    Create External file format InputFileFormat
    WITH 
    (  
       format_type = JSON,
    )
    go
    
  2. Maak een externe gegevensbron voor Azure IoT Edge hub. Met het volgende T-SQL-script maakt u een gegevensbronverbinding met een IoT Edge-hub die wordt uitgevoerd op dezelfde Docker-host als Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput 
    WITH 
    (
        LOCATION = 'edgehub://'
    )
    go
    
  3. Maak het externe streamobject voor Azure IoT Edge hub. Met het volgende T-SQL script maakt u een streamobject voor de IoT Edge hub. In het geval van IoT Edge hub stream-object, is de parameter LOCATION de naam van het IoT Edge hub-onderwerp of -kanaal dat wordt gelezen of geschreven.

    CREATE EXTERNAL STREAM MyTempSensors 
    WITH 
    (
        DATA_SOURCE = EdgeHubInput,
        FILE_FORMAT = InputFileFormat,
        LOCATION = N'TemperatureSensors',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    go
    

Voorbeeld: Een extern streamobject maken om een Azure SQL Database

In het volgende voorbeeld wordt een extern streamobject gemaakt naar de lokale database in Azure SQL Edge.

  1. Maak een hoofdsleutel voor de database. Dit is vereist om het referentiegeheim te versleutelen.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Maak een databasereferentie voor toegang tot de SQL Server bron. In het volgende voorbeeld wordt een referentie gemaakt voor de externe gegevensbron, met IDENTITY = username en SECRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'
    go
    
  3. Maak een externe gegevensbron met CREATE EXTERNAL DATA SOURCE. Het volgende voorbeeld:

    • Hiermee maakt u een externe gegevensbron met de naam LocalSQLOutput.
    • Identificeert de externe gegevensbron ( LOCATION = '<vendor>://<server>[:<port>]' ). In het voorbeeld wordt naar een lokaal exemplaar van Azure SQL Edge.
    • Maakt gebruik van de referentie die u eerder hebt gemaakt.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput 
    WITH 
    (
        LOCATION = 'sqlserver://tcp:.,1433',
        CREDENTIAL = SQLCredential
    )
    go
    
  4. Maak het externe streamobject. In het volgende voorbeeld wordt een extern streamobject gemaakt dat verwijst naar een tabel-dbo. TemperatureMeasurements in de database MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements 
    WITH 
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Voorbeeld: Een extern streamobject maken voor Kafka

In het volgende voorbeeld wordt een extern streamobject gemaakt naar de lokale database in Azure SQL Edge. In dit voorbeeld wordt ervan uitgenomen dat de Kafka-server is geconfigureerd voor anonieme toegang.

  1. Maak een externe gegevensbron met CREATE EXTERNAL DATA SOURCE. Het volgende voorbeeld:

    Create EXTERNAL DATA SOURCE [KafkaInput] 
    With
    (
        LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'
    )
    GO
    
  2. Maak een externe bestandsindeling voor de kafka-invoer. In het volgende voorbeeld is een JSON-bestandsindeling gemaakt met GZipped-compressie.

    CREATE EXTERNAL FILE FORMAT JsonGzipped  
     WITH 
     (  
         FORMAT_TYPE = JSON , 
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' 
     )
    
  3. Maak het externe streamobject. In het volgende voorbeeld wordt een extern streamobject gemaakt dat verwijst naar het Kafka-onderwerp *TemperatureMeasurement* .

    CREATE EXTERNAL STREAM TemperatureMeasurement 
    WITH 
    (  
        DATA_SOURCE = KafkaInput, 
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10' 
    ); 
    

De streaming-taak en de streamingquery's maken

Gebruik de sys.sp_create_streaming_job opgeslagen systeemprocedure om de streamingquery's te definiëren en de streaming-taak te maken. De sp_create_streaming_job opgeslagen procedure heeft de volgende parameters:

  • job_name: de naam van de streaming-taak. Namen van streaming-taak zijn uniek binnen het exemplaar.
  • statement: Stream Analytics querytaal op basisvan streamingquery-instructies.

In het volgende voorbeeld wordt een eenvoudige streaming-taak gemaakt met één streamingquery. Deze query leest de invoer van de IoT Edge hub en schrijft naar dbo.TemperatureMeasurements in de database.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob1',
@statement= N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

In het volgende voorbeeld wordt een complexere streaming-taak gemaakt met meerdere verschillende query's. Deze query's bevatten een query die gebruikmaakt van de ingebouwde functie om afwijkingen in de AnomalyDetection_ChangePoint temperatuurgegevens te identificeren.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob2', @statement=
N' Select * INTO TemperatureMeasurements1 from MyEdgeHubInput1

Select * Into TemperatureMeasurements2 from MyEdgeHubInput2

Select * Into TemperatureMeasurements3 from MyEdgeHubInput3

SELECT
Timestamp as [Time],
[Temperature] As [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''Score'') as ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''IsAnomaly'') as IsChangePointAnomaly
INTO TemperatureAnomalies FROM MyEdgeHubInput2;
'
go

Streamingtaken starten, stoppen, neerzetten en bewaken

Als u een streaming-taak wilt starten in Azure SQL Edge, moet u de sys.sp_start_streaming_job opgeslagen procedure uitvoeren. Voor de opgeslagen procedure moet de naam van de streaming-taak worden gestart als invoer.

exec sys.sp_start_streaming_job @name=N'StreamingJob1'
go

Voer de opgeslagen procedure uit om een sys.sp_stop_streaming_job streaming-taak te stoppen. Voor de opgeslagen procedure moet de naam van de streaming-taak worden gestopt als invoer.

exec sys.sp_stop_streaming_job @name=N'StreamingJob1'
go

Als u een streaming-taak wilt verwijderen (of verwijderen), moet u de sys.sp_drop_streaming_job opgeslagen procedure uitvoeren. Voor de opgeslagen procedure moet de naam van de streaming-taak als invoer worden verwijderd.

exec sys.sp_drop_streaming_job @name=N'StreamingJob1'
go

Voer de opgeslagen procedure uit om de huidige status van een streaming-taak sys.sp_get_streaming_job op te halen. Voor de opgeslagen procedure moet de naam van de streaming-taak als invoer worden verwijderd. De uitvoer is de naam en de huidige status van de streaming-taak.

exec sys.sp_get_streaming_job @name=N'StreamingJob1'
        WITH RESULT SETS
(
       (
       name nvarchar(256),
       status nvarchar(256),
       error nvarchar(256)
       )
)

De streaming-taak kan een van de volgende statussen hebben:

Status Beschrijving
Gemaakt De streaming-taak is gemaakt, maar is nog niet gestart.
Starten De streaming-taak is in de beginfase.
Niet-actief De streaming-taak wordt uitgevoerd, maar er is geen invoer om te verwerken.
Wordt verwerkt De streaming-taak wordt uitgevoerd en verwerkt invoer. Deze status geeft een goede status aan voor de streaming-taak.
Verminderd beschikbaar De streaming-taak wordt uitgevoerd, maar er zijn enkele niet-fatale fouten tijdens de invoerverwerking. De invoer-taak blijft worden uitgevoerd, maar invoer die fouten ondervindt, wordt wegvallen.
Gestopt De streaming-taak is gestopt.
Mislukt De streaming-taak is mislukt. Dit is in het algemeen een indicatie van een fatale fout tijdens de verwerking.

Volgende stappen