إنشاء مهمة دفق البيانات في Azure SQL Edge

هام

لم يعد Azure SQL Edge يدعم النظام الأساسي ARM64.

تشرح هذه المقالة كيفية إنشاء مهمة دفق T-SQL في Azure SQL Edge. يمكنك إنشاء كائنات إدخال وإخراج الدفق الخارجي، ثم تعريف استعلام مهمة الدفق كجزء من إنشاء مهمة الدفق.

تكوين إدخال الدفق الخارجي وعناصر الإخراج

يستخدم دفق T-SQL وظيفة مصدر البيانات الخارجية ل SQL Server لتحديد مصادر البيانات المرتبطة بمدخلات ومخرجات الدفق الخارجي لمهمة الدفق. استخدم أوامر T-SQL التالية لإنشاء إدخال دفق خارجي أو كائن إخراج:

بالإضافة إلى ذلك، إذا تم استخدام Azure SQL Edge أو SQL Server أو قاعدة بيانات Azure SQL كتدفق إخراج، فأنت بحاجة إلى CREATE DATABASE SCOPED CREDENTIAL (Transact-SQL). يعرف أمر T-SQL هذا بيانات الاعتماد للوصول إلى قاعدة البيانات.

مصادر بيانات دفق الإدخال والإخراج المدعومة

يدعم Azure SQL Edge حاليا مصادر البيانات التالية فقط كإدخالات ومخرجات دفق.

نوع مصدر البيانات الإدخال الإخراج ‏‏الوصف
مركز Azure IoT Edge نعم نعم مصدر بيانات لقراءة وكتابة البيانات المتدفقة إلى مركز Azure IoT Edge. لمزيد من المعلومات، راجع IoT Edge Hub.
قاعدة بيانات SQL N Y اتصال مصدر البيانات لكتابة البيانات المتدفقة إلى قاعدة بيانات SQL. يمكن أن تكون قاعدة البيانات قاعدة بيانات محلية في Azure SQL Edge، أو قاعدة بيانات بعيدة في SQL Server أو قاعدة بيانات Azure SQL.
Kafka السنة N مصدر بيانات لقراءة البيانات المتدفقة من موضوع Kafka.

مثال: إنشاء كائن إدخال/إخراج دفق خارجي لمركز Azure IoT Edge

ينشئ المثال التالي كائن دفق خارجي لمركز Azure IoT Edge. لإنشاء مصدر بيانات إدخال/إخراج دفق خارجي لمركز Azure IoT Edge، تحتاج أولا إلى إنشاء تنسيق ملف خارجي لتخطيط البيانات التي تتم قراءتها أو كتابتها أيضا.

  1. إنشاء تنسيق ملف خارجي من نوع JSON.

    CREATE EXTERNAL FILE format InputFileFormat
    WITH (FORMAT_TYPE = JSON);
    GO
    
  2. إنشاء مصدر بيانات خارجي لمركز Azure IoT Edge. ينشئ البرنامج النصي T-SQL التالي اتصال مصدر بيانات بمركز IoT Edge الذي يعمل على نفس مضيف Docker مثل Azure SQL Edge.

    CREATE EXTERNAL DATA SOURCE EdgeHubInput
    WITH (LOCATION = 'edgehub://');
    GO
    
  3. إنشاء كائن الدفق الخارجي لمركز Azure IoT Edge. ينشئ البرنامج النصي T-SQL التالي كائن دفق لمركز IoT Edge. في حالة وجود كائن دفق مركز IoT Edge، تكون معلمة LOCATION هي اسم موضوع مركز IoT Edge أو القناة التي تتم قراءتها أو كتابتها.

    CREATE EXTERNAL STREAM MyTempSensors
    WITH (
         DATA_SOURCE = EdgeHubInput,
         FILE_FORMAT = InputFileFormat,
         LOCATION = N'TemperatureSensors',
         INPUT_OPTIONS = N'',
         OUTPUT_OPTIONS = N''
    );
    GO
    

مثال: إنشاء كائن دفق خارجي إلى قاعدة بيانات Azure SQL

ينشئ المثال التالي كائن دفق خارجي إلى قاعدة البيانات المحلية في Azure SQL Edge.

  1. إنشاء مفتاح رئيسي على قاعدة البيانات. هذا مطلوب لتشفير بيانات الاعتماد السرية.

    CREATE MASTER KEY ENCRYPTION BY PASSWORD = '<<Strong_Password_For_Master_Key_Encryption>>';
    
  2. إنشاء بيانات اعتماد ذات نطاق قاعدة بيانات للوصول إلى مصدر SQL Server. ينشئ المثال التالي بيانات اعتماد إلى مصدر البيانات الخارجي، مع IDENTITY = اسم المستخدم، وSECRET = كلمة المرور.

    CREATE DATABASE SCOPED CREDENTIAL SQLCredential
    WITH IDENTITY = '<SQL_Login>', SECRET = '<SQL_Login_PASSWORD>';
    GO
    
  3. إنشاء مصدر بيانات خارجي باستخدام CREATE EXTERNAL DATA SOURCE. المثال التالي:

    • إنشاء مصدر بيانات خارجي يسمى LocalSQLOutput.
    • يحدد مصدر البيانات الخارجي (LOCATION = '<vendor>://<server>[:<port>]'). في المثال، يشير إلى مثيل محلي من Azure SQL Edge.
    • يستخدم بيانات الاعتماد التي تم إنشاؤها مسبقا.
    CREATE EXTERNAL DATA SOURCE LocalSQLOutput
    WITH (
         LOCATION = 'sqlserver://tcp:.,1433',
         CREDENTIAL = SQLCredential
    );
    GO
    
  4. إنشاء كائن الدفق الخارجي. ينشئ المثال التالي كائن دفق خارجي يشير إلى جدول dbo. TemperatureMeasurements، في قاعدة البيانات MySQLDatabase.

    CREATE EXTERNAL STREAM TemperatureMeasurements
    WITH
    (
        DATA_SOURCE = LocalSQLOutput,
        LOCATION = N'MySQLDatabase.dbo.TemperatureMeasurements',
        INPUT_OPTIONS = N'',
        OUTPUT_OPTIONS = N''
    );
    

مثال: إنشاء كائن دفق خارجي ل Kafka

ينشئ المثال التالي كائن دفق خارجي إلى قاعدة البيانات المحلية في Azure SQL Edge. يفترض هذا المثال تكوين خادم kafka للوصول المجهول.

  1. إنشاء مصدر بيانات خارجي باستخدام CREATE EXTERNAL DATA SOURCE. المثال التالي:

    CREATE EXTERNAL DATA SOURCE [KafkaInput]
    WITH (LOCATION = N'kafka://<kafka_bootstrap_server_name_ip>:<port_number>');
    GO
    
  2. إنشاء تنسيق ملف خارجي لإدخال Kafka. أنشأ المثال التالي تنسيق ملف JSON مع ضغط GZipped.

    CREATE EXTERNAL FILE FORMAT JsonGzipped
    WITH (
         FORMAT_TYPE = JSON,
         DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
    );
    GO
    
  3. إنشاء كائن الدفق الخارجي. ينشئ المثال التالي كائن دفق خارجي يشير إلى موضوع TemperatureMeasurementKafka .

    CREATE EXTERNAL STREAM TemperatureMeasurement
    WITH
    (
        DATA_SOURCE = KafkaInput,
        FILE_FORMAT = JsonGzipped,
        LOCATION = 'TemperatureMeasurement',
        INPUT_OPTIONS = 'PARTITIONS: 10'
    );
    GO
    

إنشاء مهمة الدفق واستعلامات الدفق

sys.sp_create_streaming_job استخدم الإجراء المخزن للنظام لتعريف استعلامات الدفق وإنشاء مهمة الدفق. sp_create_streaming_job يأخذ الإجراء المخزن المعلمات التالية:

ينشئ المثال التالي مهمة دفق بسيطة مع استعلام دفق واحد. يقرأ هذا الاستعلام المدخلات من مركز IoT Edge، ويكتب في dbo.TemperatureMeasurements قاعدة البيانات.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob1',
    @statement = N'Select * INTO TemperatureMeasurements from MyEdgeHubInput'

