Выполнение первой рабочей нагрузки извлечения, преобразования и загрузки в Azure Databricks

Узнайте, как использовать готовые к производству средства из Azure Databricks для разработки и развертывания первых конвейеров извлечения, преобразования и загрузки (ETL) для оркестрации данных.

К концу этой статьи вы сможете свободно выполнять следующие операции:

  1. Запуск вычислительного кластера Databricks для всех целей.
  2. Создание записной книжки Databricks.
  3. Настройку добавочного приема данных в Delta Lake с помощью автозагрузчика.
  4. Выполнение ячеек записной книжки для обработки, запроса и предварительного просмотра данных.
  5. Планирование записной книжки в качестве задания Databricks.

В этом учебнике используются интерактивные записные книжки для выполнения распространенных задач ETL в Python или Scala.

Вы также можете использовать разностные динамические таблицы для создания конвейеров ETL. Платформа Databricks создала разностные динамические таблицы, чтобы снизить сложность сборки, развертывания и обслуживания рабочих конвейеров ETL. См . руководство. Запуск первого конвейера live tables Delta Live Tables.

Вы также можете использовать поставщик Databricks Terraform для создания ресурсов для этой статьи. См. статью "Создание кластеров, записных книжек и заданий с помощью Terraform".

Требования

Примечание.

Если у вас нет прав управления кластером, вы по-прежнему можете выполнить большинство описанных ниже действий при наличии у вас доступа к кластеру.

Шаг 1. Создание кластера

Чтобы выполнить исследовательский анализ данных и инжиниринг данных, создайте кластер для предоставления вычислительных ресурсов, необходимых для выполнения команд.

  1. На боковой панели щелкните Значок вычисленийВычислительная среда.
  2. На странице "Вычислительная среда" щелкните элемент Создать кластер. Откроется страница создания кластера.
  3. Укажите уникальное имя кластера, оставьте оставшиеся значения в состоянии по умолчанию и щелкните Создать кластер.

Дополнительные сведения о кластерах Databricks см. в статье "Вычисления".

Шаг 2. Создание записной книжки Databricks

Чтобы приступить к написанию и выполнению интерактивного кода в Azure Databricks, создайте записную книжку.

  1. Нажмите кнопку Значок "Создать" на боковой панели и щелкните "Записная книжка".
  2. На странице создания записной книжки:
    • Укажите уникальное имя для записной книжки.
    • Убедитесь, что в качестве языка по умолчанию выбран Python или Scala.
    • Выберите кластер, созданный на шаге 1, в раскрывающемся списке Кластер.
    • Нажмите кнопку Создать.

Записная книжка откроется с пустой ячейкой вверху.

Дополнительные сведения о создании записных книжек и управлении ими см. в статье Управление записными книжками.

Шаг 3. Настройка автозагрузчика для приема данных в Delta Lake

В Databricks рекомендуется использовать Автозагрузчик для добавочного приема данных. Автозагрузчик автоматически определяет и обрабатывает новые файл, когда они поступают в облачное объектное хранилище.

Databricks рекомендует хранить данные с использованием формата Delta Lake. Delta Lake — это уровень хранения открытого кода, который предоставляет транзакции ACID и обеспечивает гибридное решение "хранилище и озеро данных". Delta Lake — это формат по умолчанию для таблиц, созданных в Databricks.

Чтобы настроить автозагрузчик для приема данных в таблицу Delta Lake, скопируйте следующий код в пустую ячейку записной книжки:

Python

# Import functions
from pyspark.sql.functions import col, current_timestamp

# Define variables used in code below
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Clear out data from previous demo execution
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable(table_name))

Scala

// Imports
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// Define variables used in code below
val file_path = "/databricks-datasets/structured-streaming/events"
val username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first.get(0)
val table_name = s"${username}_etl_quickstart"
val checkpoint_path = s"/tmp/${username}/_checkpoint"

// Clear out data from previous demo execution
spark.sql(s"DROP TABLE IF EXISTS ${table_name}")
dbutils.fs.rm(checkpoint_path, true)

// Configure Auto Loader to ingest JSON data to a Delta table
spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load(file_path)
  .select($"*", $"_metadata.file_path".as("source_file"), current_timestamp.as("processing_time"))
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .toTable(table_name)

Примечание.

Переменные, определенные в этом коде, должны позволить безопасно выполнять его без риска конфликта с существующими ресурсами рабочей области или другими пользователями. Ограниченные разрешения сети или хранилища приведут к ошибкам при выполнении этого кода; обратитесь к администратору рабочей области, чтобы решить проблему этих ограничений.

Дополнительные сведения об автозагрузчике см. в статье Автозагрузчик.

Шаг 4. Обработка данных и взаимодействие с ними

Записные книжки выполняют логические ячейки последовательно. Для выполнения логики в ячейке:

  1. Чтобы запустить ячейку, выполненную на предыдущем шаге, выберите ячейку и нажмите клавиши SHIFT+ВВОД.

  2. Чтобы запросить только что созданную таблицу, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.

    Python

    df = spark.read.table(table_name)
    

    Scala

    val df = spark.read.table(table_name)
    
  3. Чтобы просмотреть данные в только что созданной таблице, скопируйте следующий код в пустую ячейку, а затем нажмите клавиши SHIFT+ВВОД, чтобы выполнить код в этой ячейке.

    Python

    display(df)
    

    Scala

    display(df)
    

Дополнительные сведения об интерактивных параметрах визуализации данных см. в разделе "Визуализации" в записных книжках Databricks.

Шаг 5. Планирование задания

Записные книжки Databricks можно запускать в качестве рабочих сценариев, добавляя их в виде задачи в задание Databricks. В этом шаге вы создадите новое задание, которое можно активировать вручную.

Чтобы запланировать выполнение записной книжки в качестве задачи, выполните следующие действия.

  1. Нажмите Расписание справа от строки заголовка.
  2. Введите уникальное Имя задания.
  3. Выберите Вручную.
  4. В раскрывающемся списке Кластер выберите кластер, созданный на шаге 1.
  5. Нажмите кнопку Создать.
  6. В открывшемся окне нажмите Запустить сейчас.
  7. Чтобы просмотреть результаты выполнения задания, щелкните Внешний канал значок рядом с меткой времени последнего выполнения .

Дополнительные сведения о заданиях см. в статье "Что такое задания Azure Databricks?".

Дополнительные интеграции

Дополнительные сведения об интеграции и средствах для инжиниринга данных с помощью Azure Databricks: