البرنامج التعليمي: Delta Lake

يقدم هذا البرنامج التعليمي عمليات Delta Lake الشائعة على Azure Databricks، بما في ذلك ما يلي:

يمكنك تشغيل مثال التعليمات البرمجية Python وR وSc scala وSQL في هذه المقالة من داخل دفتر ملاحظات مرفق بمجموعة Azure Databricks. يمكنك أيضا تشغيل التعليمات البرمجية SQL في هذه المقالة من داخل استعلام مقترن بمستودع SQL في Databricks SQL.

إشعار

تستخدم بعض أمثلة التعليمات البرمجية التالية رمز مساحة اسم من مستويين يتكون من مخطط (يسمى أيضا قاعدة بيانات) وجدول أو طريقة عرض (على سبيل المثال، default.people10m). لاستخدام هذه الأمثلة مع كتالوج Unity، استبدل مساحة الاسم ثنائية المستوى بإشارة مساحة الاسم ثلاثية المستوى في كتالوج Unity التي تتكون من كتالوج ومخطط وجدول أو طريقة عرض (على سبيل المثال، main.default.people10m).

إنشاء جدول

تستخدم جميع الجداول التي تم إنشاؤها على Azure Databricks Delta Lake بشكل افتراضي.

إشعار

Delta Lake هو الافتراضي لجميع أوامر القراءة والكتابة وإنشاء الجدول Azure Databricks.

Python

# Load the data from its source.
df = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"
df.write.saveAsTable(table_name)

R

library(SparkR)
sparkR.session()

# Load the data from its source.
df = read.df(path = "/databricks-datasets/learning-spark-v2/people/people-10m.delta")

# Write the data to a table.
table_name = "people_10m"

saveAsTable(
  df = df,
  tableName = table_name
)

التطوير

// Load the data from its source.
val people = spark.read.load("/databricks-datasets/learning-spark-v2/people/people-10m.delta")

// Write the data to a table.
val table_name = "people_10m"

people.write.saveAsTable("people_10m")

SQL

DROP TABLE IF EXISTS people_10m;

CREATE TABLE IF NOT EXISTS people_10m
AS SELECT * FROM delta.`/databricks-datasets/learning-spark-v2/people/people-10m.delta`;

تنشئ العمليات السابقة جدولا مدارا جديدا باستخدام المخطط الذي تم استنتاجه من البيانات. للحصول على معلومات حول الخيارات المتوفرة عند إنشاء جدول دلتا، راجع إنشاء جدول.

بالنسبة للجداول المدارة، يحدد Azure Databricks موقع البيانات. للحصول على الموقع، يمكنك استخدام عبارة وصف التفاصيل ، على سبيل المثال:

Python

display(spark.sql('DESCRIBE DETAIL people_10m'))

R

display(sql("DESCRIBE DETAIL people_10m"))

التطوير

display(spark.sql("DESCRIBE DETAIL people_10m"))

SQL

DESCRIBE DETAIL people_10m;

قد ترغب في بعض الأحيان في إنشاء جدول عن طريق تحديد المخطط قبل إدراج البيانات. يمكنك إكمال ذلك باستخدام أوامر SQL التالية:

CREATE TABLE IF NOT EXISTS people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

CREATE OR REPLACE TABLE people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
)

في Databricks Runtime 13.0 والإصدارات الأحدث، يمكنك استخدام CREATE TABLE LIKE لإنشاء جدول Delta فارغ جديد يكرر خصائص المخطط والجدول لجدول دلتا المصدر. يمكن أن يكون هذا مفيدا بشكل خاص عند ترقية الجداول من بيئة تطوير إلى إنتاج، كما هو الحال في مثال التعليمات البرمجية التالي:

CREATE TABLE prod.people10m LIKE dev.people10m

يمكنك أيضا استخدام DeltaTableBuilder واجهة برمجة التطبيقات في Delta Lake لإنشاء جداول. مقارنة بواجهات برمجة تطبيقات DataFrameWriter، تسهل واجهة برمجة التطبيقات هذه تحديد معلومات إضافية مثل تعليقات الأعمدة وخصائص الجدول والأعمدة التي تم إنشاؤها.

هام

هذه الميزة في المعاينة العامة.

Python

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
  .tableName("default.people10m") \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
  .addColumn("id", "INT") \
  .addColumn("firstName", "STRING") \
  .addColumn("middleName", "STRING") \
  .addColumn("lastName", "STRING", comment = "surname") \
  .addColumn("gender", "STRING") \
  .addColumn("birthDate", "TIMESTAMP") \
  .addColumn("ssn", "STRING") \
  .addColumn("salary", "INT") \
  .property("description", "table with people data") \
  .location("/tmp/delta/people10m") \
  .execute()

التطوير

// Create table in the metastore
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// Create or replace table with path and add properties
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/tmp/delta/people10m")
  .execute()

Upsert إلى جدول

