تشغيل حمل عمل ETL الأول على Azure Databricks

تعرف على كيفية استخدام أدوات جاهزة للإنتاج من Azure Databricks لتطوير ونشر أول مسارات استخراج وتحويل وتحميل (ETL) لتنسيق البيانات.

بنهاية هذه المقالة، سوف تشعر بالراحة:

  1. بدء تشغيل نظام مجموعة حوسبة لجميع الأغراض في Databricks.
  2. إنشاء دفتر ملاحظات Databricks.
  3. تكوين استيعاب البيانات المتزايدة إلى Delta Lake باستخدام أداة التحميل التلقائي.
  4. تنفيذ خلايا دفتر الملاحظات لمعالجة البيانات والاستعلام عنها ومعاينة البيانات.
  5. جدولة دفتر ملاحظات كوظيفة Databricks.

يستخدم هذا البرنامج التعليمي دفاتر ملاحظات تفاعلية لإكمال مهام ETL الشائعة في Python أو Scala.

يمكنك أيضا استخدام Delta Live Tables لإنشاء مسارات ETL. أنشأت Databricks Delta Live Tables لتقليل تعقيد بناء وتوزيع وصيانة خطوط أنابيب ETL للإنتاج. راجع البرنامج التعليمي: تشغيل خط أنابيب Delta Live Tables الأول.

يمكنك أيضا استخدام موفر Databricks Terraform لإنشاء موارد هذه المقالة. راجع إنشاء أنظمة مجموعات ودفاتر ملاحظات ومهام باستخدام Terraform.

المتطلبات

إشعار

إذا لم يكن لديك امتيازات التحكم في نظام المجموعة، فلا يزال بإمكانك إكمال معظم الخطوات أدناه طالما لديك حق الوصول إلى نظام مجموعة.

الخطوة 1: إنشاء نظام مجموعة

للقيام بتحليل البيانات الاستكشافية وهندسة البيانات، قم بإنشاء نظام مجموعة لتوفير موارد الحوسبة اللازمة لتنفيذ الأوامر.

  1. انقر فوق أيقونة الحسابحساب في الشريط الجانبي.
  2. في صفحة Compute، انقر فوق Create Cluster. يؤدي ذلك إلى فتح صفحة New Cluster.
  3. حدد اسما فريدا للمجموعة، واترك القيم المتبقية في حالتها الافتراضية، وانقر فوق Create Cluster.

لمعرفة المزيد حول مجموعات Databricks، راجع الحساب.

الخطوة 2: إنشاء دفتر ملاحظات Databricks

لبدء كتابة التعليمات البرمجية التفاعلية وتنفيذها على Azure Databricks، قم بإنشاء دفتر ملاحظات.

  1. انقر فوق أيقونة جديدةجديد في الشريط الجانبي، ثم انقر فوق دفتر الملاحظات.
  2. في صفحة إنشاء دفتر ملاحظات:
    • حدد اسما فريدا لدفتر الملاحظات.
    • تأكد من تعيين اللغة الافتراضية إلى Python أو Scala.
    • حدد المجموعة التي أنشأتها في الخطوة 1 من القائمة المنسدلة Cluster .
    • انقر فوق Create.

يتم فتح دفتر ملاحظات مع وجود خلية فارغة في الأعلى.

لمعرفة المزيد حول إنشاء دفاتر الملاحظات وإدارتها، راجع إدارة دفاتر الملاحظات.

الخطوة 3: تكوين أداة التحميل التلقائي لاستيعاب البيانات في Delta Lake

توصي Databricks باستخدام أداة التحميل التلقائي لاستيعاب البيانات المتزايدة. يقوم "التحميل التلقائي" تلقائيا بالكشف عن الملفات الجديدة ومعالجتها عند وصولها إلى تخزين كائن السحابة.

توصي Databricks بتخزين البيانات باستخدام Delta Lake. Delta Lake هي طبقة تخزين مصدر مفتوح توفر معاملات ACID وتمكن مستودع البيانات. Delta Lake هو التنسيق الافتراضي للجداول التي تم إنشاؤها في Databricks.

لتكوين أداة التحميل التلقائي لاستيعاب البيانات في جدول Delta Lake، انسخ التعليمات البرمجية التالية والصقها في الخلية الفارغة في دفتر ملاحظاتك:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

إشعار

يجب أن تسمح لك المتغيرات المحددة في هذه التعليمة البرمجية بتنفيذها بأمان دون خطر التعارض مع أصول مساحة العمل الموجودة أو المستخدمين الآخرين. ستؤدي أذونات الشبكة أو التخزين المقيدة إلى حدوث أخطاء عند تنفيذ هذه التعليمة البرمجية؛ اتصل بمسؤول مساحة العمل لاستكشاف هذه القيود وإصلاحها.

لمعرفة المزيد حول "المحمل التلقائي"، راجع ما هو المحمل التلقائي؟.

الخطوة 4: معالجة البيانات والتفاعل معها

تنفذ دفاتر الملاحظات خلية منطقية بخلية. لتنفيذ المنطق في الخلية الخاصة بك:

  1. لتشغيل الخلية التي أكملتها في الخطوة السابقة، حدد الخلية واضغط على SHIFT+ENTER.

  2. للاستعلام عن الجدول الذي أنشأته للتو، انسخ التعليمات البرمجية التالية والصقها في خلية فارغة، ثم اضغط على SHIFT+ENTER لتشغيل الخلية.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. لمعاينة البيانات في DataFrame، انسخ التعليمات البرمجية التالية والصقها في خلية فارغة، ثم اضغط على SHIFT+ENTER لتشغيل الخلية.

    Python

    display(df)
    

    Scala

    display(df)
    

لمعرفة المزيد حول الخيارات التفاعلية لتصور البيانات، راجع المرئيات في دفاتر ملاحظات Databricks.

الخطوة 5: جدولة وظيفة

يمكنك تشغيل دفاتر ملاحظات Databricks كنصوص إنتاج عن طريق إضافتها كمهمة في مهمة Databricks. في هذه الخطوة، ستقوم بإنشاء وظيفة جديدة يمكنك تشغيلها يدويا.

لجدولة دفتر الملاحظات كمهمة:

  1. انقر فوق جدولة على الجانب الأيسر من شريط الرأس.
  2. أدخل اسما فريدا لاسم الوظيفة.
  3. انقر فوق يدوي.
  4. في القائمة المنسدلة Cluster ، حدد نظام المجموعة الذي أنشأته في الخطوة 1.
  5. انقر فوق Create.
  6. في النافذة التي تظهر، انقر فوق تشغيل الآن.
  7. لمشاهدة نتائج تشغيل المهمة، انقر فوق الأيقونة ارتباط خارجي الموجودة بجانب الطابع الزمني آخر تشغيل .

لمزيد من المعلومات حول الوظائف، راجع ما هي وظائف Azure Databricks؟.

عمليات تكامل إضافية

تعرف على المزيد حول عمليات التكامل والأدوات لهندسة البيانات مع Azure Databricks: