Отслеживание конвейеров динамических таблиц Delta

В этой статье описывается, как использовать встроенные функции мониторинга и наблюдаемости для конвейеров Delta Live Tables, включая происхождение данных, журнал обновлений и отчеты о качестве данных.

Большинство данных мониторинга можно просматривать вручную с помощью пользовательского интерфейса сведений о конвейере. Некоторые задачи проще выполнить, запрашивая метаданные журнала событий. См. раздел " Что такое журнал событий Delta Live Tables?".

Какие сведения о конвейере доступны в пользовательском интерфейсе?

График конвейера отображается сразу после успешного запуска обновления конвейера. Стрелки представляют зависимости между наборами данных в конвейере. По умолчанию на странице сведений о конвейере отображается последнее обновление таблицы, но в раскрывающемся меню можно выбрать старые обновления.

Сведения, отображаемые, включают идентификатор конвейера, исходные библиотеки, затраты на вычисления, выпуск продукта и канал, настроенный для конвейера.

Чтобы просмотреть табличное представление наборов данных, щелкните вкладку "Список ". Представление списка позволяет просматривать все наборы данных в конвейере, представленные в виде строки в таблице, и полезно, если daG конвейера слишком велик, чтобы визуализироваться в представлении Графа . Вы можете управлять наборами данных, отображаемыми в таблице, с помощью нескольких фильтров, таких как имя набора данных, тип и состояние. Чтобы вернуться к визуализации DAG, щелкните Graph.

Пользователь Запуск от имени является владельцем конвейера, и обновления конвейера выполняются с разрешениями этого пользователя. Чтобы изменить пользователя run as, щелкните Разрешения и измените владельца конвейера.

Как просмотреть сведения о наборе данных?

Щелкнув набор данных в графе конвейера или списке наборов данных, отображаются сведения о наборе данных. Сведения включают схему набора данных, метрики качества данных и ссылку на исходный код, определяющий набор данных.

Просмотр журнала обновлений

Чтобы просмотреть журнал и состояние обновлений конвейера, щелкните раскрывающееся меню журнала обновлений в верхней строке.

Чтобы просмотреть граф, сведения и события для обновления, выберите обновление в раскрывающемся меню. Чтобы вернуться к последнему обновлению, щелкните Показать последнее обновление.

Получение уведомлений о событиях конвейера

Чтобы получать уведомления в режиме реального времени о событиях конвейера, например об успешном завершении или сбое обновления конвейера, добавьте параметр Добавить уведомления по электронной почте для событий конвейера при создании или изменении конвейера.

Что такое журнал событий Delta Live Tables?

Журнал событий разностных динамических таблиц содержит все сведения, связанные с конвейером, включая журналы аудита, проверки качества данных, ход конвейера и происхождение данных. С помощью журнала событий можно отслеживать, анализировать и контролировать состояние конвейеров данных.

Записи журнала событий можно просматривать в пользовательском интерфейсе разностных динамических таблиц, API разностных динамических таблиц или путем прямого запроса к журналу событий. В этом разделе основное внимание уделяется запросу журнала событий напрямую.

Можно также определить пользовательские действия, выполняемые при регистрации событий, например отправку оповещений с перехватчиками событий.

Схема журнала событий

Схема журнала событий описана в таблице ниже. Некоторые из этих полей содержат данные JSON, требующие синтаксического анализа для выполнения некоторых запросов, таких как details поле. Azure Databricks поддерживает : оператор для анализа полей JSON. См . оператор : (знак двоеточия).

Поле Description
id Уникальный идентификатор записи журнала событий.
sequence Документ JSON, содержащий метаданные для обнаружения и упорядочения событий.
origin Документ JSON, содержащий метаданные для источника события, например поставщика облачных служб, региона поставщика облачных служб, user_idили для отображения места создания конвейера либо WORKSPACEDBSQL .pipeline_typepipeline_id
timestamp Время записи события.
message Удобное для чтения сообщение, описывающее событие.
level Тип события, например , INFO, WARNERRORили METRICS.
error В случае возникновения ошибки сведения, описывающие ошибку.
details Документ JSON, содержащий структурированные сведения о событии. Это основное поле, используемое для анализа событий.
event_type Тип события.
maturity_level Стабильность схемы событий. Возможны следующие значения:

* STABLE: схема стабильна и не изменится.
* NULL: схема стабильна и не изменится. Значение может быть NULL , если запись была создана до maturity_level добавления поля (выпуск 2022.37).
* EVOLVING: схема не стабильна и может измениться.
* DEPRECATED: схема устарела, и среда выполнения Delta Live Tables может перестать создавать это событие в любое время.

Запрос журнала событий

Расположение журнала событий и интерфейса для запроса журнала событий зависит от того, настроен ли конвейер на использование хранилища метаданных Hive или каталога Unity.

Хранилище метаданных Hive

Если конвейер публикует таблицы в хранилище метаданных Hive, журнал событий хранится в /system/events расположении storage . Например, если параметр конвейера storage настроен как /Users/username/data, то журнал событий хранится в /Users/username/data/system/events пути в DBFS.

Если параметр storage не настроен, расположением журнала событий по умолчанию является /pipelines/<pipeline-id>/system/events в DBFS. Например, если идентификатор конвейера — 91de5e48-35ed-11ec-8d3d-0242ac130003, то место хранения — /pipelines/91de5e48-35ed-11ec-8d3d-0242ac130003/system/events.

Можно создать представление для упрощения запросов к журналу событий. В следующем примере создается временное представление event_log_raw. Это представление используется в примерах запросов журнала событий, включенных в эту статью:

CREATE OR REPLACE TEMP VIEW event_log_raw AS SELECT * FROM delta.`<event-log-path>`;

Замените <event-log-path> расположением журнала событий.

Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Журнал событий можно запросить в записной книжке Azure Databricks или редакторе SQL. Используйте записную книжку или редактор SQL для выполнения примеров запросов журнала событий.

Каталог Unity

Если конвейер публикует таблицы в каталоге Unity, необходимо использовать event_logтабличную функцию (TVF), чтобы получить журнал событий для конвейера. Журнал событий для конвейера извлекается путем передачи идентификатора конвейера или имени таблицы в TVF. Например, чтобы получить записи журнала событий для конвейера с идентификатором 04c78631-3dd7-4856-b2a6-7d84e9b2638b:

SELECT * FROM event_log("04c78631-3dd7-4856-b2a6-7d84e9b2638b")

Чтобы получить записи журнала событий для конвейера, который создал или владеет таблицей my_catalog.my_schema.table1:

SELECT * FROM event_log(TABLE(my_catalog.my_schema.table1))

Чтобы вызвать TVF, необходимо использовать общий кластер или хранилище SQL. Например, можно использовать записную книжку, подключенную к общему кластеру, или использовать редактор SQL, подключенный к хранилищу SQL.

Чтобы упростить запросы событий для конвейера, владелец конвейера может создать представление по event_log TVF. В следующем примере создается представление журнала событий для конвейера. Это представление используется в примерах запросов журнала событий, включенных в эту статью.

Примечание.

event_log TVF может вызываться только владельцем конвейера, а представление, созданное через event_log TVF, может запрашиваться только владельцем конвейера. Представление не может быть предоставлено другим пользователям.

CREATE VIEW event_log_raw AS SELECT * FROM event_log("<pipeline-ID>");

Замените <pipeline-ID> уникальным идентификатором конвейера Delta Live Tables. Идентификатор можно найти на панели сведений о конвейере в пользовательском интерфейсе разностных динамических таблиц.

Каждый экземпляр выполнения конвейера называется обновлением. Часто требуется извлечь сведения для последнего обновления. Выполните следующий запрос, чтобы найти идентификатор последнего обновления и сохранить его во временном latest_update_id представлении. Это представление используется в примерах запросов журнала событий, включенных в эту статью:

CREATE OR REPLACE TEMP VIEW latest_update AS SELECT origin.update_id AS id FROM event_log_raw WHERE event_type = 'create_update' ORDER BY timestamp DESC LIMIT 1;

Запрос сведений о происхождении данных из журнала событий

События, содержащие сведения о происхождении данных, имеют тип события flow_definition. Объект details:flow_definition содержит output_dataset и input_datasets определяет каждую связь в графе.

Для извлечения входных и выходных наборов данных можно использовать следующий запрос, чтобы просмотреть сведения о происхождении:

SELECT
  details:flow_definition.output_dataset as output_dataset,
  details:flow_definition.input_datasets as input_dataset
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'flow_definition'
  AND
  origin.update_id = latest_update.id
output_dataset input_datasets
1 customers null
2 sales_orders_raw null
3 sales_orders_cleaned ["customers", "sales_orders_raw"]
4 sales_order_in_la ["sales_orders_cleaned"]

