Skapa ett dataströmningsjobb i Azure SQL Edge

Den här artikeln beskriver hur du skapar ett T-SQL-jobb för direktuppspelning i Azure SQL Edge. Du skapar de externa strömindata- och utdataobjekten och definierar sedan frågan för direktuppspelningsjobbet som en del av skapandet av strömningsjobbet.

Konfigurera externa strömindata- och utdataobjekt

T-SQL-strömning använder funktionen för externa datakällor i SQL Server för att definiera de datakällor som är associerade med externa strömindata och utdata för strömningsjobbet. Använd följande T-SQL för att skapa ett externt strömindata- eller utdataobjekt:

Om Azure SQL Edge, SQL Server eller Azure SQL Database används som utdataström behöver du dessutom TRANSACT-SQL( CREATE DATABASE SCOPED CREDENTIAL). Det här T-SQL-kommandot definierar autentiseringsuppgifterna för åtkomst till databasen.

Indata- och utdatakällor som stöds

Azure SQL Edge stöder för närvarande endast följande datakällor som strömindata och utdata.

Typ av datakälla Indata Resultat Description
Azure IoT Edge hubb Y Y Datakälla för att läsa och skriva strömmande data till en Azure IoT Edge hubb. Mer information finns i IoT Edge Hub.
SQL Database N Y Anslutning till datakälla för att skriva strömmande data till SQL Database. Databasen kan vara en lokal databas i Azure SQL Edge, eller en fjärrdatabas i SQL Server eller Azure SQL Database.
Kafka Y N Datakälla för att läsa strömmande data från ett Kafka-ämne. Det här kortet är för närvarande endast tillgängligt för Intel- eller AMD-versioner av Azure SQL Edge. Den är inte tillgänglig för ARM64-versionen av Azure SQL Edge.

Exempel: Skapa ett externt indata-/utdataflödesobjekt för Azure IoT Edge hubb

I följande exempel skapas ett externt strömobjekt för Azure IoT Edge hubb. Om du vill skapa en extern strömdatakälla för indata/utdata för Azure IoT Edge hub måste du först skapa ett externt filformat för layouten för de data som läses eller skrivs också.

  1. Skapa ett externt filformat av typen JSON.

    Create External file format InputFileFormat
    WITH 
    (  
       format_type = JSON,
    )
    go
    
  2. Skapa en extern datakälla för Azure IoT Edge hubb. Följande T-SQL-skript skapar en datakällsanslutning till en IoT Edge-hubb som körs på samma Docker-värd som Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput 
    WITH 
    (
        LOCATION = 'edgehub://'
    )
    go
    
  3. Skapa det externa strömobjektet för Azure IoT Edge hubb. Följande T-SQL-skript skapar ett strömobjekt för den IoT Edge hubben. Om ett IoT Edge-hubbströmobjekt är parametern LOCATION namnet på det IoT Edge hub-ämne eller kanal som läses eller skrivs till.

    CREATE EXTERNAL STREAM MyTempSensors 
    WITH 
    (
        DATA_SOURCE = EdgeHubInput,
        FILE_FORMAT = InputFileFormat,
        LOCATION = N'TemperatureSensors',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    go
    

Exempel: Skapa ett externt strömobjekt som ska Azure SQL Database

I följande exempel skapas ett externt strömobjekt till den lokala databasen i Azure SQL Edge.

  1. Skapa en huvudnyckel i databasen. Detta krävs för att kryptera autentiseringshemligheten.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. Skapa en databasomfattning för åtkomst till SQL Server källan. I följande exempel skapas en autentiseringsuppgifterna till den externa datakällan med IDENTITY = username och SECRET = password.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'
    go
    
  3. Skapa en extern datakälla med CREATE EXTERNAL DATA SOURCE( SKAPA EXTERN DATAKÄLLA). Följande exempel:

    • Skapar en extern datakälla med namnet LocalSQLOutput.
    • Identifierar den externa datakällan ( LOCATION = '<vendor>://<server>[:<port>]' ). I det här exemplet pekar det på en lokal instans av Azure SQL Edge.
    • Använder de autentiseringsuppgifter som skapades tidigare.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput 
    WITH 
    (
        LOCATION = 'sqlserver://tcp:.,1433',
        CREDENTIAL = SQLCredential
    )
    go
    
  4. Skapa det externa strömobjektet. I följande exempel skapas ett externt strömobjekt som pekar på en tabell-dbo. TemperatureMeasurements, i databasen MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements 
    WITH 
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Exempel: Skapa ett externt strömobjekt för Kafka

I följande exempel skapas ett externt strömobjekt till den lokala databasen i Azure SQL Edge. Det här exemplet förutsätter att kafka-servern är konfigurerad för anonym åtkomst.

  1. Skapa en extern datakälla med CREATE EXTERNAL DATA SOURCE( SKAPA EXTERN DATAKÄLLA). Följande exempel:

    Create EXTERNAL DATA SOURCE [KafkaInput] 
    With
    (
        LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'
    )
    GO
    
  2. Skapa ett externt filformat för kafka-indata. I följande exempel skapades ett JSON-filformat med GZipped Compression.

    CREATE EXTERNAL FILE FORMAT JsonGzipped  
     WITH 
     (  
         FORMAT_TYPE = JSON , 
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' 
     )
    
  3. Skapa det externa strömobjektet. I följande exempel skapas ett externt strömobjekt som pekar på Kafka-ämnet *TemperatureMeasurement* .

    CREATE EXTERNAL STREAM TemperatureMeasurement 
    WITH 
    (  
        DATA_SOURCE = KafkaInput, 
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10' 
    ); 
    

Skapa strömningsjobbet och strömningsfrågorna

Använd den sys.sp_create_streaming_job system lagrade proceduren för att definiera strömningsfrågor och skapa strömningsjobbet. Den sp_create_streaming_job lagrade proceduren tar följande parametrar:

I följande exempel skapas ett enkelt strömningsjobb med en strömningsfråga. Den här frågan läser indata från IoT Edge och skriver till dbo.TemperatureMeasurements i databasen.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob1',
@statement= N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

I följande exempel skapas ett mer komplext strömningsjobb med flera olika frågor. Dessa frågor innehåller en som använder den inbyggda funktionen AnomalyDetection_ChangePoint för att identifiera avvikelser i temperaturdata.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob2', @statement=
N' Select * INTO TemperatureMeasurements1 from MyEdgeHubInput1

Select * Into TemperatureMeasurements2 from MyEdgeHubInput2

Select * Into TemperatureMeasurements3 from MyEdgeHubInput3

SELECT
Timestamp as [Time],
[Temperature] As [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''Score'') as ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''IsAnomaly'') as IsChangePointAnomaly
INTO TemperatureAnomalies FROM MyEdgeHubInput2;
'
go

