تحميل الملفات من AWS S3 باستخدام محمل تلقائي

محمل السيارات تدريجيا وكفاءة بمعالجة ملفات البيانات الجديدة عند وصولها في AWS S3 ( s3:// ).

يوفر "محمل تلقائي" مصدر تدفق منظم يسمى cloudFiles . نظرا لمسار دليل الإدخال على تخزين الملفات السحابية ، cloudFiles يقوم المصدر تلقائيا بمعالجة الملفات الجديدة عند وصولها ، مع خيار معالجة الملفات الموجودة في هذا الدليل أيضا.

يعمل "محمل تلقائي" مع مسارات DBFS بالإضافة إلى مسارات مباشرة إلى مصدر البيانات.

المتطلبات

Databricks وقت التشغيل 7.2 أو أكثر.

إذا قمت بإنشاء تدفقات باستخدام Databricks Runtime 7.1 أو أقل، راجع التغييرات في قيم الخيارات الافتراضية والتوافقوإدارة موارد السحابة.

أوضاع اكتشاف الملفات

يدعم "محمل تلقائي" وضعين للكشف عن عند وجود ملفات جديدة: سرد الدليل وإعلام الملف.

  • سرد الدليل: يعرف الملفات الجديدة عن طريق سرد دليل الإدخال. يسمح لك وضع سرد الدليل ببدء تدفقات "المحمل التلقائي" بسرعة دون أي تكوينات أذونات بخلاف الوصول إلى بياناتك على AWS S3 وهو مناسب للسيناريوهات التي تحتاج فيها بعض الملفات فقط إلى التدفق بشكل منتظم. وضع سرد الدليل هو الافتراضي ل "محمل تلقائي" في Databricks وقت التشغيل 7.2 وما فوق.
  • إعلام الملف: يستخدم خدمات AWS SNS و SQS التي تشترك في أحداث الملفات من دليل الإدخال. يقوم "محمل تلقائي" تلقائيا بإعداد خدمات AWS SNS و SQS. وضع الإعلام عن الملف أكثر أداء وقابلية للتوسعة لدلائل الإدخال الكبيرة. لاستخدام هذا الوضع، يجب تكوين أذونات لخدمات AWS SNS و SQS وتحديد .

يمكنك تغيير الوضع عند إعادة تشغيل الدفق. على سبيل المثال، قد تحتاج إلى التبديل إلى وضع الإعلام عن الملفات عند إدخال قائمة الدليل بطيء جدا بسبب زيادة حجم دليل الإدخال. لكلا الوضعين، يحتفظ "محمل تلقائي" داخليا بمسارات للملفات التي تمت معالجتها في موقع نقطة التحقق الخاصة بك لتوفير دلالات مرة واحدة بالضبط، لذلك لا تحتاج إلى إدارة أي معلومات حالة بنفسك.

استخدام cloudFiles المصدر

لاستخدام "محمل تلقائي"، قم بإنشاء مصدر بنفس الطريقة التي تقوم cloudFiles بها مصادر الدفق الأخرى. سيبدأ التعليمات البرمجية أدناه دفق محمل تلقائي الكتابة إلى دلتا ليك في وضع سرد الدليل:

Python

df = spark.readStream.format("cloudFiles") \
  .option(<cloudFiles-option>, <option-value>) \
  .schema(<schema>) \
  .load(<input-path>)

df.writeStream.format("delta") \
  .option("checkpointLocation", <checkpoint-path>) \
  .trigger(<trigger>) \
  .start(<output-path>)

Scala

val df = spark.readStream.format("cloudFiles")
  .option(<cloudFiles-option>, <option-value>)
  .schema(<schema>)
  .load(<input-path>)

df.writeStream.format("delta")
  .option("checkpointLocation", <checkpoint-path>)
  .trigger(<trigger>)
  .start(<output-path>)

حيث:

  • <cloudFiles-option> هو خيار تكوين في <cloudFiles-option>.

  • <schema> هو مخطط الملف. يدعم "محمل تلقائي" أيضا استنتاج المخطط والتطور مع بعض تنسيقات الملفات. انظر استنتاج المخطط وتطوره لمزيد من التفاصيل

  • <input-path> هو المسار في التخزين الذي يتم مراقبته للملفات الجديدة. كما يتم رصد أدلة <input-path> الطفل. <input-path> يمكن أن تحتوي على أنماط جلوب الملف. سيكون نمط الجلوب * ملحقا به ؛ إذا كان هذا يتضمن ملفات لا تريد استيعابها ، فيمكنك تضمين فلتر إضافي من خلال pathGlobFilter الخيار. إذا كنت تقوم بتوفير قائمة انتظار لإشعارات الملفات ولا تحتاج إلى ردم أي بيانات، فلن تحتاج إلى توفير مسار إدخال.

  • <checkpoint-path> هو موقع نقطة التحقق دفق.

  • <trigger> مشغل اختياري للتيار. الافتراضي هو تنفيذ الدفعة الصغرى التالية في أسرع وقت ممكن. إذا كان لديك بيانات تصل في فاصل زمني منتظم، على سبيل المثال مرة واحدة في اليوم، يمكنك استخدام Trigger.Once جدولة تنفيذ عمليات الدفق في مهمة Azure Databricks. بالنسبة Databricks Runtime 10.1 و Databricks Runtime 10.1 Photon وما فوق ، يدعم Auto Loader الآن نوعا جديدا من المشغل: لكل من وضعي إدخال الدليل وإعلام الملف. Trigger.AvailableNow يوفر نفس الضمانات مثل Trigger.Once ، الذي يعالج كافة البيانات المتوفرة ثم يتوقف الاستعلام. ولكن Trigger.AvailableNow يمكن تنفيذ الحد من معدل وتقسيم العمل عبر دفعات متعددة، لذلك ينصح بدلا من Trigger.Once . بالنسبة للتدفقات التي يتم تشغيلها دائما، توصي Databricks بتعيين مشغل وقت المعالجة.

  • <output-path> هو مسار دفق الإخراج.

فوائد على اباتشي سبارك FileStreamSource

في اباتشي سبارك، يمكنك قراءة الملفات تدريجيا باستخدام spark.readStream.format(fileFormat).load(directory) . يوفر "محمل تلقائي" الفوائد التالية عبر مصدر الملف:

  • قابلية التوسع: يمكن ل Auto Loader اكتشاف مليارات الملفات بكفاءة. يمكن إجراء عمليات الردم بشكل غير متزامن لتجنب إضاعة أي موارد حساب.
  • الأداء: تكلفة اكتشاف الملفات مع تحجيم "محمل تلقائي" مع عدد الملفات التي يتم تناولها بدلا من عدد الدلائل التي قد تهبط الملفات في. راجع قائمة الدليل المحسن.
  • استنتاج المخطط ودعم التطور: يمكن ل Auto Loader اكتشاف انجرافات المخطط، وإعلامك عند حدوث تغييرات في المخطط، وبيانات الإنقاذ التي كان يمكن تجاهلها أو فقدانها. انظر استنتاج المخطط وتطوره.
  • التكلفة: يستخدم "محمل تلقائي" واجهات برمجة التطبيقات السحابية الأصلية للحصول على قوائم بالملفات الموجودة في التخزين. بالإضافة إلى ذلك، يمكن أن يساعد وضع الإعلام بملف "أداة التحميل التلقائي" في تقليل تكاليف السحابة بشكل أكبر من خلال تجنب إدراج الدليل تماما. يمكن ل Auto Loader إعداد خدمات إعلام الملفات تلقائيا على التخزين لجعل اكتشاف الملف أرخص بكثير.

قائمة الدليل المحسنة

ملاحظة

متوفر في Databricks وقت التشغيل 9.0 وما فوق.

يمكن ل Auto Loader اكتشاف الملفات على أنظمة التخزين السحابية باستخدام قائمة الدليل بشكل أكثر كفاءة من البدائل الأخرى. على سبيل المثال، إذا كان لديك ملفات يتم تحميلها كل 5 دقائق ك /some/path/YYYY/MM/DD/HH/fileName ، للبحث عن كافة الملفات في هذه الدلائل، فإن مصدر ملف Apache Spark يسرد كافة الدلائل الفرعية بالتوازي، مما يؤدي إلى استدعاء 1 (الدليل الأساسي) + 365 (في اليوم) * 24 (في الساعة) = 8761 دليل LIST API إلى التخزين. من خلال تلقي استجابة مسطحة من التخزين ، يقلل Auto Loader من عدد مكالمات API إلى عدد الملفات في التخزين مقسوما على عدد النتائج التي يتم إرجاعها بواسطة كل مكالمة API ، مما يقلل بشكل كبير من تكاليف السحابة الخاصة بك.

الإدراج المتزايد

هام

هذه الميزة في المعاينة العامة.

للملفات التي تم إنشاؤها بشكل معجمي ، يمكن ل Auto Loader الآن الاستفادة من ترتيب الملفات المعجمية و واجهات برمجة التطبيقات المحسنة الموجودة لتحسين كفاءة سرد الدليل عن طريق سرد الملفات التي تم تناولها مسبقا بدلا من سرد الدليل بأكمله.

افتراضيا، تلقائيا الكشف عن "محمل تلقائي" ما إذا كان دليل معين ينطبق على قائمة تزايدية عن طريق التحقق من ومقارنة مسارات الملفات من قوائم الدليل الكامل المكتملة السابقة. لضمان اكتمال النهائي في هذا auto الوضع، سيتم تلقائيا تشغيل "محمل تلقائي" قائمة الدليل الكامل بعد إكمال 7 قوائم تزايدية متتالية. إذا كنت تريد أن تكون أكثر تكرارا أو أقل تكرارا، يمكنك تعيين cloudFiles.backfillInterval تشغيل عمليات ردم غير متزامنة في فاصل زمني معين.

إذا كان لديك الثقة في ترتيب الملفات التي تم إنشاؤها في الدليل، يمكنك بوضوح تشغيل أو إيقاف وضع الإدراج المتزايد عن طريق الإعداد cloudFiles.useIncrementalListing إلى true أو falseauto (افتراضي)، على سبيل المثال، يمكن اعتبار الملفات التي يتم ترتيبها بواسطة date=... أقسام مرتبة بشكل معجمي إذا تمت معالجة البيانات مرة واحدة في اليوم، يمكن اعتبار مسارات الملفات التي تحتوي على طوابع زمن مرتبة بشكل معجمي. يمكنك دائما cloudFiles.backfillInterval استخدامها لضمان استيعاب كافة البيانات عند تشغيل القائمة المتزايدة.

استنتاج المخطط وتطوره

ملاحظة

متوفر في Databricks وقت التشغيل 8.2 وما فوق.

يدعم "محمل تلقائي" استنتاج المخطط والتطور مع CSV وJSON binaryFile وتنسيقات الملفات الثنائية ( ) والنصية. راجع استنتاج المخطط وتطوره في "محمل تلقائي" للحصول على التفاصيل.

تشغيل محمل تلقائي في الإنتاج

توصي Databricks باتباع أفضل ممارسات البث لتشغيل "محمل تلقائي" في الإنتاج.

التكوين

يتم بادئة خيارات التكوين الخاصة cloudFiles بالمصدر cloudFiles بحيث تكون في مساحة اسم منفصلة عن خيارات مصدر "التدفق المنظم" الأخرى.

هام

تغيرت بعض قيم الخيارات الافتراضية في Databricks وقت التشغيل 7.2. إذا كنت تستخدم "محمل تلقائي" على Databricks وقت التشغيل 7.1 أو أقل، راجع التغييرات في قيم الخيار الافتراضي والتوافق.

خيارات تنسيق الملف

مع محمل السيارات يمكنك استيعاب JSON ، ، ، ، ، ، CSVPARQUETAVROTEXTBINARYFILEORC والملفات. راجع تنسيق خيارات الخيارات لتنسيقات الملفات هذه.

خيارات "محمل تلقائي" الشائعة

يمكنك تكوين الخيارات التالية لسرد الدليل أو وضع إعلام الملف.

الخيار
ملفات السحابة.allowكتب

نوع: Boolean

ما إذا كان سيتم السماح بإدخال تغييرات ملف الدليل الكتابة فوق البيانات الموجودة. متوفر في Databricks وقت التشغيل 7.6 وما فوق.

قيمة افتراضية: false
ملفات السحابة.تنسيق

نوع: String

تنسيق ملف البيانات في مسار المصدر. تتضمن القيم المسموح بها:

* avro*
* binaryFileالملف * :
* csv*
* json*
* orcملف ORC
* parquet*
* textالملف النصي

القيمة الافتراضية: بلا (الخيار المطلوب)
ملفات السحاب.تضمين الملفات الموجودة

نوع: Boolean

سواء لتضمين الملفات الموجودة في مسار إدخال معالجة الدفق أو معالجة الملفات الجديدة التي تصل بعد الإعداد الأولي فقط. يتم تقييم هذا الخيار فقط عند بدء دفق للمرة الأولى. تغيير هذا الخيار بعد إعادة تشغيل الدفق ليس له أي تأثير.

قيمة افتراضية: true
ملفات السحابة.إنفركولومنتيبيس

نوع: Boolean

ما إذا كان يمكن استنتاج أنواع الأعمدة الدقيقة عند الاستفادة من استنتاج المخطط. بشكل افتراضي، يتم الاستدلال على الأعمدة كسلاسل عند الاستدلال على مجموعات بيانات JSON. راجع استنتاج المخطط لمزيد من التفاصيل.

قيمة افتراضية: false
ملفات السحابة.ماكس بايتسبيرتريجر

نوع: Byte String

الحد الأقصى لعدد وحدات البايت الجديدة التي ستتم معالجتها في كل مشغل. يمكنك تحديد سلسلة بايت مثل 10g تحديد كل microbatch إلى 10 غيغابايت من البيانات. هذا هو الحد الأقصى لينة. إذا كان لديك ملفات 3 غيغابايت لكل منهما، يقوم Azure Databricks بمعالجة 12 غيغابايت في رقعة صغيرة. عند استخدامها مع cloudFiles.maxFilesPerTrigger ، تستهلك Azure Databricks الحد الأدنى من cloudFiles.maxFilesPerTrigger أو ، cloudFiles.maxBytesPerTrigger أيهما يتم الوصول إليه أولا. لا يكون لهذا الخيار أي تأثير عند استخدامه مع Trigger.Once() .

القيمة الافتراضية: بلا
ملفات السحابة.ماكسفيليج

نوع: Interval String

كم من الوقت يتم تعقب حدث ملف لأغراض إزالة الارتباط. لا توصي Databricks بضبط هذه المعلمة إلا إذا كنت تتناول البيانات بترتيب ملايين الملفات في الساعة. راجع المقطع حول كيفية اختيار maxFileAge لمزيد من التفاصيل.

القيمة الافتراضية: بلا
ملفات السحابة.resourceTags

نوع: Map(String, String)

سلسلة من أزواج العلامات ذات القيمة الرئيسية للمساعدة في إقران الموارد ذات الصلة وتحديدها، على سبيل المثال:

cloudFiles.option("cloudFiles.resourceTag.myFirstKey", "myFirstValue")
.option("cloudFiles.resourceTag.mySecondKey", "mySecondValue")

لمزيد من المعلومات، راجع تسمية قوائم الانتظار وبيانات التعريف وتغطية في اشتراكات الأحداث. يقوم "محمل تلقائي" بتخزين أزواج العلامات ذات القيمة الرئيسية هذه في JSON كعلامات. (1)

القيمة الافتراضية: بلا
ملفات السحابة.مخططالتلافيالتي

نوع: String

يتم اكتشاف وضع تطوير المخطط كأعمدة جديدة في البيانات. بشكل افتراضي، يتم الاستدلال على الأعمدة كسلاسل عند الاستدلال على مجموعات بيانات JSON. راجع تطور المخطط لمزيد من التفاصيل.

القيمة الافتراضية: "addNewColumns" عند عدم توفير مخطط.
"none" خلاف ذلك.
ملفات السحابة.مخططهينتس

نوع: String

معلومات المخطط التي توفرها إلى "محمل تلقائي" أثناء الاستدلال المخطط. راجع تلميحات المخطط للحصول على مزيد من التفاصيل.

القيمة الافتراضية: بلا
ملفات السحابة.موقع المخطط

نوع: String

الموقع لتخزين المخطط المستدل عليه والتغييرات اللاحقة. راجع استنتاج المخطط لمزيد من التفاصيل.

القيمة الافتراضية: بلا (مطلوب عند استنتاج المخطط)
ملفات السحابة.validateOptions

نوع: Boolean

ما إذا كان للتحقق من صحة خيارات "محمل تلقائي" وإرجاع خطأ لخيارات غير معروفة أو غير متناسقة.

قيمة افتراضية: true
ملفات السحابة.الردمالتعيين

>[! هام] >> هذه الميزة في المعاينة >.

نوع: Interval String

يمكن أن يؤدي "محمل تلقائي" إلى عمليات ردم غير متزامنة في فاصل زمني معين،
على سبيل المثال 1 day لملء مرة واحدة في اليوم، أو 1 week لملء مرة واحدة في الأسبوع. أنظمة الإعلام عن الحدث ملف لا تضمن تسليم 100٪ من جميع الملفات التي تم تحميلها وبالتالي يمكنك استخدام الردم لضمان أن جميع الملفات في نهاية المطاف الحصول على معالجتها، المتاحة في Databricks وقت التشغيل 8.4 و Databricks وقت التشغيل 8.4 فوتون وما فوق. إذا كنت تستخدم القائمة الإضافية، يمكنك أيضا استخدام الردم العادية لضمان اكتمال النهائي، متوفرة في Databricks وقت التشغيل 9.1 LTS و Databricks Runtime 9.1 LTS فوتون وما فوق.

القيمة الافتراضية: بلا

(1) يضيف "محمل تلقائي" أزواج العلامات ذات القيمة المفتاحية التالية بشكل افتراضي على أساس أفضل جهد:

  • vendor: Databricks
  • pathالموقع من حيث يتم تحميل البيانات.
  • checkpointLocationموقع نقطة تفتيش الدفق.
  • streamIdمعرف فريد عموميا للتدفق.

يتم حجز أسماء المفاتيح هذه ولا يمكنك الكتابة فوق قيمها.

خيارات سرد الدليل

الخيارات التالية ذات الصلة لوضع سرد الدليل.

الخيار
ملفات السحابة.useالقائمة المشكوك بها

>[! هام] >> هذه الميزة في المعاينة >.

نوع: String

ما إذا كان استخدام قائمة تزايدي بدلا من القائمة الكاملة في وضع سرد الدليل. بشكل افتراضي، سوف يقوم "محمل تلقائي" ببذل قصارى جهده للكشف تلقائيا إذا كان دليل معين قابلا للتطبيق في القائمة المتزايدة. يمكنك استخدام قائمة تزايدية بشكل صريح أو استخدام قائمة الدليل الكامل عن طريق تعيينها ك true أو false على التوالي.

متوفر في Databricks وقت التشغيل 9.1 LTS و Databricks وقت التشغيل 9.1 LTS فوتون وما فوق.

قيمة افتراضية: auto

القيم المتاحة: auto , true , false

كيفية اختيار maxFileAge

ملاحظة

متوفر في Databricks وقت التشغيل 8.4 وما فوق.

يتتبع "محمل تلقائي" الملفات المكتشفة في موقع نقطة التفتيش باستخدام RocksDB لتوفير ضمانات الابتلاع مرة واحدة بالضبط. بالنسبة لمجموعات البيانات ذات الحجم الكبير، يمكنك استخدام maxFileAge الخيار لانتهاء صلاحية الأحداث من موقع نقطة التحقق. القيمة الدنيا التي يمكنك تعيينها maxFileAge هي "14 days" . حذف في RocksDB تظهر كإدخالات علامة مميزة، لذلك يجب أن تتوقع زيادة استخدام التخزين كما تنتهي صلاحية الأحداث قبل أن يبدأ إلى مستوى إيقاف.

تحذير

maxFileAge يتم توفيرها كآلية لمراقبة التكاليف لمجموعات البيانات ذات الحجم الكبير ، بلعها في ترتيب ملايين الملفات كل ساعة. maxFileAgeضبط بشكل غير صحيح يمكن أن يؤدي إلى مشاكل في جودة البيانات. لذلك، لا يوصي Databricks ضبط هذه المعلمة ما لم يكن مطلوبا تماما.

محاولة ضبط maxFileAge الخيار يمكن أن يؤدي إلى الملفات غير المجهزة يتم تجاهلها بواسطة "محمل تلقائي" أو الملفات المعالجة بالفعل انتهاء الصلاحية ومن ثم إعادة معالجتها مما يؤدي إلى تكرار البيانات. إليك بعض الأمور التي يجب مراعاتها عند اختيار maxFileAge :

  • إذا كان الدفق الخاص بك إعادة تشغيل بعد وقت طويل، ملف إعلام الأحداث التي يتم سحبها من قائمة الانتظار الأقدم من maxFileAge يتم تجاهلها. وبالمثل، إذا كنت تستخدم سرد الدليل، الملفات التي قد ظهرت أثناء وقت لأسفل أقدم من maxFileAge يتم تجاهلها.
  • إذا كنت تستخدم وضع سرد الدليل واستخدام maxFileAge ، على سبيل المثال تعيين إلى ، يمكنك إيقاف "1 month" الدفق وإعادة تشغيل الدفق مع maxFileAge تعيين إلى ، كافة الملفات الأقدم من شهر واحد ولكن أحدث من "2 months" 2 أشهر تتم إعادة معالجتها.

أفضل نهج لضبط maxFileAge سيكون للبدء من انتهاء الصلاحية سخية، على سبيل المثال، "1 year" والعمل إلى أسفل إلى شيء من هذا القبيل "9 months" . إذا قمت بتعيين هذا الخيار في المرة الأولى التي تبدأ فيها الدفق، فلن تبتلع بيانات أقدم من maxFileAge ، لذلك ، إذا كنت ترغب في استيعاب البيانات القديمة ، فيجب عدم تعيين هذا الخيار أثناء بدء البث.

خيارات إعلام الملف

الخيارات التالية ذات صلة بوضع الإعلام بالملف.

الخيار
ملفات السحاب.جلبالبراليزم

نوع: Integer

عدد مؤشرات الترابط التي سيتم استخدامها عند جلب الرسائل من خدمة قائمة الانتظار.

القيمة الافتراضية: 1
ملفات السحابة.pathRewrites

النوع: سلسلة JSON

مطلوب فقط إذا قمت بتحديد queueUrl الذي يتلقى إعلامات الملف من دلاء S3 متعددة وتريد الاستفادة من نقاط التحميل تكوين للوصول إلى البيانات في هذه الحاويات. استخدم هذا الخيار لإعادة كتابة بادئة bucket/key المسار مع نقطة التحميل. يمكن إعادة كتابة البادئات فقط. على سبيل المثال، للتكوين
{"<databricks-mounted-bucket>/path": "dbfs:/mnt/data-warehouse"}، المسار
s3://<databricks-mounted-bucket>/path/2017/08/fileA.json تتم إعادة كتابة إلى dbfs:/mnt/data-warehouse/2017/08/fileA.json .

القيمة الافتراضية: بلا
ملفات السحابة.قائمة الانتظارUrl

نوع: String

URL قائمة الانتظار SQS. إذا تم توفيرها، مصدر ملفات مجموعة النظراء مباشرة يستهلك الأحداث من قائمة الانتظار هذه بدلا من إعداد خدمات AWS SNS و SQS الخاصة بها.

القيمة الافتراضية: بلا
ملفات السحابة.useNotifications

نوع: Boolean

ما إذا كان سيتم استخدام وضع الإعلام عن الملفات لتحديد متى توجد ملفات جديدة. إذا false استخدم وضع سرد الدليل. راجع أوضاع اكتشاف الملفات.

قيمة افتراضية: false

توفير الخيار التالي فقط إذا اخترت cloudFiles.useNotifications = true وتريد "محمل تلقائي" لإعداد خدمات الإعلام لك:

الخيار
ملفات السحابة.المنطقة

نوع: String

المنطقة التي يتواجد فيها دلو S3 المصدر وحيث سيتم إنشاء خدمات AWS SNS و SQS.

القيمة الافتراضية: في Databricks وقت التشغيل 9.0 وأعلى منطقة مثيل EC2. في Databricks وقت التشغيل 8.4 و أدناه يجب تحديد المنطقة.

يمكنك استخدام الخيارات التالية لتوفير بيانات الاعتماد للوصول إلى AWS SNS و SQS عندما لا تتوفر أدوار IAM أو عند تناول البيانات من سحب مختلفة.

الخيار
ملفات السحابة.awsAccessKey

نوع: String

معرف مفتاح الوصول AWS للمستخدم. يجب تزويدك
cloudFiles.awsSecretKey.

القيمة الافتراضية: بلا
ملفات السحابة.awsSecretKey

نوع: String

مفتاح الوصول السري AWS للمستخدم. يجب تزويدك
cloudFiles.awsAccessKey.

القيمة الافتراضية: بلا
ملفات السحابة.roleArn

نوع: String

ARN من دور IAM لتولي. يمكن افتراض الدور من ملف تعريف مثيل الكتلة أو عن طريق توفير بيانات الاعتماد مع
cloudFiles.awsAccessKey وcloudFiles.awsSecretKey.

القيمة الافتراضية: بلا
ملفات السحابة.roleExternalId

نوع: String

معرف لتوفيره أثناء تولي دور باستخدام cloudFiles.roleArn .

القيمة الافتراضية: بلا
ملفات السحابة.roleSessionName

نوع: String

اسم جلسة عمل اختياري لاستخدامه أثناء تولي دور باستخدام
cloudFiles.roleArn.

القيمة الافتراضية: بلا
ملفات السحابة.stsEndpoint

نوع: String

نقطة نهاية اختيارية لتوفير الوصول إلى AWS STS عند تولي دور باستخدام cloudFiles.roleArn .

القيمة الافتراضية: بلا

التغييرات في قيم الخيارات الافتراضية والتوافق

القيم الافتراضية لخيارات "محمل تلقائي" التالية تغيرت في Databricks وقت التشغيل 7.2 إلى القيم المسرودة في التكوين.

  • cloudFiles.useNotifications
  • cloudFiles.includeExistingFiles
  • cloudFiles.validateOptions

دفق محمل تلقائي بدأت على Databricks وقت التشغيل 7.1 وما دونها لديها قيم الخيار الافتراضي التالي:

  • cloudFiles.useNotifications هل true
  • cloudFiles.includeExistingFiles هل false
  • cloudFiles.validateOptions هل false

لضمان التوافق مع التطبيقات الموجودة، لا تتغير قيم الخيارات الافتراضية هذه عند تشغيل تدفقات "محمل تلقائي" الموجودة على Databricks Runtime 7.2 أو أعلى؛ سيكون للتدفقات نفس السلوك بعد الترقية.

الأذونات

يجب أن يكون لديك أذونات القراءة لدليل الإدخال. راجع تفاصيل اتصال S3 لمزيد من التفاصيل.

لاستخدام وضع الإعلام عن الملف، قم بإرفاق مستند نهج JSON التالي إلى مستخدم IAM أو دوره.

إذا لم تتمكن من إعداد الأذونات المحددة في مستند نهج JSON، فيمكنك اختياريا أن تطلب من مسؤول تنفيذ الإعداد لك باستخدام واجهة برمجة تطبيقات Scala لإدارة موارد السحابة. يمكن للمسؤول تزويدك بعنوان URL لقائمة الانتظار، والذي يمكنك توفيره مباشرة .option("queueUrl", <queue-url>)cloudFiles للمصدر. مع هذا التكوين، تحتاج فقط إلى أذونات مخفضة. راجع الملحق: تقليل الأذونات بعد الإعداد الأولي للحصول على التفاصيل.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:DeleteMessageBatch",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": [
        "sqs:ListQueues",
        "sqs:ListQueueTags",
        "sns:ListTopics"
      ],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": [
        "sns:Unsubscribe",
        "sns:DeleteTopic",
        "sqs:DeleteQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

حيث:

  • <bucket-name>اسم دلو S3 حيث سيقرأ الدفق الملفات، على سبيل auto-logs المثال. يمكنك استخدام * كحرف بدل، على سبيل المثال. databricks-*-logs لمعرفة دلو S3 الأساسي لمسار DBFS الخاص بك، يمكنك سرد كافة نقاط تحميل DBFS في دفتر ملاحظات عن طريق تشغيل %fs mounts .
  • <region>منطقة AWS حيث يوجد دلو S3، على سبيل us-west-2 المثال. إذا كنت لا تريد تحديد المنطقة، فاستخدم * .
  • <account-number>رقم حساب AWS الذي يملك حاوية S3، على سبيل المثال. 123456789012 إذا لم ترغب في تحديد رقم الحساب، فاستخدم * .

السلسلة databricks-auto-ingest-* في مواصفات SQS و SNS ARN هي بادئة الاسم التي cloudFiles يستخدمها المصدر عند إنشاء خدمات SQS و SNS. منذ Azure Databricks إعداد خدمات الإعلام في التشغيل الأولي للتيار، يمكنك استخدام نهج مع أذونات مخفضة بعد التشغيل الأولي (على سبيل المثال، إيقاف الدفق ومن ثم إعادة تشغيله). راجع الملحق: تقليل الأذونات بعد الإعداد الأولي للحصول على التفاصيل.

ملاحظة

النهج السابق هو المعنية فقط مع الأذونات اللازمة لإعداد خدمات إعلام الملف، وهي إعلام دلو S3، SNS، وخدمات SQS ويفترض لديك بالفعل حق الوصول للقراءة إلى دلو S3. إذا كنت بحاجة إلى إضافة أذونات للقراءة فقط S3 إضافة ما يلي إلى Action القائمة في DatabricksAutoLoaderSetup العبارة في المستند JSON:

  • s3:ListBucket
  • s3:GetObject

استيعاب البيانات بأمان في حساب AWS مختلف

يمكن تحميل "محمل تلقائي" البيانات عبر حسابات AWS بافتراض دور IAM. بعد تعيين بيانات اعتماد الأمان المؤقتة التي تم إنشاؤها بواسطة AssumeRole ، يمكنك الحصول على تحميل "محمل تلقائي" ملفات مجموعة النظراء عبر الحسابات. لإعداد "محمل تلقائي" لحسابات AWS عبر اتبع المستند: _. تأكد من وجود:

  • تحقق من أن لديك دور التعريف AssumeRole المعينة إلى الكتلة.

  • تكوين تكوين Spark الكتلة لتضمين الخصائص التالية:

    fs.s3a.credentialsType AssumeRole
    fs.s3a.stsAssumeRole.arn arn:aws:iam::<bucket-owner-acct-id>:role/MyRoleB
    fs.s3a.acl.default BucketOwnerFullControl
    

المقاييس

تقارير محمل السيارات المقاييس في كل دفعة. يمكنك عرض عدد الملفات الموجودة في التراكمات ومدى ضخامة التراكم في numFilesOutstandingnumBytesOutstanding ومقاييس تحت علامة التبويب numFilesOutstanding في numBytesOutstanding

{
  "sources" : [
    {
      "description" : "CloudFilesSource[/path/to/source]",
      "metrics" : {
        "numFilesOutstanding" : "238",
        "numBytesOutstanding" : "163939124006"
      }
    }
  ]
}

إدارة موارد السحابة

يمكنك استخدام API Scala لإدارة خدمات AWS SNS و SQS التي تم إنشاؤها بواسطة "محمل تلقائي". يجب تكوين أذونات إعداد المورد الموضحة في أذونات قبل استخدام API هذا.

هام

إذا كنت قد استخدمت "محمل تلقائي" في Databricks وقت التشغيل 7.1 وما دون، تحديث نهج IAM باستخدام مستند نهج JSON في أذونات. هناك عبارات جديدة في النهج Databricks Runtime 7.2 — DatabricksAutoLoaderList و DatabricksAutoLoaderTeardown —التي تحدد الأذونات الإضافية المطلوبة من قبل API Scala.

import com.databricks.sql.CloudFilesAWSResourceManager
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

// Set up an SQS queue and a topic subscribed to the path provided in the manager. Available in Databricks Runtime 7.4 and above.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by Auto Loader
manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

ملاحظة

متوفر في Databricks وقت التشغيل 7.4 وما فوق.

استخدم setUpNotificationServices(<resource-suffix>) لإنشاء قائمة انتظار SQS وموضوع SNS بالاسم databricks-auto-ingest-<resource-suffix> . إذا كان هناك قائمة انتظار SQS موجودة أو موضوع SNS بنفس الاسم، يقوم Azure Databricks بإعادة استخدام المورد الموجود بالفعل بدلا من إنشاء مورد جديد. ترجع هذه الدالة قائمة انتظار SQS التي يمكنك تمريرها إلى cloudFiles المصدر باستخدام .option("cloudFiles.queueUrl", <queue-url>) . وهذا يمكن cloudFiles المستخدم المصدر من الحصول على أذونات أقل من المستخدم الذي يقوم بإنشاء الموارد. راجع أذونات.

توفير "path" الخيار newManager فقط إذا كان الاتصال setUpNotificationServices ؛ لا حاجة ل أو listNotificationServicestearDownNotificationServices . هذا هو نفس path الذي تستخدمه عند تشغيل استعلام دفق.

الأسئلة المتداولة (FAQ)

هل أحتاج إلى إنشاء خدمات إعلام حدث AWS مسبقا؟

كلا. إذا اخترت وضع إعلام الملف، يقوم "محمل تلقائي" بإنشاء خط إعلام حدث ملف قائمة > انتظار SQS SNS موضوع AWS S3 > تلقائيا عند بدء الدفق.

كيف يمكنني تنظيف موارد إعلام الحدث، مثل مواضيع SNS وقوائم الانتظار SQS، التي تم إنشاؤها بواسطة "محمل تلقائي"؟

يمكنك استخدام إدارة موارد مجموعة النظراء لسرد الموارد وهدمها. يمكنك أيضا حذف هذه الموارد يدويا، إما في وحدة تحكم ويب أو باستخدام واجهات برمجة التطبيقات AWS. تحتوي كافة الموارد التي تم إنشاؤها بواسطة "محمل تلقائي" على البادئة: databricks-auto-ingest- .

هل يقوم "محمل تلقائي" بمعالجة الملف مرة أخرى عندما يتم إلحاق الملف أو الكتابة فوقه؟

تتم معالجة الملفات مرة واحدة فقط ما لم تقم بتمكين cloudFiles.allowOverwrites . إذا تم إلحاق ملف أو الكتابة فوقه، Databricks لا يضمن أي إصدار من الملف تتم معالجته. لسلوك محدد جيدا، Databricks توصي باستخدام "محمل تلقائي" إلى استيعاب الملفات غير قابلة للتغيير فقط. إذا لم يكن ذلك يلبي متطلباتك، فاتصل بممثل Databricks.

هل يمكنني تشغيل العديد من استعلامات البث من نفس دليل الإدخال؟

نعم. كل دفق ملفات مجموعة النظراء، كما هو محدد من قبل دليل نقطة تفتيش فريدة من نوعها، لديها قائمة انتظار SQS الخاصة بها، ويمكن إرسال نفس الأحداث AWS S3 إلى قوائم انتظار SQS متعددة.

إذا لم تصل ملفات البيانات بشكل مستمر، ولكن على فترات منتظمة، على سبيل المثال، مرة واحدة في اليوم، هل يجب أن أستخدم هذا المصدر وهل هناك أي فوائد؟

نعم ونعم. في هذه الحالة، يمكنك إعداد Trigger-Once مهمة "تدفق منظم" وجدولة تشغيلها بعد وقت وصول الملف المتوقع. يقوم التشغيل الأول بإعداد خدمات إعلام الحدث، والتي ستكون دائما قيد التشغيل، حتى عندما تكون كتلة البث في حالة إيقاف التشغيل. عند إعادة تشغيل cloudFiles الدفق، المصدر إحضار و معالجة كافة الملفات الأحداث التي تم إجراء الرواهم في قائمة انتظار SQS. الفائدة من استخدام "محمل تلقائي" لهذه الحالة هو أنك لا تحتاج إلى تحديد الملفات الجديدة والتي سيتم معالجتها في كل مرة، والتي يمكن أن تكون مكلفة للغاية.

ماذا يحدث إذا قمت بتغيير موقع نقطة التفتيش عند إعادة تشغيل الدفق؟

يحتفظ موقع نقطة التحقق بمعلومات تعريف هامة للتدفق. تغيير موقع نقطة التحقق بشكل فعال يعني أنك قد تخليت عن الدفق السابق وبدأت دفق جديد. سيقوم الدفق الجديد بإنشاء معلومات تقدم جديدة، وإذا كنت تستخدم وضع إعلام الملف، فإن خدمات AWS SNS و SQS الجديدة. يجب تنظيف موقع نقطة التفتيش وخدمات AWS SNS و SQS يدويا لأي تيارات مهجورة.

هل يمكنني تشغيل استعلامات تدفق متعددة من أدلة إدخال مختلفة على نفس حاوية S3؟

نعم، طالما أنها ليست دلائل الوالدين والطفل، على سبيل المثال، prod-logs/ و prod-logs/usage/ .

هل يمكنني استخدام هذه الميزة عندما تكون هناك إشعارات ملفات موجودة على حاوية S3 الخاصة بي؟

نعم، طالما أن دليل الإدخال الخاص بك لا يتعارض مع بادئة الإعلام الموجودة (على سبيل المثال، الدلائل الأصل التابعة أعلاه).

الملحق: أذونات مخفضة بعد الإعداد الأولي

أذونات إعداد المورد الموضحة في أذونات مطلوبة فقط أثناء التشغيل الأولي للتدفق. بعد التشغيل الأول، يمكنك التبديل إلى نهج IAM التالي بأذونات مخفضة.

هام

مع الأذونات المخفضة، لن تتمكن من بدء تشغيل استعلامات تدفق جديدة أو إعادة إنشاء الموارد في حالة حدوث فشل (على سبيل المثال، تم حذف قائمة انتظار SQS بطريق الخطأ)؛ كما أنك لن تتمكن من استخدام واجهة برمجة تطبيقات إدارة موارد السحابة لسرد الموارد أو هدمها.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketNotification",
       "sns:ListSubscriptionsByTopic",
       "sns:GetTopicAttributes",
       "sns:TagResource",
       "sns:Publish",
       "sqs:DeleteMessage",
       "sqs:DeleteMessageBatch",
       "sqs:ReceiveMessage",
       "sqs:SendMessage",
       "sqs:GetQueueUrl",
       "sqs:GetQueueAttributes",
       "sqs:TagQueue",
       "sqs:ChangeMessageVisibility",
       "sqs:ChangeMessageVisibilityBatch"
      ],
      "Resource": [
       "arn:aws:sqs:<region>:<account-number>:<queue-name>",
       "arn:aws:sns:<region>:<account-number>:<topic-name>",
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:GetBucketLocation",
       "s3:ListBucket"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
       "s3:PutObject",
       "s3:PutObjectAcl",
       "s3:GetObject",
       "s3:DeleteObject"
      ],
      "Resource": [
       "arn:aws:s3:::<bucket-name>/*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": [
       "sqs:ListQueues",
       "sqs:ListQueueTags",
       "sns:ListTopics"
      ],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}