Azure SQL Edge 'de veri akışı işi oluşturma

bu makalede, Azure SQL Edge 'de T-SQL akış işi oluşturma işlemi açıklanmaktadır. Dış akış giriş ve çıkış nesnelerini oluşturup akış işi oluşturma işlemini akış işi oluşturma işleminin bir parçası olarak tanımlarsınız.

Dış akış giriş ve çıkış nesnelerini yapılandırma

T-SQL akışı, akış işinin dış akış girişleri ve çıkışları ile ilişkili veri kaynaklarını tanımlamak için SQL Server dış veri kaynağı işlevselliğini kullanır. dış akış girişi veya çıkış nesnesi oluşturmak için aşağıdaki T-SQL komutlarını kullanın:

ayrıca, Azure SQL Edge, SQL Server veya Azure SQL Veritabanı bir çıkış akışı olarak kullanılıyorsa, veritabanı kapsamlı kimlik bilgileri (Transact-SQL) oluşturmagerekir. bu T-SQL komutu, veritabanına erişmek için kimlik bilgilerini tanımlar.

Desteklenen giriş ve çıkış akışı veri kaynakları

Azure SQL Edge şu anda yalnızca akış girişleri ve çıkışları olarak aşağıdaki veri kaynaklarını destekler.

Veri kaynağı türü Girdi Çıktı Description
Azure IoT Edge hub 'ı E E Bir Azure IoT Edge hub 'ına akış verilerini okumak ve yazmak için veri kaynağı. Daha fazla bilgi için bkz. IoT Edge hub.
SQL Veritabanı N E akış verilerini SQL Veritabanı yazmak için veri kaynağı bağlantısı. veritabanı, Azure SQL Edge 'de bir yerel veritabanı veya SQL Server veya Azure SQL Veritabanı uzak bir veritabanı olabilir.
Kafka E N Bir Kafka konusunun akış verilerini okumak için veri kaynağı. bu bağdaştırıcı şu anda yalnızca Azure SQL Edge 'in ıntel veya AMD sürümlerinde kullanılabilir. Azure SQL Edge 'in ARM64 sürümünde kullanılamaz.

Örnek: Azure IoT Edge hub 'ı için dış akış giriş/çıkış nesnesi oluşturma

Aşağıdaki örnek Azure IoT Edge Hub için bir dış akış nesnesi oluşturur. Azure IoT Edge Hub için bir dış akış giriş/çıkış veri kaynağı oluşturmak için, önce okunan veya yazılan verilerin düzeni için bir dış dosya biçimi oluşturmanız gerekir.

  1. JSON türünde bir dış dosya biçimi oluşturun.

    Create External file format InputFileFormat
    WITH 
    (  
       format_type = JSON,
    )
    go
    
  2. Azure IoT Edge hub 'ı için bir dış veri kaynağı oluşturun. aşağıdaki T-SQL betiği, Azure SQL Edge ile aynı docker ana bilgisayarında çalışan bir IoT Edge hub 'ına bir veri kaynağı bağlantısı oluşturur.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput 
    WITH 
    (
        LOCATION = 'edgehub://'
    )
    go
    
  3. Azure IoT Edge Hub için dış akış nesnesi oluşturun. aşağıdaki T-SQL betiği IoT Edge hub 'ı için bir stream nesnesi oluşturur. Bir IoT Edge hub akış nesnesi söz konusu olduğunda, konum parametresi, okunan veya yazılan kanalın IoT Edge hub konusunun adıdır.

    CREATE EXTERNAL STREAM MyTempSensors 
    WITH 
    (
        DATA_SOURCE = EdgeHubInput,
        FILE_FORMAT = InputFileFormat,
        LOCATION = N'TemperatureSensors',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    go
    

örnek: Azure SQL Veritabanı için bir dış akış nesnesi oluşturun

aşağıdaki örnek, Azure SQL Edge 'de yerel veritabanında bir dış akış nesnesi oluşturur.

  1. Veritabanında bir ana anahtar oluşturun. Kimlik bilgisi gizliliğini şifrelemek için bu gereklidir.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. SQL Server kaynağına erişmek için veritabanı kapsamlı kimlik bilgileri oluşturun. Aşağıdaki örnek, KIMLIK = username ve SECRET = Password ile dış veri kaynağına ilişkin bir kimlik bilgisi oluşturur.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>'
    go
    
  3. Dış VERI kaynağı oluştur ile bir dış veri kaynağı oluşturun. Aşağıdaki örnek:

    • LocalSQLOutput adlı bir dış veri kaynağı oluşturur.
    • Dış veri kaynağını () tanımlar LOCATION = '<vendor>://<server>[:<port>]' . örnekte, Azure SQL Edge 'in yerel bir örneğini işaret eder.
    • Daha önce oluşturulan kimlik bilgisini kullanır.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput 
    WITH 
    (
        LOCATION = 'sqlserver://tcp:.,1433',
        CREDENTIAL = SQLCredential
    )
    go
    
  4. Dış akış nesnesini oluşturun. Aşağıdaki örnek, dbo tablosuna işaret eden bir dış akış nesnesi oluşturur . TemperatureMeasurements, mysqldatabase veritabanında.

    CREATE EXTERNAL STREAM TemperatureMeasurements 
    WITH 
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

Örnek: Kafka için dış akış nesnesi oluşturma

aşağıdaki örnek, Azure SQL Edge 'de yerel veritabanında bir dış akış nesnesi oluşturur. Bu örnekte, Kafka sunucusunun anonim erişim için yapılandırıldığı varsayılır.

  1. Dış VERI kaynağı oluştur ile bir dış veri kaynağı oluşturun. Aşağıdaki örnek:

    Create EXTERNAL DATA SOURCE [KafkaInput] 
    With
    (
        LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>'
    )
    GO
    
  2. Kafka girişi için bir dış dosya biçimi oluşturun. Aşağıdaki örnek, Gdaraltılmış sıkıştırmaya sahip bir JSON dosya biçimi oluşturdu.

    CREATE EXTERNAL FILE FORMAT JsonGzipped  
     WITH 
     (  
         FORMAT_TYPE = JSON , 
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec' 
     )
    
  3. Dış akış nesnesini oluşturun. Aşağıdaki örnek, Kafka konusuna işaret eden bir dış akış nesnesi oluşturur *TemperatureMeasurement* .

    CREATE EXTERNAL STREAM TemperatureMeasurement 
    WITH 
    (  
        DATA_SOURCE = KafkaInput, 
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10' 
    ); 
    

Akış işini ve akış sorgularını oluşturma

sys.sp_create_streaming_jobAkış sorgularını tanımlamak ve akış işini oluşturmak için sistem saklı yordamını kullanın. sp_create_streaming_jobSaklı yordam aşağıdaki parametreleri alır:

  • job_name: Akış işinin adı. Akış işi adları, örnek genelinde benzersizdir.
  • statement: Stream Analytics sorgu Diltabanlı akış sorgu deyimleri.

Aşağıdaki örnek, bir akış sorgusuyla basit bir akış işi oluşturur. Bu sorgu IoT Edge hub 'ından girdileri okur ve dbo.TemperatureMeasurements veritabanına yazar.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob1',
@statement= N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

Aşağıdaki örnek birden çok farklı sorguya sahip daha karmaşık bir akış işi oluşturur. Bu sorgular AnomalyDetection_ChangePoint , sıcaklık verilerinde bozukluklar belirlemek için yerleşik işlevi kullanan bir tane içerir.

EXEC sys.sp_create_streaming_job @name=N'StreamingJob2', @statement=
N' Select * INTO TemperatureMeasurements1 from MyEdgeHubInput1

Select * Into TemperatureMeasurements2 from MyEdgeHubInput2

Select * Into TemperatureMeasurements3 from MyEdgeHubInput3

SELECT
Timestamp as [Time],
[Temperature] As [Temperature],
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''Score'') as ChangePointScore,
GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER(LIMIT DURATION(minute, 20)), ''IsAnomaly'') as IsChangePointAnomaly
INTO TemperatureAnomalies FROM MyEdgeHubInput2;
'
go

