استنتاج المخطط والتطور في محمل تلقائي

ملاحظة

يتوفر دعم تنسيق JSON في Databricks وقت التشغيل 8.2 وما فوق; يتوفر دعم تنسيق CSV في Databricks Runtime 8.3 وما فوق. للحصول على تفاصيل حول كل تنسيق، راجع تنسيقات البيانات.

يمكن ل Auto Loader اكتشاف إدخال أعمدة جديدة إلى بياناتك تلقائيا وإعادة التشغيل حتى لا تضطر إلى إدارة تعقب ومعالجة تغييرات المخطط بنفسك. يمكن أيضا "إنقاذ" محمل السيارات البيانات التي كانت غير متوقعة (على سبيل المثال، من أنواع البيانات المختلفة) في عمود النقطة JSON، التي يمكنك اختيار الوصول إليها في وقت لاحق باستخدام واجهات برمجة التطبيقات الوصول إلى البيانات شبه منظم.

استنتاج المخطط

للاستدلال على المخطط، يقوم "محمل تلقائي" بنماذج أول ملفات 50 جيجابايت أو 1000 التي يكتشفها، أيهما يتم عبور الحد أولا. لتجنب تكبد تكلفة الاستدلال هذه في كل بدء تشغيل دفق، ولتكون قادرا على توفير مخطط مستقر عبر إعادة تشغيل الدفق، يجب عليك تعيين الخيار cloudFiles.schemaLocation . يقوم "محمل تلقائي" بإنشاء دليل مخفي _schemas في هذا الموقع لتعقب تغييرات المخطط على بيانات الإدخال بمرور الوقت. إذا كان الدفق يحتوي على مصدر واحد cloudFiles لاستيعاب البيانات، يمكنك توفير موقع نقطة التفتيش على أنه cloudFiles.schemaLocation . وإلا، توفير دليل فريد لهذا الخيار. إذا كانت بيانات الإدخال الخاصة بك ترجع مخططا غير متوقع للتيار الخاص بك، فتحقق من أن موقع المخطط الخاص بك يتم استخدامه من قبل مصدر محمل تلقائي واحد فقط.

ملاحظة

لتغيير حجم العينة المستخدمة يمكنك تعيين تكوينات SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(سلسلة البايت، على سبيل 10gb المثال)

و

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(عدد صحيح)

افتراضيا، يستدل "محمل تلقائي" على أعمدة بتنسيقات ملفات مستندة إلى نص مثل CSV وJSON string كأعمدة. في مجموعات بيانات JSON، يتم الاستدلال على الأعمدة المتداخلة string كأعمدة. وبما أن بيانات JSON و CSV تصف نفسها بنفسها ويمكن أن تدعم العديد من أنواع البيانات، فإن استنتاج البيانات كسلسلة يمكن أن يساعد في تجنب مشكلات تطور المخطط مثل عدم تطابق النوع الرقمي (أعداد صحيحة، طويلة، عوامات). إذا كنت ترغب في الاحتفاظ بسلوك الاستدلال مخطط Spark الأصلي تعيين الخيار cloudFiles.inferColumnTypes إلى true .

ملاحظة

ما لم يتم تمكين حساسية الحالة، فإن الأعمدة foo و و تعتبر نفس FooFOO العمود. اختيار الحالة التي سيتم تمثيل العمود فيها هو إجباري ويعتمد على البيانات التي تم أخذ عينات منها. يمكنك استخدام تلميحات المخطط لفرض أي حالة يجب استخدامها.

يحاول "محمل تلقائي" أيضا استنتاج أعمدة القسم من بنية الدليل الأساسي للبيانات إذا تم وضع البيانات في تقسيم نمط الخلية. على سبيل المثال، مسار ملف مثل base_path/event=click/date=2021-04-01/f0.json قد يؤدي إلى الاستدلال على أعمدة القسم dateevent وكأعمدة. ستكون أنواع البيانات لهذه الأعمدة سلاسل ما لم يتم تعيين cloudFiles.inferColumnTypes إلى true. إذا كانت بنية الدليل الأساسي تحتوي على أقسام خلية متعارضة أو لا تحتوي على تقسيم نمط الخلية، سيتم تجاهل أعمدة القسم. يمكنك توفير الخيار cloudFiles.partitionColumns كقائمة مفصولة بفاصلة من أسماء الأعمدة لمحاولة تحليل الأعمدة المعطى من مسار الملف إذا كانت هذه key=value الأعمدة موجودة كأزواج في بنية الدليل.

