العمل مع محفوظات جدول Delta Lake

تنشئ كل عملية تقوم بتعديل جدول Delta Lake إصدار جدول جديد. يمكنك استخدام معلومات المحفوظات لتدقيق العمليات أو التراجع عن جدول أو الاستعلام عن جدول في نقطة زمنية معينة باستخدام السفر عبر الزمن.

إشعار

لا توصي Databricks باستخدام محفوظات جدول Delta Lake كحل نسخ احتياطي طويل الأجل لأرشفة البيانات. توصي Databricks باستخدام الأيام السبعة الماضية فقط لعمليات السفر عبر الوقت ما لم تقم بتعيين كل من تكوينات استبقاء البيانات والسجلات إلى قيمة أكبر.

استرداد محفوظات جدول Delta

يمكنك استرداد المعلومات بما في ذلك العمليات والمستخدم والطابع الزمني لكل كتابة إلى جدول Delta عن طريق تشغيل history الأمر . يتم إرجاع العمليات بترتيب زمني عكسي.

يتم تحديد استبقاء محفوظات الجدول بواسطة إعداد delta.logRetentionDurationالجدول ، وهو 30 يوما بشكل افتراضي.

إشعار

يتم التحكم في السفر عبر الزمن ومحفوظات الجدول من خلال حدود استبقاء مختلفة. راجع ما هو السفر عبر الزمن Delta Lake؟.

DESCRIBE HISTORY '/data/events/'          -- get the full history of the table

DESCRIBE HISTORY delta.`/data/events/`

DESCRIBE HISTORY '/data/events/' LIMIT 1  -- get the last operation only

DESCRIBE HISTORY eventsTable

للحصول على تفاصيل بناء جملة Spark SQL، راجع وصف محفوظات.

راجع وثائق واجهة برمجة تطبيقات Delta Lake للحصول على تفاصيل بناء جملة Scala/Java/Python.

يوفر مستكشف الكتالوج طريقة عرض مرئية لمعلومات الجدول التفصيلية هذه ومحفوظات جداول دلتا. بالإضافة إلى مخطط الجدول وعينة البيانات، يمكنك النقر فوق علامة التبويب محفوظات لمشاهدة محفوظات الجدول التي يتم عرضها باستخدام DESCRIBE HISTORY.

مخطط المحفوظات

يحتوي إخراج العملية على history الأعمدة التالية.