Akış işlerini başlatma, durdurma, bırakma ve izleme

Azure SQL Edge 'de bir akış işi başlatmak için sys.sp_start_streaming_job saklı yordamı çalıştırın. Saklı yordam, giriş olarak, başlangıç için akış işinin adını gerektirir.

exec sys.sp_start_streaming_job @name=N'StreamingJob1'
go

Bir akış işini durdurmak için sys.sp_stop_streaming_job saklı yordamı çalıştırın. Saklı yordam, giriş olarak, durdurulacak akış işinin adını gerektirir.

exec sys.sp_stop_streaming_job @name=N'StreamingJob1'
go

Bir akış işini bırakmak (veya silmek) için sys.sp_drop_streaming_job saklı yordamı çalıştırın. Saklı yordam, giriş olarak, akış işinin adını gerektirir.

exec sys.sp_drop_streaming_job @name=N'StreamingJob1'
go

Bir akış işinin geçerli durumunu almak için sys.sp_get_streaming_job saklı yordamı çalıştırın. Saklı yordam, giriş olarak, akış işinin adını gerektirir. Akış işinin adını ve geçerli durumunu çıktı olarak verir.

exec sys.sp_get_streaming_job @name=N'StreamingJob1'
        WITH RESULT SETS
(
       (
       name nvarchar(256),
       status nvarchar(256),
       error nvarchar(256)
       )
)

Akış işi aşağıdaki durumlardan herhangi birine sahip olabilir:

Durum Açıklama
Oluşturulan Akış işi oluşturuldu, ancak henüz başlatılmadı.
Başlatılıyor Akış işi başlangıç aşamasındadır.
Boş Akış işi çalışıyor, ancak işlenecek giriş yok.
İşleniyor Akış işi çalışıyor ve girişleri işliyor. Bu durum, akış işi için sağlıklı bir durumu gösterir.
Düzeyi düşürüldü Akış işi çalışıyor, ancak giriş işleme sırasında bazı önemli olmayan hatalar vardı. Giriş işi çalışmaya devam eder, ancak hatalarla karşılaşan girişleri bıraktır.
Durduruldu Akış işi durduruldu.
Başarısız Akış işi başarısız oldu. Bu genellikle işleme sırasında önemli bir hatanın göstergesidir.

Sonraki adımlar