Starta, stoppa, släppa och övervaka direktuppspelningsjobb

Om du vill starta ett strömningsjobb i Azure SQL Edge kör du den sys.sp_start_streaming_job lagrade proceduren. Den lagrade proceduren kräver att namnet på strömningsjobbet startas som indata.

exec sys.sp_start_streaming_job @name=N'StreamingJob1'
go

Om du vill stoppa ett strömningsjobb kör du den sys.sp_stop_streaming_job lagrade proceduren. Den lagrade proceduren kräver att namnet på strömningsjobbet stoppas som indata.

exec sys.sp_stop_streaming_job @name=N'StreamingJob1'
go

Om du vill ta bort (eller ta bort) ett strömningsjobb kör du den sys.sp_drop_streaming_job lagrade proceduren. Den lagrade proceduren kräver att namnet på strömningsjobbet ska släppas som indata.

exec sys.sp_drop_streaming_job @name=N'StreamingJob1'
go

Kör den lagrade proceduren för att hämta den aktuella statusen för ett sys.sp_get_streaming_job strömningsjobb. Den lagrade proceduren kräver att namnet på strömningsjobbet ska släppas som indata. Den matar ut namnet och den aktuella statusen för strömningsjobbet.

exec sys.sp_get_streaming_job @name=N'StreamingJob1'
        WITH RESULT SETS
(
       (
       name nvarchar(256),
       status nvarchar(256),
       error nvarchar(256)
       )
)

Strömningsjobbet kan ha någon av följande statusar:

Status Beskrivning
Skapad Strömningsjobbet har skapats, men har ännu inte startats.
Startar Strömningsjobbet är i startfasen.
Inaktiv Strömningsjobbet körs, men det finns inga indata att bearbeta.
Bearbetar Strömningsjobbet körs och bearbetar indata. Det här tillståndet anger ett felfritt tillstånd för strömningsjobbet.
Degraderad Strömningsjobbet körs, men det uppstod några oåterkalleliga fel under indatabearbetningen. Indatajobbet fortsätter att köras, men släpper indata som påträffar fel.
Stoppad Strömningsjobbet har stoppats.
Misslyckad Strömningsjobbet misslyckades. Detta är vanligtvis en indikation på ett allvarligt fel under bearbetningen.

Nästa steg