ينشئ المثال التالي مهمة دفق أكثر تعقيدا مع استعلامات مختلفة متعددة. تتضمن هذه الاستعلامات واحدا يستخدم الدالة المضمنة AnomalyDetection_ChangePoint لتحديد الحالات الشاذة في بيانات درجة الحرارة.

EXEC sys.sp_create_streaming_job @name = N'StreamingJob2',
    @statement = N'
        SELECT *
        INTO TemperatureMeasurements1
        FROM MyEdgeHubInput1

        SELECT *
        INTO TemperatureMeasurements2
        FROM MyEdgeHubInput2

        SELECT *
        INTO TemperatureMeasurements3
        FROM MyEdgeHubInput3

        SELECT timestamp AS [Time],
            [Temperature] AS [Temperature],
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' Score '') AS ChangePointScore,
            GetRecordPropertyValue(AnomalyDetection_ChangePoint(Temperature, 80, 1200) OVER (LIMIT DURATION(minute, 20)), '' IsAnomaly '') AS IsChangePointAnomaly
        INTO TemperatureAnomalies
        FROM MyEdgeHubInput2;
';
GO

بدء مهام الدفق وإيقافها وإفلاتها ومراقبتها

لبدء مهمة دفق في Azure SQL Edge، قم بتشغيل sys.sp_start_streaming_job الإجراء المخزن. يتطلب الإجراء المخزن اسم مهمة الدفق للبدء، كإدخل.

EXEC sys.sp_start_streaming_job @name = N'StreamingJob1';
GO

لإيقاف مهمة دفق، قم بتشغيل sys.sp_stop_streaming_job الإجراء المخزن. يتطلب الإجراء المخزن اسم مهمة الدفق لإيقافها، كإدخل.

EXEC sys.sp_stop_streaming_job @name = N'StreamingJob1';
GO

لإسقاط (أو حذف) مهمة دفق، قم بتشغيل sys.sp_drop_streaming_job الإجراء المخزن. يتطلب الإجراء المخزن اسم مهمة الدفق للإفلات، كإدخل.

EXEC sys.sp_drop_streaming_job @name = N'StreamingJob1';
GO

للحصول على الحالة الحالية لمهمة دفق، قم بتشغيل sys.sp_get_streaming_job الإجراء المخزن. يتطلب الإجراء المخزن اسم مهمة الدفق للإفلات، كإدخل. يقوم إخراج الاسم والحالة الحالية لمهمة الدفق.

EXEC sys.sp_get_streaming_job @name = N'StreamingJob1'
WITH RESULT SETS (
        (
            name NVARCHAR(256),
            status NVARCHAR(256),
            error NVARCHAR(256)
        )
    );
GO

يمكن أن تحتوي مهمة الدفق على أي من الحالات التالية:

الحالة الوصف
تم إنشاؤه تم إنشاء مهمة الدفق، ولكن لم يتم بدء تشغيلها بعد.
البدء وظيفة الدفق في مرحلة البدء.
الخمول وظيفة الدفق قيد التشغيل، ولكن لا توجد مدخلات لمعالجتها.
قيد المعالجة وظيفة الدفق قيد التشغيل، وهي تعالج المدخلات. تشير هذه الحالة إلى حالة صحية لمهمة الدفق.
متدهور وظيفة الدفق قيد التشغيل، ولكن كانت هناك بعض الأخطاء غير القاتلة أثناء معالجة الإدخال. تستمر مهمة الإدخال في التشغيل، ولكنها ستسقط المدخلات التي تواجه أخطاء.
متوقفة تم إيقاف مهمة الدفق.
‏‏فاشلة فشلت مهمة الدفق. هذا عموما مؤشر على خطأ فادح أثناء المعالجة.

إشعار

نظرا لأن مهمة الدفق يتم تنفيذها بشكل غير متزامن، فقد تواجه المهمة أخطاء في وقت التشغيل. لاستكشاف أخطاء فشل مهمة الدفق وإصلاحها، استخدم sys.sp_get_streaming_job الإجراء المخزن، أو راجع سجل Docker من حاوية Azure SQL Edge، والتي يمكن أن توفر تفاصيل الخطأ من مهمة الدفق.

الخطوات التالية