عندما يستدل "محمل تلقائي" على المخطط، تتم إضافة عمود بيانات تم إنقاذه تلقائيا إلى المخطط الخاص بك ك . راجع القسم الخاص بعمود البيانات التي تم إنقاذهاوتطور المخطط للحصول على التفاصيل.

ملاحظة

ملف ثنائي ( binaryFile ) text وتنسيقات الملفات لديها مخططات بيانات ثابتة، ولكن أيضا دعم استنتاج عمود القسم. يتم الاستدلال على أعمدة القسم عند إعادة تشغيل كل دفق ما لم تحدد cloudFiles.schemaLocation . لتجنب أي أخطاء محتملة أو فقدان المعلومات، توصي Databricks بإعداد cloudFiles.schemaLocation أو cloudFiles.partitionColumns كخيارات لتنسيقات الملفات هذه كما cloudFiles.schemaLocation هو ليس خيارا مطلوبا لهذه التنسيقات.

تلميحات المخطط

أنواع البيانات التي يتم الاستدلال عليها قد لا تكون دائما بالضبط ما تبحث عنه. باستخدام تلميحات المخطط، يمكنك فرض المعلومات التي تعرفها وتتوقعها على مخطط استدلال.

بشكل افتراضي، لدى Apache Spark أسلوب قياسي للاستدلال على نوع أعمدة البيانات. على سبيل المثال، فإنه يستنتج JSON المتداخلة كما البنيات والأعداد الصحيحة كأطول. في المقابل، يعتبر "محمل تلقائي" كافة الأعمدة كسلاسل. عندما تعرف أن عمودا من نوع بيانات محدد، أو إذا كنت تريد اختيار نوع بيانات أكثر عمومية (على سبيل المثال، مزدوج بدلا من عدد صحيح)، يمكنك توفير عدد عشوائي من تلميحات الأعمدة أنواع البيانات كما يلي:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

راجع الوثائق المتعلقة بأنواع البيانات لقائمة أنواع البيانات المعتمدة.

إذا لم يكن عمود موجودا في بداية الدفق، يمكنك أيضا استخدام تلميحات المخطط لإضافة هذا العمود إلى المخطط المستدل عليه.

هنا مثال على مخطط استدل عليه لرؤية السلوك مع تلميحات المخطط. المخطط المستدل عليه:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

بتحديد تلميحات المخطط التالية:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

سوف تحصل على:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

ملاحظة

صفيف وخريطة مخطط تلميحات الدعم متاح في Databricks وقت التشغيل 9.1 LTS و Databricks وقت التشغيل 9.1 LTS فوتون وما فوق.

هنا مثال على مخطط استدل عليه مع أنواع البيانات المعقدة لرؤية السلوك مع تلميحات المخطط. المخطط المستدل عليه:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

بتحديد تلميحات المخطط التالية:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

سوف تحصل على:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

ملاحظة

يتم استخدام تلميحات المخطط فقط إذا لم تقم بتوفير مخطط إلى "محمل تلقائي". يمكنك استخدام تلميحات المخطط سواء cloudFiles.inferColumnTypes تم تمكينه أو تعطيله.

تطور المخطط

يكشف "محمل تلقائي" إضافة أعمدة جديدة أثناء معالجة البيانات. بشكل افتراضي، سيؤدي إضافة عمود جديد إلى توقف عمليات البث باستخدام UnknownFieldException . قبل دفق يطرح هذا الخطأ، "محمل تلقائي" تنفيذ استنتاج المخطط على أحدث مجموعة صغيرة من البيانات، وتحديث موقع المخطط مع أحدث مخطط. يتم دمج أعمدة جديدة إلى نهاية المخطط. تبقى أنواع البيانات من الأعمدة الموجودة دون تغيير. من خلال تعيين دفق "محمل تلقائي" داخل مهمة "إعادة تشكيل البيانات أزور"، يمكنك الحصول على دفق الخاص بك لإعادة تشغيل تلقائيا بعد تغيير المخطط.

