Краткое руководство по разностным динамическим таблицам

Важно!

Эта функция предоставляется в режиме общедоступной предварительной версии. Чтобы запросить доступ, свяжитесь со своим представителем Azure Databricks.

Вы можете легко создать и выполнить разностный конвейер динамических таблиц с помощью Azure Databricks записной книжки. В этой статье показано, как использовать разностный конвейер динамических таблиц для набора данных, содержащего данные Википедии навигации, в:

  • Чтение необработанных данных JSON навигации в таблице.
  • Чтение записей из таблицы необработанных данных и использование разностных таблиц в ожидании для создания новой таблицы, содержащей очищенные данные.
  • Используйте записи из таблицы очищенные данные, чтобы сделать запросы разностных динамических таблиц, которые создают производные наборы данных.

В этом кратком руководстве вы выполните указанные ниже задачи.

  1. Создайте новую записную книжку и добавьте код для реализации конвейера.
  2. Создайте новое задание конвейера с помощью записной книжки.
  3. Запуск обновления задания конвейера.
  4. Просмотр результатов задания конвейера.

Требования

Для запуска конвейера необходимо иметь разрешения на создание кластера . Среда выполнения разностных динамических таблиц создает кластер перед запуском конвейера и завершается ошибкой, если у вас нет нужных разрешений.

Создание записной книжки

Вы можете использовать Пример записной книжки или создать новую записную книжку, чтобы выполнить конвейер разностной динамической таблицы:

  1. Перейдите на целевую страницу Azure Databricks и выберите создать пустую записную книжку.

  2. В диалоговом окне Создание записной книжки введите имя записной книжки и выберите Python или SQL в раскрывающемся меню язык по умолчанию . В качестве значения параметра cluster можно оставить значение по умолчанию. Среда выполнения разностных динамических таблиц создает кластер перед запуском конвейера.

  3. Нажмите кнопку Создать.

  4. Скопируйте пример кода Python или SQL и вставьте его в новую записную книжку. Можно добавить пример кода в одну ячейку записной книжки или несколько ячеек.

    Примечание

    Необходимо запустить конвейер на вкладке разностные динамические таблицы пользовательского интерфейса задания. Если щелкнуть  значок запуска для выполнения конвейера, будет возвращена ошибка.

Пример кода

Python

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

json_path = "/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
@dlt.create_table(
  comment="The raw wikipedia click stream dataset, ingested from /databricks-datasets.",
  table_properties={
    "quality": "bronze"
  }
)
def clickstream_raw():
  return (
    spark.read.json(json_path)
  )

@dlt.create_table(
  comment="Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations.",
  table_properties={
    "quality": "silver"
  }
)
@dlt.expect("valid_current_page", "current_page_id IS NOT NULL AND current_page_title IS NOT NULL")
@dlt.expect_or_fail("valid_count", "click_count > 0")
def clickstream_clean():
  return (
    dlt.read("clickstream_raw")
      .withColumn("current_page_id", expr("CAST(curr_id AS INT)"))
      .withColumn("click_count", expr("CAST(n AS INT)"))
      .withColumn("previous_page_id", expr("CAST(prev_id AS INT)"))
      .withColumnRenamed("curr_title", "current_page_title")
      .withColumnRenamed("prev_title", "previous_page_title")
      .select("current_page_id", "current_page_title", "click_count", "previous_page_id", "previous_page_title")
  )

@dlt.create_table(
  comment="A table of the most common pages that link to the Apache Spark page.",
  table_properties={
    "quality": "gold"
  }
)
def top_spark_referrers():
  return (
    dlt.read("clickstream_clean")
      .filter(expr("current_page_title == 'Apache_Spark'"))
      .withColumnRenamed("previous_page_title", "referrer")
      .sort(desc("click_count"))
      .select("referrer", "click_count")
      .limit(10)
  )

@dlt.create_table(
  comment="A list of the top 50 pages by number of clicks.",
  table_properties={
    "quality": "gold"
  }
)
def top_pages():
  return (
    dlt.read("clickstream_clean")
      .groupBy("current_page_title")
      .agg(sum("click_count").alias("total_clicks"))
      .sort(desc("total_clicks"))
      .limit(50)
  )

SQL-код

CREATE LIVE TABLE clickstream_raw
COMMENT "The raw wikipedia click stream dataset, ingested from /databricks-datasets."
TBLPROPERTIES ("quality" = "bronze")
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`

CREATE LIVE TABLE clickstream_clean(
  CONSTRAINT valid_current_page EXPECT (current_page_id IS NOT NULL and current_page_title IS NOT NULL),
  CONSTRAINT valid_count EXPECT (click_count > 0) ON VIOLATION FAIL UPDATE
)
COMMENT "Wikipedia clickstream dataset with cleaned-up datatypes / column names and quality expectations."
TBLPROPERTIES ("quality" = "silver")
AS SELECT
  CAST (curr_id AS INT) AS current_page_id,
  curr_title AS current_page_title,
  CAST(n AS INT) AS click_count,
  CAST (prev_id AS INT) AS previous_page_id,
  prev_title AS previous_page_title
FROM live.clickstream_raw

CREATE LIVE TABLE top_spark_referers
COMMENT "A table of the most common pages that link to the Apache Spark page."
TBLPROPERTIES ("quality" = "gold")
AS SELECT
  previous_page_title as referrer,
  click_count
FROM live.clickstream_clean
WHERE current_page_title = 'Apache_Spark'
ORDER BY click_count DESC
LIMIT 10

CREATE LIVE TABLE top_pages
COMMENT "A list of the top 50 pages by number of clicks."
TBLPROPERTIES ("quality" = "gold")
AS SELECT
  current_page_title,
  SUM(click_count) as total_clicks
FROM live.clickstream_clean
GROUP BY current_page_title
ORDER BY 2 DESC
LIMIT 50

Создание конвейера

Чтобы создать новый конвейер с помощью записной книжки разностной динамической таблицы:

  1. Щелкните  задания значок задания на боковой панели, перейдите на вкладку конвейеры и нажмите кнопку создать конвейер.
  2. Присвойте конвейеру имя и введите полный путь к записной книжке.
  3. При необходимости введите место хранения выходных данных из конвейера. Если оставить место хранения пустым, система использует расположение по умолчанию.
  4. Выберите активировано для режима конвейера.
  5. Нажмите кнопку Создать.

Создание конвейера

Система отобразит страницу сведения о конвейере после нажатия кнопки создать. Вы также можете получить доступ к конвейеру, щелкнув имя конвейера на вкладке конвейеры .

Запуск конвейера

Чтобы начать обновление для нового конвейера, нажмите кнопку Значок запуска разностных динамических таблиц (кнопки) на верхней панели. Система возвращает сообщение, подтверждающее начало конвейера.

Запустить конвейер

После успешного запуска обновления система разностных динамических таблиц:

  1. Запускает кластер с использованием конфигурации кластера, созданной системой разностных динамических таблиц. Можно также указать пользовательскую конфигурацию кластера.
  2. Создает все несуществующие таблицы и обеспечивает правильную схему для всех существующих таблиц.
  3. Обновляет таблицы с последними доступными данными.
  4. Завершает работу кластера после завершения обновления.

Ход обновления можно отслеживать, просмотрев журнал событий в нижней части страницы сведений о конвейере .

Просмотр журнала событий конвейера

Просмотр результатов

Для просмотра сведений о конвейерной обработке можно использовать пользовательский интерфейс Дельта Live Tables. Это включает визуальное представление графа конвейера и схем, а также сведения об обработке записей, например число обработанных записей и записей, которые не прошли проверку.

Просмотр графа конвейера

Чтобы просмотреть граф обработки для конвейера, перейдите на вкладку граф . Можно использовать мышь для настройки кнопок просмотра или изменения кнопок на  диаграмме разностные динамические таблицы в правом верхнем углу панели графа.

Просмотреть граф конвейера

Просмотр сведений о наборе данных

Щелкните набор данных, чтобы просмотреть сведения о схеме для набора данных.

Просмотреть схему конвейера

Просмотр сведений об обработке

Чтобы просмотреть сведения об обработке, включая количество обработанных записей и метрик качества данных, выберите запись в журнале событий и перейдите на вкладку JSON .

Просмотр сведений о журнале событий

Просмотр параметров конвейера

Перейдите на вкладку Параметры , чтобы просмотреть созданную конфигурацию для вашего конвейера. Нажмите кнопку изменить параметры , чтобы изменить конфигурацию конвейера. Дополнительные сведения о параметрах конфигурации см. в разделе Параметры разностных динамических таблиц .

Публикация наборов данных

Можно сделать выходные данные конвейера доступными для запросов, опубликовав таблицы в Azure Databricks хранилище метаданных:

  1. Нажмите кнопку изменить параметры .

  2. Добавьте целевой параметр, чтобы настроить имя базы данных для таблиц.

    Настройка имени базы данных

  3. Нажмите кнопку Сохранить

  4. Нажмите кнопку Значок запуска разностных динамических таблиц Нажмите кнопку, чтобы запустить новое обновление для конвейера.

После завершения обновления можно Просмотреть базу данных и таблицы, запросить данные или использовать данные в подчиненных приложениях.

Запрос данных Википедии

Примеры записных книжек

В этих записных книжках представлены примеры Python и SQL, реализующие разностный конвейер таблиц.

  • Чтение необработанных данных JSON навигации в таблицу.
  • Чтение записей из таблицы необработанных данных и использование разностных таблиц в ожидании для создания новой таблицы, содержащей очищенные данные.
  • Используйте записи из таблицы очищенные данные, чтобы сделать запросы разностных динамических таблиц, которые создают производные наборы данных.

Начало работы с разностными таблицами в записной книжке Python

Получить записную книжку

Начало работы с записной книжкой SQL для разностных динамических таблиц

Получить записную книжку