قراءة الإدخال بأي تنسيق باستخدام أجهزة إلغاء التسلسل المخصصة .NET (معاينة)

تسمح أجهزة إلغاء التسلسلات المخصصة .NET لمهمة Azure Stream Analytics بقراءة البيانات من تنسيقات خارج تنسيقات البيانات الثلاثة المضمنة. توضح هذه المقالة تنسيق التسلسل والواجهات التي تحدد محددات التسلسلات المخصصة .NET لوظائف السحابة والحافة في Azure Stream Analytics. هناك أيضا أمثلة على deserializers لتنسيق المخزن المؤقت للبروتوكول وتنسيق CSV.

.NET مخصص deserializer

نماذج التعليمات البرمجية التالية هي الواجهات التي تحدد deserializer المخصص وتنفيذه StreamDeserializer<T>.

UserDefinedOperator هي الفئة الأساسية لجميع مشغلي البث المخصصين. يقوم بتهيئة StreamingContext، والذي يوفر السياق الذي يتضمن آلية لنشر التشخيصات التي ستحتاج إلى تصحيح أي مشكلات في جهاز إلغاء التسلسل الخاص بك.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

مقتطف التعليمات البرمجية التالي هو إلغاء التسلسل لدفق البيانات.

يجب أن تنبعث الأخطاء القابلة للتخطي باستخدام IStreamingDiagnostics طريقة التهيئة التي تم تمريرها UserDefinedOperator. سيتم التعامل مع جميع الاستثناءات على أنها أخطاء وسيتم إعادة إنشاء جهاز إلغاء التسلسل. بعد عدد معين من الأخطاء ، ستنتقل المهمة إلى حالة فشل.

StreamDeserializer<T> يزيل تسلسل دفق إلى كائن من النوع T. يجب استيفاء الشروط التالية:

  1. T هي فئة أو هيكل.
  2. جميع المجالات العامة في T إما
    1. واحدة من [sbyte ، byte ، short ، ushort ، int ، uint ، long ، DateTime ، string ، float ، double] أو ما يعادلها القابلة للبطلان.
    2. هيكل أو فئة أخرى تتبع نفس القواعد.
    3. صفيف من النوع T2 يتبع نفس القواعد.
    4. IListT2 حيث يتبع T2 نفس القواعد.
    5. ليس لديه أي أنواع متكررة.

المعلمة stream هي الدفق الذي يحتوي على الكائن المتسلسل. Deserialize إرجاع مجموعة من المثيلات T .

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext يوفر السياق الذي يتضمن آلية لنشر التشخيصات لمشغل المستخدم.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics هو التشخيص للمشغلين المعرف من قبل المستخدم بما في ذلك المتسلسل و deserializer والوظائف المعرفة من قبل المستخدم.

WriteError يكتب رسالة خطأ إلى سجلات الموارد ويرسل الخطأ إلى التشخيصات.

briefMessage هي رسالة خطأ موجزة. تظهر هذه الرسالة في التشخيصات ويتم استخدامها من قبل فريق المنتج لأغراض تصحيح الأخطاء. لا تقم بتضمين معلومات حساسة، واحتفظ بالرسالة أقل من 200 حرف

detailedMessage هي رسالة خطأ مفصلة تتم إضافتها فقط إلى سجلات الموارد في وحدة التخزين الخاصة بك. يجب أن تكون هذه الرسالة أقل من 2000 حرف.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

أمثلة على مزيلات المسلسل

يوضح لك هذا القسم كيفية كتابة deserializers مخصصة ل Protobuf و CSV. للحصول على أمثلة إضافية، مثل تنسيق AVRO لالتقاط مركز الأحداث، تفضل بزيارة Azure Stream Analytics على GitHub.

تنسيق المخزن المؤقت للبروتوكول (Protobuf)

هذا مثال باستخدام تنسيق المخزن المؤقت للبروتوكول.

افترض تعريف المخزن المؤقت للبروتوكول التالي.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

يعمل التشغيل protoc.exe من Google.Protobuf.Tools NuGet على إنشاء ملف .cs مع التعريف. لا يظهر الملف الذي تم إنشاؤه هنا. يجب عليك التأكد من أن إصدار Protobuf Nuget الذي تستخدمه في مشروع Stream Analytics يتطابق مع إصدار Protobuf الذي تم استخدامه لإنشاء الإدخال.

مقتطف التعليمات البرمجية التالي هو تنفيذ deserializer على افتراض تضمين الملف الذي تم إنشاؤه في المشروع. هذا التنفيذ هو مجرد غلاف رقيق فوق الملف الذي تم إنشاؤه.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

مقتطف التعليمات البرمجية التالي هو معطل CSV بسيط يوضح أيضا نشر الأخطاء.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

تنسيق التسلسل لواجهات برمجة تطبيقات REST

يحتوي كل إدخال في Stream Analytics على تنسيق تسلسل. لمزيد من المعلومات حول خيارات الإدخال، راجع وثائق واجهة برمجة تطبيقات Input REST .

تعد التعليمات البرمجية لجافا سكريبت التالية مثالا على تنسيق تسلسل .NET deserializer عند استخدام واجهة برمجة تطبيقات REST:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName يجب أن تكون فئة تنفذ StreamDeserializer<T>. يتم وصف ذلك في القسم التالي.

دعم المنطقة

تتوفر هذه الميزة في المناطق التالية عند استخدام SKU القياسي:

  • غرب وسط الولايات المتحدة
  • شمال أوروبا
  • شرق الولايات المتحدة
  • غرب الولايات المتحدة
  • East US 2
  • غرب أوروبا

يمكنك طلب الدعم لمناطق إضافية. ومع ذلك، لا توجد قيود إقليمية من هذا القبيل عند استخدام مجموعات Stream Analytics.

الأسئلة المتداولة

متى ستتوفر هذه الميزة في جميع مناطق Azure؟

تتوفر هذه الميزة في 6 مناطق. إذا كنت مهتما باستخدام هذه الوظيفة في منطقة أخرى، يمكنك إرسال طلب. يوجد الدعم لجميع مناطق Azure على خريطة الطريق.

هل يمكنني الوصول إلى MetadataPropertyValue من مدخلاتي المشابهة لوظيفة GetMetadataPropertyValue؟

هذه الوظيفة غير مدعومة. إذا كنت بحاجة إلى هذه الإمكانية ، فيمكنك التصويت لهذا الطلب على UserVoice.

هل يمكنني مشاركة تنفيذ deserializer الخاص بي مع المجتمع حتى يتمكن الآخرون من الاستفادة؟

بمجرد تنفيذ جهاز deserializer الخاص بك ، يمكنك مساعدة الآخرين من خلال مشاركته مع المجتمع. أرسل التعليمات البرمجية الخاصة بك إلى GitHub Azure Stream Analytics.

ما هي القيود الأخرى المفروضة على استخدام أجهزة إلغاء التسلسل المخصصة في Stream Analytics؟

إذا كان إدخالك بتنسيق Protobuf مع مخطط يحتوي على MapField نوع ، فلن تتمكن من تنفيذ جهاز إلغاء تسلسل مخصص. أيضا، لا تدعم أجهزة إلغاء التسلسل المخصصة بيانات العينة أو بيانات المعاينة.

الخطوات التالية