يدعم "محمل تلقائي" الأوضاع التالية لتطور المخطط، والتي قمت بتعيينها في الخيار cloudFiles.schemaEvolutionMode :

  • addNewColumnsالوضع الافتراضي عند عدم توفير مخطط إلى "محمل تلقائي". ستفشل مهمة البث باستخدام UnknownFieldException . يتم إضافة أعمدة جديدة إلى المخطط. لا تقوم الأعمدة الموجودة بتطوير أنواع البيانات. addNewColumns غير مسموح به عند توفير مخطط الدفق. يمكنك بدلا من ذلك توفير المخطط الخاص بك كمخطط تلميح بدلا من ذلك إذا كنت ترغب في استخدام هذا الوضع.
  • failOnNewColumnsإذا اكتشف "محمل تلقائي" عمود جديد، سيفشل الدفق. لن يتم إعادة تشغيله إلا إذا تم تحديث المخطط المتوفر أو إزالة ملف البيانات المخالف.
  • rescueتشغيل الدفق مع المخطط المستدل عليه أو المقدمة الأولى. يتم إنقاذ أي تغييرات في نوع البيانات أو أعمدة جديدة تمت إضافتها في عمود البيانات الذي تم إنقاذه الذي تتم إضافته تلقائيا إلى مخطط الدفق الخاص بك ك . في هذا الوضع، لن يفشل الدفق بسبب تغييرات المخطط.
  • noneالوضع الافتراضي عند توفير مخطط. لا يتطور المخطط، يتم تجاهل أعمدة جديدة، ولا يتم إنقاذ البيانات إلا إذا تم توفير عمود البيانات التي تم إنقاذها بشكل منفصل كخيار.

لا تعتبر أعمدة القسم لتطور المخطط. إذا كان لديك بنية دليل أولية مثل base_path/event=click/date=2021-04-01/f0.json ، ثم ابدأ في تلقي ملفات جديدة ك ، يتم تجاهل عمود base_path/event=click/date=2021-04-01/hour=01/f1.json الساعة. لالتقاط معلومات لأعمدة أقسام جديدة، قم بتعيين cloudFiles.partitionColumns إلى event,date,hour .

عمود البيانات التي تم إنقاذها

يضمن عمود البيانات الذي تم إنقاذه عدم فقدان البيانات أو تفويتها أثناء ETL. يحتوي عمود البيانات التي تم إنقاذها على أية بيانات لم يتم تحليلها، إما لأنها كانت مفقودة من المخطط المحدد، أو بسبب عدم تطابق النوع، أو لأن غلاف العمود في السجل أو الملف لم يتطابق مع ذلك في المخطط. يتم إرجاع عمود البيانات التي تم إنقاذها كنقطة JSON تحتوي على الأعمدة التي تم إنقاذها، ومسار الملف المصدر للسجل (مسار الملف المصدر متوفر في Databricks Runtime 8.3 وما فوق). عمود البيانات التي تم إنقاذها جزء من المخطط الذي تم إرجاعه بواسطة "محمل تلقائي" بشكل _rescued_data افتراضي عند الاستدلال على المخطط. يمكنك إعادة تسمية العمود أو تضمينه في الحالات التي تقوم فيها بتوفير مخطط عن طريق تعيين الخيار rescuedDataColumn .

منذ القيمة الافتراضية من cloudFiles.inferColumnTypes هو ، و هو عندما يتم الاستدلال على المخطط يلتقط falsecloudFiles.schemaEvolutionModeaddNewColumnsrescuedDataColumn الأعمدة التي لها حالة مختلفة عن تلك الموجودة في المخطط.

يدعم موزعو JSON و CSV ثلاثة أوضاع عند تحليل السجلات: PERMISSIVE و DROPMALFORMED و و FAILFAST . عند استخدامها مع rescuedDataColumn ، لا يؤدي عدم تطابق نوع البيانات إلى إسقاط السجلات في DROPMALFORMED الوضع أو رمي خطأ في FAILFAST الوضع. يتم إسقاط أو رمي الأخطاء فقط السجلات التالفة - أي، JSON أو CSV غير كاملة أو مشوهة. إذا كنت تستخدم badRecordsPath عند تحليل JSON أو CSV، لا تعتبر عدم تطابق نوع البيانات كسجلات تالفة عند استخدام rescuedDataColumn . يتم تخزين سجلات JSON أو CSV غير المكتملة والمتشوهة فقط في badRecordsPath .

تنسيقات البيانات

التقييدات

  • لا يتم اعتماد تطور المخطط في تطبيقات Python التي تعمل على Databricks Runtime 8.2 و 8.3 التي تستخدم foreachBatch . يمكنك استخدام foreachBatch في سكالا بدلا من ذلك.

مثال حالات الاستخدام

تمكين ETL سهلة