لدمج مجموعة من التحديثات والإدخالات في جدول Delta موجود، يمكنك استخدام عبارة MERGE INTO . على سبيل المثال، تأخذ العبارة التالية البيانات من الجدول المصدر وتدمجها في جدول Delta الهدف. عند وجود صف مطابق في كلا الجدولين، يقوم Delta Lake بتحديث عمود البيانات باستخدام التعبير المحدد. عندما لا يوجد صف مطابق، يضيف Delta Lake صفا جديدا. تعرف هذه العملية باسم upsert.

CREATE OR REPLACE TEMP VIEW people_updates (
  id, firstName, middleName, lastName, gender, birthDate, ssn, salary
) AS VALUES
  (9999998, 'Billy', 'Tommie', 'Luppitt', 'M', '1992-09-17T04:00:00.000+0000', '953-38-9452', 55250),
  (9999999, 'Elias', 'Cyril', 'Leadbetter', 'M', '1984-05-22T04:00:00.000+0000', '906-51-2137', 48500),
  (10000000, 'Joshua', 'Chas', 'Broggio', 'M', '1968-07-22T04:00:00.000+0000', '988-61-6247', 90000),
  (20000001, 'John', '', 'Doe', 'M', '1978-01-14T04:00:00.000+000', '345-67-8901', 55500),
  (20000002, 'Mary', '', 'Smith', 'F', '1982-10-29T01:00:00.000+000', '456-78-9012', 98250),
  (20000003, 'Jane', '', 'Doe', 'F', '1981-06-25T04:00:00.000+000', '567-89-0123', 89900);

MERGE INTO people_10m
USING people_updates
ON people_10m.id = people_updates.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

إذا قمت بتحديد *، فإن هذا يحدث أو يدرج كافة الأعمدة في الجدول الهدف. يفترض هذا أن الجدول المصدر يحتوي على نفس الأعمدة الموجودة في الجدول الهدف، وإلا سيطرح الاستعلام خطأ تحليل.

يجب تحديد قيمة لكل عمود في الجدول عند تنفيذ INSERT عملية (على سبيل المثال، عند عدم وجود صف مطابق في مجموعة البيانات الموجودة). ومع ذلك، لا تحتاج إلى تحديث كافة القيم.

لمشاهدة النتائج، استعلم عن الجدول.

SELECT * FROM people_10m WHERE id >= 9999998

قراءة جدول

يمكنك الوصول إلى البيانات في جداول Delta حسب اسم الجدول أو مسار الجدول، كما هو موضح في الأمثلة التالية:

Python

people_df = spark.read.table(table_name)

display(people_df)

## or

people_df = spark.read.load(table_path)

display(people_df)

R

people_df = tableToDF(table_name)

display(people_df)

التطوير

val people_df = spark.read.table(table_name)

display(people_df)

\\ or

val people_df = spark.read.load(table_path)

display(people_df)

SQL

SELECT * FROM people_10m;

SELECT * FROM delta.`<path-to-table`;

الكتابة إلى جدول

يستخدم Delta Lake بناء الجملة القياسي لكتابة البيانات إلى الجداول.

لإضافة بيانات جديدة تلقائيا إلى جدول Delta موجود، استخدم append الوضع كما في الأمثلة التالية:

SQL

INSERT INTO people10m SELECT * FROM more_people

Python

df.write.mode("append").saveAsTable("people10m")

التطوير

df.write.mode("append").saveAsTable("people10m")

لاستبدال كافة البيانات في جدول بشكل تلقائي، استخدم overwrite الوضع كما في الأمثلة التالية:

SQL

INSERT OVERWRITE TABLE people10m SELECT * FROM more_people

Python

df.write.mode("overwrite").saveAsTable("people10m")

التطوير

df.write.mode("overwrite").saveAsTable("people10m")

تحديث جدول

يمكنك تحديث البيانات التي تطابق دالة تقييم في جدول Delta. على سبيل المثال، في جدول مسمى people10m أو مسار في /tmp/delta/people-10m، لتغيير اختصار في gender العمود من M أو F إلى Male أو Female، يمكنك تشغيل ما يلي:

SQL

UPDATE people10m SET gender = 'Female' WHERE gender = 'F';
UPDATE people10m SET gender = 'Male' WHERE gender = 'M';

UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Female' WHERE gender = 'F';
UPDATE delta.`/tmp/delta/people-10m` SET gender = 'Male' WHERE gender = 'M';

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.update(
  condition = "gender = 'F'",
  set = { "gender": "'Female'" }
)

# Declare the predicate by using Spark SQL functions.
deltaTable.update(
  condition = col('gender') == 'M',
  set = { 'gender': lit('Male') }
)