Column نوع ‏‏الوصف
إصدار طويل إصدار الجدول الذي تم إنشاؤه بواسطة العملية.
الطابع الزمني الطابع الزمني عندما تم تثبيت هذا الإصدار.
userId سلسلة معرف المستخدم الذي قام بتشغيل العملية.
userName سلسلة اسم المستخدم الذي قام بتشغيل العملية.
‏‏التشغيل سلسلة اسم العملية.
operationParameters map معلمات العملية (على سبيل المثال، دالات التقييم.)
المهمة بنية تفاصيل المهمة التي شغلت العملية.
notebook بنية تفاصيل دفتر الملاحظات الذي تم تشغيل العملية منه.
clusterId سلسلة معرف نظام المجموعة الذي تم تشغيل العملية عليه.
readVersion طويل إصدار الجدول الذي تمت قراءته لتنفيذ عملية الكتابة.
isolationLevel سلسلة مستوى العزل المستخدم لهذه العملية.
isBlindAppend boolean ما إذا كانت هذه العملية قد ألحقت البيانات أم لا.
operationMetrics map مقاييس العملية (على سبيل المثال، عدد الصفوف والملفات المعدلة.)
بيانات تعريف المستخدم سلسلة بيانات تعريف التثبيت المعرفة من قبل المستخدم إذا تم تحديدها
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+
|      5|2019-07-29 14:07:47|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          4|WriteSerializable|        false|[numTotalRows -> ...|
|      4|2019-07-29 14:07:41|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          3|WriteSerializable|        false|[numTotalRows -> ...|
|      3|2019-07-29 14:07:29|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          2|WriteSerializable|        false|[numTotalRows -> ...|
|      2|2019-07-29 14:06:56|   ###|     ###|   UPDATE|[predicate -> (id...|null|     ###|      ###|          1|WriteSerializable|        false|[numTotalRows -> ...|
|      1|2019-07-29 14:04:31|   ###|     ###|   DELETE|[predicate -> ["(...|null|     ###|      ###|          0|WriteSerializable|        false|[numTotalRows -> ...|
|      0|2019-07-29 14:01:40|   ###|     ###|    WRITE|[mode -> ErrorIfE...|null|     ###|      ###|       null|WriteSerializable|         true|[numFiles -> 2, n...|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+

إشعار

مفاتيح مقاييس العملية

ترجع history العملية مجموعة من مقاييس العمليات في operationMetrics مخطط العمود.

تسرد الجداول التالية تعريفات مفتاح الخريطة حسب العملية.

العملية اسم قياسي ‏‏الوصف
كتابة، إنشاء جدول كحدد، استبدال جدول كحدد، نسخ إلى
ملفات numFiles عدد الملفات المكتوبة.
numOutputBytes الحجم بالبايت للمحتويات المكتوبة.
numOutputRows عدد الصفوف المكتوبة.
تحديث الدفق
numAddedFiles عدد الملفات المضافة.
numRemovedFiles عدد الملفات التي تمت إزالتها.
numOutputRows عدد الصفوف المكتوبة.
numOutputBytes حجم الكتابة بالبايت.
حذف
numAddedFiles عدد الملفات المضافة. لا يتم توفيره عند حذف أقسام الجدول.
numRemovedFiles عدد الملفات التي تمت إزالتها.
numDeletedRows عدد الصفوف التي تمت إزالتها. لا يتم توفيره عند حذف أقسام الجدول.
numCopiedRows عدد الصفوف المنسخة في عملية حذف الملفات.
executionTimeMs الوقت المستغرق لتنفيذ العملية بأكملها.
scanTimeMs الوقت المستغرق لفحص الملفات بحثا عن التطابقات.
إعادة كتابةTimeMs الوقت المستغرق لإعادة كتابة الملفات المتطابقة.
اقتطاع
numRemovedFiles عدد الملفات التي تمت إزالتها.
executionTimeMs الوقت المستغرق لتنفيذ العملية بأكملها.
دمج
numSourceRows عدد الصفوف في DataFrame المصدر.
numTargetRowsInserted عدد الصفوف المدرجة في الجدول الهدف.
numTargetRowsUpdated عدد الصفوف التي تم تحديثها في الجدول الهدف.
numTargetRowsDeleted عدد الصفوف المحذوفة في الجدول الهدف.
numTargetRowsCopied عدد الصفوف الهدف المنسخة.
numOutputRows إجمالي عدد الصفوف المكتوبة.
تمت إضافة numTargetFiles عدد الملفات المضافة إلى المتلقي (الهدف).
numTargetFilesRemoved عدد الملفات التي تمت إزالتها من المتلقي (الهدف).
executionTimeMs الوقت المستغرق لتنفيذ العملية بأكملها.
scanTimeMs الوقت المستغرق لفحص الملفات بحثا عن التطابقات.
إعادة كتابةTimeMs الوقت المستغرق لإعادة كتابة الملفات المتطابقة.
تحديث
numAddedFiles عدد الملفات المضافة.
numRemovedFiles عدد الملفات التي تمت إزالتها.
numUpdatedRows عدد الصفوف المحدثة.
numCopiedRows عدد الصفوف التي تم نسخها للتو في عملية تحديث الملفات.
executionTimeMs الوقت المستغرق لتنفيذ العملية بأكملها.
scanTimeMs الوقت المستغرق لفحص الملفات بحثا عن التطابقات.
إعادة كتابةTimeMs الوقت المستغرق لإعادة كتابة الملفات المتطابقة.
FSCK numRemovedFiles عدد الملفات التي تمت إزالتها.
CONVERT numConvertedFiles عدد ملفات Parquet التي تم تحويلها.
تحسين
numAddedFiles عدد الملفات المضافة.
numRemovedFiles عدد الملفات المحسنة.
numAddedBytes عدد وحدات البايت المضافة بعد تحسين الجدول.
numRemovedBytes عدد وحدات البايت التي تمت إزالتها.
minFileSize حجم أصغر ملف بعد تحسين الجدول.
p25FileSize حجم الملف 25 بالمائة بعد تحسين الجدول.
p50FileSize متوسط حجم الملف بعد تحسين الجدول.
p75FileSize حجم الملف 75 بالمائة بعد تحسين الجدول.
Maxfilesize حجم أكبر ملف بعد تحسين الجدول.
استنساخ
حجم sourceTableSize الحجم بالبايت للجدول المصدر في الإصدار المستنسخ.
sourceNumOfFiles عدد الملفات في الجدول المصدر في الإصدار المستنسخ.
numRemovedFiles عدد الملفات التي تمت إزالتها من الجدول الهدف إذا تم استبدال جدول Delta سابق.
تمت إزالةFilesSize الحجم الإجمالي بالبايت للملفات التي تمت إزالتها من الجدول الهدف إذا تم استبدال جدول Delta سابق.
ملفات numCopied عدد الملفات التي تم نسخها إلى الموقع الجديد. 0 للنسخ الضحلة.
حجم ملفات النسخ الحجم الإجمالي بالبايت للملفات التي تم نسخها إلى الموقع الجديد. 0 للنسخ الضحلة.
الاستعادة
tableSizeAfterRestore حجم الجدول بالبايت بعد الاستعادة.
numOfFilesAfterRestore عدد الملفات في الجدول بعد الاستعادة.
numRemovedFiles عدد الملفات التي تمت إزالتها بواسطة عملية الاستعادة.
numRestoredFiles عدد الملفات التي تمت إضافتها نتيجة للاستعادة.
تمت إزالةFilesSize الحجم بالبايت من الملفات التي تمت إزالتها بواسطة الاستعادة.
حجم الملفات المستعادة الحجم بالبايت من الملفات المضافة بواسطة الاستعادة.
فراغ
numDeletedFiles عدد الملفات المحذوفة.
numVacuumedDirectories عدد الدلائل المنسغة.
numFilesToDelete عدد الملفات المراد حذفها.

ما هو السفر عبر الزمن في Delta Lake؟

يدعم Delta Lake time travel الاستعلام عن إصدارات الجدول السابقة استنادا إلى الطابع الزمني أو إصدار الجدول (كما هو مسجل في سجل المعاملات). يمكنك استخدام السفر عبر الزمن لتطبيقات مثل ما يلي:

  • إعادة إنشاء التحليلات أو التقارير أو المخرجات (على سبيل المثال" إخراج نموذج التعلم الآلي). وقد يكون ذلك مفيدًا في تصحيح الأخطاء أو التدقيق، لا سيما في الصناعات الخاضعة للتنظيم.
  • كتابة الاستعلامات المؤقتة المعقدة.
  • إصلاح الأخطاء في بياناتك.
  • توفير عزل اللقطات لمجموعة من الاستعلامات للجداول سريعة التغير.

هام

يتم تحديد إصدارات الجدول التي يمكن الوصول إليها مع السفر عبر الوقت من خلال مجموعة من حد الاستبقاء لملفات سجل المعاملات وتكرار العمليات واستبقاءها VACUUM المحدد. إذا قمت بتشغيل VACUUM يوميا بالقيم الافتراضية، فستتوفر 7 أيام من البيانات للسفر عبر الزمن.

بناء جملة السفر عبر الزمن في دلتا

يمكنك الاستعلام عن جدول Delta مع السفر عبر الوقت عن طريق إضافة عبارة بعد مواصفات اسم الجدول.

  • timestamp_expression يمكن أن يكون أي واحد من:
    • '2018-10-18T22:15:12.013Z'، أي سلسلة يمكن تحويلها إلى طابع زمني
    • cast('2018-10-18 13:36:32 CEST' as timestamp)
    • '2018-10-18'، أي سلسلة تاريخ
    • current_timestamp() - interval 12 hours
    • date_sub(current_date(), 1)
    • أي تعبير آخر يتم تحويله أو يمكن تحويله إلى طابع زمني
  • version هي قيمة طويلة يمكن الحصول عليها من إخراج DESCRIBE HISTORY table_spec.

لا timestamp_expression يمكن ولا version الاستعلامات الفرعية.

يتم قبول سلاسل التاريخ أو الطابع الزمني فقط. على سبيل المثال، "2019-01-01" و "2019-01-01T00:00:00.000Z". راجع التعليمات البرمجية التالية على سبيل المثال بناء الجملة:

SQL

SELECT * FROM people10m TIMESTAMP AS OF '2018-10-18T22:15:12.013Z'
SELECT * FROM delta.`/tmp/delta/people10m` VERSION AS OF 123

Python

df1 = spark.read.option("timestampAsOf", "2019-01-01").table("people10m")
df2 = spark.read.option("versionAsOf", 123).load("/tmp/delta/people10m")

يمكنك أيضا استخدام بناء الجملة @ لتحديد الطابع الزمني أو الإصدار كجزء من اسم الجدول. يجب أن يكون الطابع الزمني بالتنسيق yyyyMMddHHmmssSSS . يمكنك تحديد إصدار بعد @ عن طريق إلحاق بالإصدار v . راجع التعليمات البرمجية التالية على سبيل المثال بناء الجملة:

SQL

SELECT * FROM people10m@20190101000000000
SELECT * FROM people10m@v123

Python

spark.read.table("people10m@20190101000000000")
spark.read.table("people10m@v123")

spark.read.load("/tmp/delta/people10m@20190101000000000")
spark.read.load("/tmp/delta/people10m@v123")

ما هي نقاط التحقق من سجل المعاملات؟

يسجل Delta Lake إصدارات الجدول كملفات JSON داخل _delta_log الدليل، والذي يتم تخزينه جنبا إلى جنب مع بيانات الجدول. لتحسين الاستعلام عن نقاط التحقق، تقوم Delta Lake بتجميع إصدارات الجدول إلى ملفات نقطة التحقق Parquet، مما يمنع الحاجة إلى قراءة جميع إصدارات JSON من محفوظات الجدول. يحسن Azure Databricks تكرار نقاط التفتيش لحجم البيانات وعبء العمل. يجب ألا يحتاج المستخدمون إلى التفاعل مع نقاط التحقق مباشرة. يخضع تكرار نقطة التحقق للتغيير دون إشعار.

تكوين استبقاء البيانات للاستعلامات عن السفر عبر الوقت

للاستعلام عن إصدار جدول سابق، يجب الاحتفاظ بكل من السجل وملفات البيانات لهذا الإصدار.

يتم حذف ملفات البيانات عند VACUUM التشغيل مقابل جدول. تدير Delta Lake إزالة ملف السجل تلقائيا بعد التحقق من إصدارات الجدول.

نظرا لأن معظم جداول Delta قد VACUUM تم تشغيلها عليها بانتظام، يجب أن تحترم الاستعلامات في نقطة زمنية حد الاستبقاء ل VACUUM، وهو 7 أيام بشكل افتراضي.

لزيادة حد استبقاء البيانات لجداول Delta، يجب تكوين خصائص الجدول التالية:

  • delta.logRetentionDuration = "interval <interval>": يتحكم في المدة التي يتم فيها الاحتفاظ بمحفوظات الجدول. الافتراضي هو interval 30 days.
  • delta.deletedFileRetentionDuration = "interval <interval>": يحدد الحد VACUUM الذي يستخدم لإزالة ملفات البيانات التي لم تعد مشار إليها في إصدار الجدول الحالي. الافتراضي هو interval 7 days.

يمكنك تحديد خصائص Delta أثناء إنشاء الجدول أو تعيينها باستخدام عبارة ALTER TABLE . راجع مرجع خصائص جدول Delta.

إشعار

يجب تعيين هاتين الخاصيتين لضمان الاحتفاظ بمحفوظات الجدول لمدة أطول للجداول ذات العمليات المتكررة VACUUM . على سبيل المثال، للوصول إلى 30 يوما من البيانات التاريخية، قم بتعيين delta.deletedFileRetentionDuration = "interval 30 days" (الذي يطابق الإعداد الافتراضي ل delta.logRetentionDuration).

يمكن أن تؤدي زيادة حد استبقاء البيانات إلى ارتفاع تكاليف التخزين الخاصة بك، حيث يتم الاحتفاظ بمزيد من ملفات البيانات.

استعادة جدول Delta إلى حالة سابقة

يمكنك استعادة جدول Delta إلى حالته السابقة باستخدام RESTORE الأمر . يحتفظ جدول Delta داخليا بالإصدارات التاريخية من الجدول التي تمكنه من استعادته إلى حالة سابقة. يتم دعم الإصدار المقابل للحالة السابقة أو الطابع الزمني لوقت إنشاء الحالة السابقة كخيارات RESTORE بواسطة الأمر .

هام

  • يمكنك استعادة جدول تمت استعادته بالفعل.
  • يمكنك استعادة جدول مستنسخ .
  • يجب أن يكون لديك MODIFY إذن على الجدول الذي يتم استعادته.
  • لا يمكنك استعادة جدول إلى إصدار أقدم حيث تم حذف ملفات البيانات يدويا أو بواسطة vacuum. لا يزال الاستعادة إلى هذا الإصدار ممكنا جزئيا إذا spark.sql.files.ignoreMissingFiles تم تعيين إلى true.
  • تنسيق الطابع الزمني للاستعادة إلى حالة سابقة هو yyyy-MM-dd HH:mm:ss. يتم أيضا دعم توفير سلسلة تاريخ (yyyy-MM-dd) تاريخ فقط.
RESTORE TABLE db.target_table TO VERSION AS OF <version>
RESTORE TABLE delta.`/data/target/` TO TIMESTAMP AS OF <timestamp>

للحصول على تفاصيل بناء الجملة، راجع استعادة.

هام

تعتبر الاستعادة عملية تغيير البيانات. تحتوي إدخالات سجل Delta Lake المضافة بواسطة RESTORE الأمر على dataChange المعينة إلى true. إذا كان هناك تطبيق انتقال البيانات من الخادم، مثل مهمة التدفق المنظم التي تعالج التحديثات إلى جدول Delta Lake، فإن إدخالات سجل تغيير البيانات التي تمت إضافتها بواسطة عملية الاستعادة تعتبر تحديثات بيانات جديدة، وقد تؤدي معالجتها إلى بيانات مكررة.

على سبيل المثال:

إصدار الجدول العملية تحديثات سجل دلتا السجلات في تحديثات سجل تغيير البيانات
0 إدراج AddFile(/path/to/file-1, dataChange = true) (الاسم = فيكتور، العمر = 29، (الاسم = جورج، العمر = 55)
1 إدراج AddFile(/path/to/file-2, dataChange = true) (الاسم = جورج، العمر = 39)
2 تحسين AddFile(/path/to/file-3, dataChange = false), RemoveFile(/path/to/file-1), RemoveFile(/path/to/file-2) (لا توجد سجلات مثل تحسين الضغط لا يغير البيانات في الجدول)
3 RESTORE(version=1) RemoveFile(/path/to/file-3), AddFile(/path/to/file-1, dataChange = true), AddFile(/path/to/file-2, dataChange = true) (الاسم = فيكتور، العمر = 29)، (الاسم = جورج، العمر = 55)، (الاسم = جورج، العمر = 39)

في المثال السابق، RESTORE ينتج عن الأمر تحديثات تمت رؤيتها بالفعل عند قراءة إصدار جدول Delta 0 و1. إذا كان الاستعلام المتدفق يقرأ هذا الجدول، فسيتم اعتبار هذه الملفات كبيانات تمت إضافتها حديثا وستتم معالجتها مرة أخرى.

استعادة المقاييس

RESTORE يبلغ عن المقاييس التالية ك DataFrame صف واحد بمجرد اكتمال العملية:

  • table_size_after_restore: حجم الجدول بعد الاستعادة.

  • num_of_files_after_restore: عدد الملفات في الجدول بعد الاستعادة.

  • num_removed_files: عدد الملفات التي تمت إزالتها (تم حذفها منطقيا) من الجدول.

  • num_restored_files: عدد الملفات التي تمت استعادتها بسبب التراجع.

  • removed_files_size: الحجم الإجمالي بالبايت للملفات التي تمت إزالتها من الجدول.

  • restored_files_size: الحجم الإجمالي بالبايت للملفات التي تمت استعادتها.

    مثال على استعادة المقاييس

أمثلة على استخدام السفر عبر الزمن في Delta Lake

  • إصلاح عمليات الحذف العرضية إلى جدول للمستخدم 111:

    INSERT INTO my_table
      SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
      WHERE userId = 111
    
  • إصلاح التحديثات غير الصحيحة العرضية لجدول:

    MERGE INTO my_table target
      USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
      ON source.userId = target.userId
      WHEN MATCHED THEN UPDATE SET *
    
  • الاستعلام عن عدد العملاء الجدد الذين تمت إضافتهم خلال الأسبوع الماضي.

    SELECT count(distinct userId)
    FROM my_table  - (
      SELECT count(distinct userId)
      FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
    

كيف أعمل العثور على إصدار التثبيت الأخير في جلسة Spark؟

للحصول على رقم إصدار آخر تثبيت مكتوب بواسطة الحالي SparkSession عبر جميع مؤشرات الترابط وجميع الجداول، استعلم عن تكوين spark.databricks.delta.lastCommitVersionInSessionSQL .

SQL

SET spark.databricks.delta.lastCommitVersionInSession

Python

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

Scala

spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")

إذا لم يتم إجراء أي عمليات تثبيت بواسطة SparkSession، فإن الاستعلام عن المفتاح يرجع قيمة فارغة.

إشعار

إذا كنت تشارك نفس الشيء SparkSession عبر مؤشرات ترابط متعددة، فإنه يشبه مشاركة متغير عبر مؤشرات ترابط متعددة؛ قد تصل إلى شروط السباق حيث يتم تحديث قيمة التكوين بشكل متزامن.