بدء استخدام COPY INTO لتحميل البيانات

COPY INTO يتيح لك أمر SQL تحميل البيانات من موقع ملف إلى جدول Delta. هذه عملية قابلة لإعادة الفرز ومكررة؛ يتم تخطي الملفات الموجودة في الموقع المصدر الذي تم تحميله بالفعل.

COPY INTO يوفر الإمكانات التالية:

  • تصفية الملفات أو الدليل القابلة للتكوين بسهولة من التخزين السحابي، بما في ذلك وحدات تخزين S3 وADS Gen2 وABFS وGCS ووحدات تخزين كتالوج Unity.
  • دعم تنسيقات الملفات المصدر المتعددة: CSV وJSON وXML وAvro وORC وParquet والنصوص والملفات الثنائية
  • معالجة الملفات مرة واحدة (غير متكررة) بشكل افتراضي
  • استنتاج مخطط الجدول الهدف وتعيينه ودمجه وتطوره

إشعار

للحصول على تجربة استيعاب ملفات أكثر قابلية للتطوير وقوة، توصي Databricks بأن يستفيد مستخدمو SQL من جداول الدفق. راجع تحميل البيانات باستخدام جداول الدفق في Databricks SQL.

تحذير

COPY INTO يحترم إعداد مساحة العمل لمتجهات الحذف. إذا تم تمكينها، يتم تمكين متجهات الحذف على الجدول الهدف عند COPY INTO التشغيل على مستودع SQL أو حساب تشغيل Databricks Runtime 14.0 أو أعلى. بمجرد التمكين، تحظر متجهات الحذف الاستعلامات مقابل جدول في Databricks Runtime 11.3 LTS وما دونه. راجع ما هي متجهات الحذف؟ والتمكين التلقائي لنواقل الحذف.

المتطلبات

يجب أن يتبع مسؤول الحساب الخطوات الواردة في تكوين الوصول إلى البيانات لاستيعابها لتكوين الوصول إلى البيانات في تخزين كائن السحابة قبل أن يتمكن المستخدمون من تحميل البيانات باستخدام COPY INTO.

مثال: تحميل البيانات في جدول Delta Lake بلا مخطط

إشعار

تتوفر هذه الميزة في Databricks Runtime 11.3 LTS وما فوق.

يمكنك إنشاء جداول دلتا العنصر النائب الفارغة بحيث يتم استنتاج المخطط لاحقا أثناء أمر COPY INTO عن طريق تعيين mergeSchema إلى true في COPY_OPTIONS:

CREATE TABLE IF NOT EXISTS my_table
[COMMENT <table-description>]
[TBLPROPERTIES (<table-properties>)];

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');

عبارة SQL أعلاه متكررة ويمكن جدولتها للتشغيل لاستيعاب البيانات مرة واحدة بالضبط في جدول Delta.

إشعار

جدول Delta الفارغ غير قابل للاستخدام خارج COPY INTO. INSERT INTO ولا MERGE INTO يتم اعتماد لكتابة البيانات في جداول Delta بلا مخطط. بعد إدراج البيانات في الجدول باستخدام COPY INTO، يصبح الجدول قابلا للاستعلام.

راجع إنشاء جداول الهدف ل COPY INTO.

مثال: تعيين المخطط وتحميل البيانات في جدول Delta Lake

يوضح المثال التالي كيفية إنشاء جدول Delta ثم استخدام COPY INTO الأمر SQL لتحميل نموذج البيانات من مجموعات بيانات Databricks في الجدول. يمكنك تشغيل مثال التعليمات البرمجية Python أو R أو Scala أو SQL من دفتر ملاحظات مرفق بمجموعة Azure Databricks. يمكنك أيضا تشغيل التعليمات البرمجية SQL من استعلام مقترن بمستودع SQL في Databricks SQL.

SQL

DROP TABLE IF EXISTS default.loan_risks_upload;

CREATE TABLE default.loan_risks_upload (
  loan_id BIGINT,
  funded_amnt INT,
  paid_amnt DOUBLE,
  addr_state STRING
);