التطوير

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.updateExpr(
  "gender = 'F'",
  Map("gender" -> "'Female'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.update(
  col("gender") === "M",
  Map("gender" -> lit("Male")));

حذف من جدول

يمكنك إزالة البيانات التي تطابق دالة تقييم من جدول Delta. على سبيل المثال، في جدول يسمى people10m أو مسار في /tmp/delta/people-10m، لحذف جميع الصفوف المقابلة للأشخاص الذين يعانون من قيمة في birthDate العمود من قبل 1955، يمكنك تشغيل ما يلي:

SQL

DELETE FROM people10m WHERE birthDate < '1955-01-01'

DELETE FROM delta.`/tmp/delta/people-10m` WHERE birthDate < '1955-01-01'

Python

from delta.tables import *
from pyspark.sql.functions import *

deltaTable = DeltaTable.forPath(spark, '/tmp/delta/people-10m')

# Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

# Declare the predicate by using Spark SQL functions.
deltaTable.delete(col('birthDate') < '1960-01-01')

التطوير

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta/people-10m")

// Declare the predicate by using a SQL-formatted string.
deltaTable.delete("birthDate < '1955-01-01'")

import org.apache.spark.sql.functions._
import spark.implicits._

// Declare the predicate by using Spark SQL functions and implicits.
deltaTable.delete(col("birthDate") < "1955-01-01")

هام

delete يزيل البيانات من أحدث إصدار من جدول Delta ولكن لا يزيلها من التخزين الفعلي حتى يتم تفريغ الإصدارات القديمة بشكل صريح. راجع الفراغ للحصول على التفاصيل.

عرض محفوظات الجدول

لعرض محفوظات جدول، استخدم عبارة DESCRIBE HISTORY ، التي توفر معلومات المصدر، بما في ذلك إصدار الجدول والعملية والمستخدم وما إلى ذلك، لكل كتابة إلى جدول.

DESCRIBE HISTORY people_10m

الاستعلام عن إصدار سابق من الجدول (السفر عبر الزمن)

يسمح لك السفر عبر الوقت في Delta Lake بالاستعلام عن لقطة قديمة لجدول Delta.

للاستعلام عن إصدار أقدم من جدول، حدد إصدارا أو طابعا زمنيا في عبارة SELECT . على سبيل المثال، للاستعلام عن الإصدار 0 من المحفوظات أعلاه، استخدم:

SELECT * FROM people_10m VERSION AS OF 0

أو

SELECT * FROM people_10m TIMESTAMP AS OF '2019-01-29 00:37:58'

بالنسبة للطوابع الزمنية، يتم قبول سلاسل التاريخ أو الطابع الزمني فقط، على سبيل المثال، "2019-01-01" و "2019-01-01'T'00:00:00.000Z".

تسمح لك خيارات DataFrameReader بإنشاء DataFrame من جدول Delta الذي تم إصلاحه إلى إصدار معين من الجدول، على سبيل المثال في Python:

df1 = spark.read.format('delta').option('timestampAsOf', '2019-01-01').table("people_10m")

display(df1)

أو، بالتناوب:

df2 = spark.read.format('delta').option('versionAsOf', 0).table("people_10m")

display(df2)

للحصول على التفاصيل، راجع العمل مع محفوظات جدول Delta Lake.

تحسين جدول

بمجرد إجراء تغييرات متعددة على جدول، قد يكون لديك الكثير من الملفات الصغيرة. لتحسين سرعة قراءة الاستعلامات، يمكنك استخدام OPTIMIZE لطي الملفات الصغيرة إلى ملفات أكبر:

OPTIMIZE people_10m

ترتيب Z حسب الأعمدة

لتحسين أداء القراءة بشكل أكبر، يمكنك المشاركة في تحديد موقع المعلومات ذات الصلة في نفس مجموعة الملفات بواسطة Z-Ordering. يتم استخدام هذه المنطقة المشتركة تلقائيا بواسطة خوارزميات تخطي بيانات Delta Lake لتقليل كمية البيانات التي تحتاج إلى قراءة بشكل كبير. إلى بيانات Z-Order، يمكنك تحديد الأعمدة التي تريد ترتيبها في العبارة ZORDER BY . على سبيل المثال، للمشاركة في تحديد موقع بواسطة gender، قم بتشغيل:

OPTIMIZE people_10m
ZORDER BY (gender)

للحصول على المجموعة الكاملة من الخيارات المتوفرة عند تشغيل OPTIMIZE، راجع ملفات البيانات المضغوطة مع التحسين على Delta Lake.

تنظيف اللقطات باستخدام VACUUM

يوفر Delta Lake عزل اللقطة للقراءات، ما يعني أنه من الآمن التشغيل OPTIMIZE حتى أثناء قيام المستخدمين الآخرين أو الوظائف بالاستعلام عن الجدول. ومع ذلك، في النهاية، يجب تنظيف اللقطات القديمة. يمكنك القيام بذلك عن طريق تشغيل VACUUM الأمر :

VACUUM people_10m

للحصول على تفاصيل حول استخدام VACUUM بشكل فعال، راجع إزالة ملفات البيانات غير المستخدمة باستخدام فراغ.