Запрос качества данных из журнала событий

Если вы определяете ожидания для наборов данных в конвейере, метрики качества данных хранятся в объекте details:flow_progress.data_quality.expectations . События, содержащие сведения о качестве данных, имеют тип события flow_progress. В следующем примере запрашиваются метрики качества данных для последнего обновления конвейера:

SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details :flow_progress :data_quality :expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name
dataset expectation passing_records failing_records
1 sales_orders_cleaned valid_order_number 4083 0

Мониторинг невыполненной работы данных путем запроса к журналу событий

Delta Live Tables отслеживает количество данных, присутствующих в невыполненной работы в объекте details:flow_progress.metrics.backlog_bytes . События, содержащие метрики невыполненной работы, имеют тип flow_progressсобытия. В следующем примере запрашиваются метрики невыполненной работы для последнего обновления конвейера:

SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id

Примечание.

Метрики невыполненной работы могут быть недоступны в зависимости от типа источника данных конвейера и версии Databricks Runtime.

Мониторинг событий расширенного автомасштабирования из журнала событий

Журнал событий фиксирует изменения размера кластера при включении расширенного автомасштабирования в конвейерах. События, содержащие сведения о расширенном автомасштабировании, имеют тип события autoscale. Сведения об изменении размера кластера хранятся в объекте details:autoscale. В следующем примере приводятся запросы на изменение размера кластера расширенного автомасштабирования для последнего обновления конвейера:

SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Мониторинг использования вычислительных ресурсов

cluster_resources события предоставляют метрики по количеству слотов задач в кластере, количеству используемых слотов задач и количеству задач, ожидающих планирования.

Если включен расширенный автомасштабирование, cluster_resources события также содержат метрики для алгоритма автомасштабирования, включая latest_requested_num_executorsи optimal_num_executors. События также показывают состояние алгоритма в виде различных состояний, таких как CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSи BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Эти сведения можно просмотреть в сочетании с событиями автомасштабирования, чтобы обеспечить общую картину расширенного автомасштабирования.

В следующем примере выполняется запрос журнала размера очереди задач для последнего обновления конвейера:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_num_queued_tasks) as queue_size
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

В следующем примере выполняется запрос журнала использования для последнего обновления конвейера:

SELECT
  timestamp,
  Double(details :cluster_resources.avg_task_slot_utilization) as utilization
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

В следующем примере выполняется запрос журнала счетчиков исполнителя, сопровождаемых метриками, доступными только для конвейеров расширенного автомасштабирования, включая количество исполнителей, запрашиваемых алгоритмом в последнем запросе, оптимальное количество исполнителей, рекомендуемых алгоритмом на основе последних метрик, и состояния алгоритма автомасштабирования:

SELECT
  timestamp,
  Double(details :cluster_resources.num_executors) as current_executors,
  Double(details :cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details :cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id

Аудит конвейеров динамических таблиц Delta

Вы можете использовать записи журнала событий разностных динамических таблиц и другие журналы аудита Azure Databricks, чтобы получить полное представление о том, как данные обновляются в разностных динамических таблицах.

Разностные динамические таблицы используют учетные данные владельца конвейера для выполнения обновлений. Вы можете изменить используемые учетные данные, обновив владельца конвейера. Разностные динамические таблицы записывают пользователя, выполняющего действия в конвейере, включая создание конвейера, изменение конфигурации и активацию обновлений.

Сведения о событиях аудита каталога Unity см. в справочнике по событиям аудита каталога Unity.

Запрос действий пользователя в журнале событий

Можно использовать журнал событий для аудита событий, например действий пользователя. События, содержащие сведения о действиях пользователя, имеют тип события user_action.

Сведения о действии хранятся в объекте user_action в поле details. Используйте следующий запрос, чтобы создать журнал аудита событий пользователя. Сведения о создании представления, используемого event_log_raw в этом запросе, см. в разделе "Запрос журнала событий".

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
1 2021-05-20T19:36:03.517+0000 START user@company.com
2 2021-05-20T19:35:59.913+0000 CREATE user@company.com
3 2021-05-27T00:35:51.971+0000 START user@company.com

Сведения о среде выполнения

Вы можете просмотреть сведения о среде выполнения для обновления конвейера, например версию Databricks Runtime для обновления:

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
1 11,0