COPY INTO default.loan_risks_upload
FROM '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
FILEFORMAT = PARQUET;

SELECT * FROM default.loan_risks_upload;

-- Result:
-- +---------+-------------+-----------+------------+
-- | loan_id | funded_amnt | paid_amnt | addr_state |
-- +=========+=============+===========+============+
-- | 0       | 1000        | 182.22    | CA         |
-- +---------+-------------+-----------+------------+
-- | 1       | 1000        | 361.19    | WA         |
-- +---------+-------------+-----------+------------+
-- | 2       | 1000        | 176.26    | TX         |
-- +---------+-------------+-----------+------------+
-- ...

Python

table_name = 'default.loan_risks_upload'
source_data = '/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet'
source_format = 'PARQUET'

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" \
  "loan_id BIGINT, " + \
  "funded_amnt INT, " + \
  "paid_amnt DOUBLE, " + \
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name + \
  " FROM '" + source_data + "'" + \
  " FILEFORMAT = " + source_format
)

loan_risks_upload_data = spark.sql("SELECT * FROM " + table_name)

display(loan_risks_upload_data)

'''
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
'''

R

library(SparkR)
sparkR.session()

table_name = "default.loan_risks_upload"
source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
source_format = "PARQUET"

sql(paste("DROP TABLE IF EXISTS ", table_name, sep = ""))

sql(paste("CREATE TABLE ", table_name, " (",
  "loan_id BIGINT, ",
  "funded_amnt INT, ",
  "paid_amnt DOUBLE, ",
  "addr_state STRING)",
  sep = ""
))

sql(paste("COPY INTO ", table_name,
  " FROM '", source_data, "'",
  " FILEFORMAT = ", source_format,
  sep = ""
))

loan_risks_upload_data = tableToDF(table_name)

display(loan_risks_upload_data)

# Result:
# +---------+-------------+-----------+------------+
# | loan_id | funded_amnt | paid_amnt | addr_state |
# +=========+=============+===========+============+
# | 0       | 1000        | 182.22    | CA         |
# +---------+-------------+-----------+------------+
# | 1       | 1000        | 361.19    | WA         |
# +---------+-------------+-----------+------------+
# | 2       | 1000        | 176.26    | TX         |
# +---------+-------------+-----------+------------+
# ...

Scala

val table_name = "default.loan_risks_upload"
val source_data = "/databricks-datasets/learning-spark-v2/loans/loan-risks.snappy.parquet"
val source_format = "PARQUET"

spark.sql("DROP TABLE IF EXISTS " + table_name)

spark.sql("CREATE TABLE " + table_name + " (" +
  "loan_id BIGINT, " +
  "funded_amnt INT, " +
  "paid_amnt DOUBLE, " +
  "addr_state STRING)"
)

spark.sql("COPY INTO " + table_name +
  " FROM '" + source_data + "'" +
  " FILEFORMAT = " + source_format
)

val loan_risks_upload_data = spark.table(table_name)

display(loan_risks_upload_data)

/*
Result:
+---------+-------------+-----------+------------+
| loan_id | funded_amnt | paid_amnt | addr_state |
+=========+=============+===========+============+
| 0       | 1000        | 182.22    | CA         |
+---------+-------------+-----------+------------+
| 1       | 1000        | 361.19    | WA         |
+---------+-------------+-----------+------------+
| 2       | 1000        | 176.26    | TX         |
+---------+-------------+-----------+------------+
...
*/

للتنظيف، قم بتشغيل التعليمات البرمجية التالية، والتي تحذف الجدول:

Python

spark.sql("DROP TABLE " + table_name)

R

sql(paste("DROP TABLE ", table_name, sep = ""))

Scala

spark.sql("DROP TABLE " + table_name)

SQL

DROP TABLE default.loan_risks_upload

المرجع

  • Databricks Runtime 7.x وما فوق: COPY INTO

الموارد الإضافية