طريقة سهلة للحصول على البيانات الخاصة بك في بحيرة دلتا دون فقدان أي بيانات هو استخدام النمط التالي وتمكين الاستدلال المخطط مع محمل السيارات. توصي Databricks بتشغيل التعليمات البرمجية التالية في مهمة Azure Databricks لكي تقوم بإعادة تشغيل الدفق تلقائيا عند تغيير مخطط البيانات المصدر. بشكل افتراضي، يتم الاستدلال على المخطط كأنواع سلسلة، أي أخطاء تحليل (يجب أن يكون هناك لا شيء إذا بقي كل شيء كسلسلة) سوف تذهب إلى _rescued_data ، وأي أعمدة جديدة سوف تفشل الدفق وتتطور المخطط.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

منع فقدان البيانات في البيانات جيدة التنظيم

عندما تعرف المخطط الخاص بك، ولكن تريد أن تعرف كلما تلقيت بيانات غير متوقعة، توصي Databricks باستخدام rescuedDataColumn .

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

إذا كنت تريد أن يتوقف الدفق عن المعالجة إذا تم تقديم حقل جديد لا يتطابق مع المخطط، يمكنك إضافة:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

تمكين خطوط أنابيب البيانات شبه المنظمة المرنة

عندما تتلقى بيانات من مورد يقدم أعمدة جديدة إلى المعلومات التي يقدمونها، قد لا تكون على علم بالضبط بموعد القيام بذلك، أو قد لا يكون لديك عرض النطاق الترددي لتحديث خط أنابيب البيانات. يمكنك الآن الاستفادة من تطور المخطط لإعادة تشغيل الدفق والسماح ل Auto Loader بتحديث المخطط المستدل عليه تلقائيا. يمكنك أيضا schemaHints الاستفادة من بعض الحقول "بدون تخطيط" التي قد يوفرها المورد.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

الأسئلة المتداولة (FAQs)

كيف مخطط الاستدلال "محمل تلقائي"؟

عند تعريف DataFrame أولا، يسرد "محمل تلقائي" الدليل المصدر ويختار أحدث ملفات 50 جيجابايت أو 1000 (حسب وقت تعديل الملف)، ويستخدمها للاستدلال على مخطط البيانات.

يقوم "محمل تلقائي" أيضا باستنتاج أعمدة القسم عن طريق فحص بنية الدليل المصدر ويبحث عن مسارات الملفات التي تحتوي على /key=value/ البنية. إذا كان الدليل المصدر بنية غير متناسقة على سبيل المثال:

base/path/partition=1/date=2020-12-31/file1.json
// inconsistent because date and partition directories are in different orders
base/path/date=2020-12-31/partition=2/file2.json
// inconsistent because the date directory is missing
base/path/partition=3/file3.json

يستدل "محمل تلقائي" على أعمدة القسم على أنها فارغة. استخدم cloudFiles.partitionColumns تحليل الأعمدة بوضوح من بنية الدليل.

كيف يتصرف "محمل تلقائي" عندما يكون المجلد المصدر فارغا؟

إذا كان الدليل المصدر فارغا، يتطلب منك "محمل تلقائي" توفير مخطط حيث لا توجد بيانات لإجراء الاستدلال.

متى يقوم مخطط الاستدلال بالتحزيل التلقائي؟ هل تتطور تلقائيا بعد كل دفعة صغيرة؟

يتم الاستدلال على المخطط عند تعريف DataFrame لأول مرة داخل التعليمات البرمجية الخاصة بك. أثناء كل دفعة صغيرة، يتم تقييم تغييرات المخطط على الطاير؛ لذلك، لا داعي للقلق حول الأداء يضرب. عند إعادة تشغيل الدفق، فإنه يلتقط المخطط المتطور من موقع المخطط ويبدأ التنفيذ دون أي حمل للاستدلال.

ما هو تأثير الأداء على استيعاب البيانات عند استخدام الاستدلال مخطط محمل السيارات؟

يجب أن تتوقع استدلال المخطط يستغرق بضع دقائق للحصول على دلائل مصدر كبيرة جدا أثناء الاستدلال المخطط الأولي. يجب عدم ملاحظة أداء هام يضرب خلاف ذلك أثناء تنفيذ دفق. إذا قمت بتشغيل التعليمات البرمجية في دفتر ملاحظات Azure Databricks، يمكنك مشاهدة تحديثات الحالة التي تحدد متى سيتم سرد "محمل تلقائي" الدليل لأخذ عينات والاستدلال على مخطط البيانات.

بسبب وجود خلل، ملف سيئ قد غيرت مخططي بشكل كبير. ماذا يجب أن أفعل للتراجع عن تغيير المخطط؟

اتصل Databricks الدعم للحصول على مساعدة.