Потоковые операции чтения и записи в таблицеTable streaming reads and writes

Разностная Lake тесно интегрирована с структурированной потоковой передачей Spark с помощью readStream и writeStream .Delta Lake is deeply integrated with Spark Structured Streaming through readStream and writeStream. Дельта Lake перекладывает многие ограничения, которые обычно связаны с системами потоковой передачи и файлами, включая:Delta Lake overcomes many of the limitations typically associated with streaming systems and files, including:

  • Объединение небольших файлов, созданных с помощью приема низкой задержкиCoalescing small files produced by low latency ingest

  • Поддержание обработки "ровно один раз" с несколькими потоками (или параллельными пакетными заданиями)Maintaining “exactly-once” processing with more than one stream (or concurrent batch jobs)

  • Эффективное обнаружение файлов, которые являются новыми при использовании файлов в качестве источника для потокаEfficiently discovering which files are new when using files as the source for a stream

Разностная таблица в качестве источника потока Delta table as a stream source

При загрузке разностной таблицы в качестве источника потока и использовании ее в потоковой обработке запрос обрабатывает все данные, имеющиеся в таблице, а также любые новые данные, поступающие после запуска потока.When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started.

В качестве потока можно загрузить как пути, так и таблицы.You can load both paths and tables as a stream.

spark.readStream.format("delta")
  .load("/mnt/delta/events")

import io.delta.implicits._
spark.readStream.delta("/mnt/delta/events")

oror

import io.delta.implicits._

spark.readStream.format("delta")
  .table("events")

Кроме того, вы можете сделать следующее:You can also:

  • Контролируйте максимальный размер любого микропакета, для которого Дельта Lake предоставляет потоковую передачу, задав maxFilesPerTrigger параметр.Control the maximum size of any micro-batch that Delta Lake gives to streaming by setting the maxFilesPerTrigger option. Указывает максимальное число новых файлов, которые будут учитываться в каждом триггере.This specifies the maximum number of new files to be considered in every trigger. Значение по умолчанию — 1000.The default is 1000.
  • Rate — Ограничьте объем данных, обрабатываемых в каждом микропакете, задав maxBytesPerTrigger параметр.Rate-limit how much data gets processed in each micro-batch by setting the maxBytesPerTrigger option. Этот параметр задает "Soft Max", что означает, что Пакетная обработка занимает примерно такой объем данных и может обрабатывать больше, чем ограничение.This sets a “soft max”, meaning that a batch processes approximately this amount of data and may process more than the limit. Если используется Trigger.Once для потоковой передачи, этот параметр игнорируется.If you use Trigger.Once for your streaming, this option is ignored. При использовании этого параметра в сочетании с maxFilesPerTrigger пакет Micro-Batch обрабатывает данные до тех пор, пока maxFilesPerTrigger maxBytesPerTrigger не будет достигнут предел.If you use this option in conjunction with maxFilesPerTrigger, the micro-batch processes data until either the maxFilesPerTrigger or maxBytesPerTrigger limit is reached.

Примечание

В случаях, когда транзакции исходной таблицы очищаются из-за logRetentionDuration конфигурации и поток перестает обрабатываться, Дельта Lake обрабатывает данные, соответствующие последнему доступному журналу транзакций исходной таблицы, но не завершает поток.In cases when the source table transactions are cleaned up due to the logRetentionDuration configuration and the stream lags in processing, Delta Lake processes the data corresponding to the latest available transaction history of the source table but does not fail the stream. Это может привести к удалению данных.This can result in data being dropped.

Пропустить обновления и удаленияIgnore updates and deletes

Структурированная потоковая передача не обрабатывает входные данные, которые не являются дополнением, и создает исключение, если в таблице, используемой в качестве источника, происходят какие-либо изменения.Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source. Существуют две основные стратегии для работы с изменениями, которые не могут быть автоматически распространены по нисходящей.There are two main strategies for dealing with changes that cannot be automatically propagated downstream:

  • Вы можете удалить выходные данные и контрольную точку и перезапустить поток с самого начала.You can delete the output and checkpoint and restart the stream from the beginning.
  • Можно задать любой из этих двух вариантов:You can set either of these two options:
    • ignoreDeletes: игнорировать транзакции, удаляющие данные на границах секций.ignoreDeletes: ignore transactions that delete data at partition boundaries.
    • ignoreChanges: Повторно обработайте обновления, если файлы должны быть перезаписаны в исходной таблице из-за такой операции изменения данных, как UPDATE , MERGE INTO , DELETE (в секциях) или OVERWRITE .ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Неизмененные строки могут по-прежнему выдаваться, поэтому нисходящие потребители должны иметь возможность обрабатывать дубликаты.Unchanged rows may still be emitted, therefore your downstream consumers should be able to handle duplicates. Удаления не распространяются на нижестоящий.Deletes are not propagated downstream. ignoreChanges субсумес ignoreDeletes .ignoreChanges subsumes ignoreDeletes. Поэтому, если используется ignoreChanges , поток не будет нарушаться удалением или обновлением исходной таблицы.Therefore if you use ignoreChanges, your stream will not be disrupted by either deletions or updates to the source table.

ПримерExample

Например, предположим, что имеется таблица user_events со date user_email столбцами, и, action секционированными по date .For example, suppose you have a table user_events with date, user_email, and action columns that is partitioned by date. Потоковая передача из user_events таблицы и необходимо удалить из нее данные из-за GDPR.You stream out of the user_events table and you need to delete data from it due to GDPR.

При удалении на границах секций (то есть в WHERE столбце секции) файлы уже разбиты по значению, поэтому удаление просто удаляет эти файлы из метаданных.When you delete at partition boundaries (that is, the WHERE is on a partition column), the files are already segmented by value so the delete just drops those files from the metadata. Таким образом, если нужно просто удалить данные из некоторых секций, можно использовать:Thus, if you just want to delete data from some partitions, you can use:

spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/mnt/delta/user_events")

Однако если необходимо удалить данные на основе user_email , необходимо будет использовать:However, if you have to delete data based on user_email, then you will need to use:

spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/mnt/delta/user_events")

При обновлении user_email с помощью UPDATE инструкции файл с заданным user_email вопросом перезаписывается.If you update a user_email with the UPDATE statement, the file containing the user_email in question is rewritten. При использовании ignoreChanges Новая запись передается дальше со всеми другими неизмененными записями, которые были в том же файле.When you use ignoreChanges, the new record is propagated downstream with all other unchanged records that were in the same file. Логика должна иметь возможность обрабатывать эти входящие дублирующиеся записи.Your logic should be able to handle these incoming duplicate records.

Укажите начальное расположениеSpecify initial position

Примечание

Эта функция доступна на Databricks Runtime 7,3 LTS и более поздних версий.This feature is available on Databricks Runtime 7.3 LTS and above.

Можно использовать следующие параметры, чтобы указать начальную точку источника разностной потоковой передачи, не обрабатывая всю таблицу.You can use the following options to specify the starting point of the Delta Lake streaming source without processing the entire table.

  • startingVersion: Версия дельты Lake, с которой начинается запуск.startingVersion: The Delta Lake version to start from. Все изменения таблицы, начиная с этой версии (включительно), будут считываться источником потоковой передачи.All table changes starting from this version (inclusive) will be read by the streaming source. Можно получить версии фиксации из version столбца описания выходных данных команды журнала .You can obtain the commit versions from the version column of the DESCRIBE HISTORY command output.

    В Databricks Runtime 7,4 и более поздних версиях, чтобы возвращались только последние изменения, укажите latest .In Databricks Runtime 7.4 and above, to return only the latest changes, specify latest.

  • startingTimestamp: Отметка времени для запуска.startingTimestamp: The timestamp to start from. Все изменения в таблице, зафиксированные в или после метки времени (включительно), считываются источником потоковой передачи.All table changes committed at or after the timestamp (inclusive) will be read by the streaming source. Одно из двух значений:One of:

    • Строка метки времени.A timestamp string. Например, "2019-01-01T00:00:00.000Z".For example, "2019-01-01T00:00:00.000Z".
    • Строка даты.A date string. Например, "2019-01-01".For example, "2019-01-01".

Нельзя одновременно задать оба параметра; можно использовать только один из них.You cannot set both options at the same time; you can use only one of them. Они вступают в силу только при запуске нового потокового запроса.They take effect only when starting a new streaming query. Если потоковый запрос запущен и в его контрольной точке записан ход выполнения, эти параметры игнорируются.If a streaming query has started and the progress has been recorded in its checkpoint, these options are ignored.

Важно!

Хотя источник потоковой передачи можно запустить из указанной версии или метки времени, схема источника потоковой передачи всегда является последней схемой разностной таблицы.Although you can start the streaming source from a specified version or timestamp, the schema of the streaming source is always the latest schema of the Delta table. Необходимо убедиться в отсутствии несовместимых изменений схемы для разностной таблицы после указанной версии или метки времени.You must ensure there is no incompatible schema change to the Delta table after the specified version or timestamp. В противном случае источник потоковой передачи может вернуть неверные результаты при чтении данных с неверной схемой.Otherwise, the streaming source may return incorrect results when reading the data with an incorrect schema.

ПримерExample

Например, предположим, что у вас есть таблица user_events .For example, suppose you have a table user_events. Если вы хотите считать изменения с версии 5, используйте:If you want to read changes since version 5, use:

spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/mnt/delta/user_events")

Если вы хотите читать изменения с 2018-10-18, используйте:If you want to read changes since 2018-10-18, use:

spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/mnt/delta/user_events")

Разностная таблица в качестве приемника Delta table as a sink

Кроме того, данные можно записывать в разностную таблицу с помощью структурированной потоковой передачи.You can also write data into a Delta table using Structured Streaming. Журнал транзакций обеспечивает Дельта Lake для гарантированной обработки ровно однажды, даже если в таблице имеются другие потоки или пакетные запросы, выполняющиеся параллельно.The transaction log enables Delta Lake to guarantee exactly-once processing, even when there are other streams or batch queries running concurrently against the table.

Режим добавления Append mode

По умолчанию потоки выполняются в режиме добавления, когда добавляются новые записи в таблицу.By default, streams run in append mode, which adds new records to the table.

Можно использовать метод path:You can use the path method:

PythonPython

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events")

ScalaScala

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .start("/mnt/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .delta("/mnt/delta/events")

или метод Table:or the table method:

PythonPython

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .table("events")

ScalaScala

events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/mnt/delta/events/_checkpoints/etl-from-json")
  .table("events")

Полный режимComplete mode

Структурированную потоковую передачу также можно использовать для замены всей таблицы на каждый пакет.You can also use Structured Streaming to replace the entire table with every batch. Один из примеров использования — вычисление сводки с помощью статистической обработки:One example use case is to compute a summary using aggregation:

spark.readStream
  .format("delta")
  .load("/mnt/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/mnt/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/mnt/delta/eventsByCustomer")

Предыдущий пример непрерывно обновляет таблицу, содержащую совокупное количество событий по клиентам.The preceding example continuously updates a table that contains the aggregate number of events by customer.

Для приложений с дополнительными требованиями к задержке мягкая можно сэкономить вычислительные ресурсы с одноразовыми триггерами.For applications with more lenient latency requirements, you can save computing resources with one-time triggers. Используйте их для обновления сводных статистических таблиц по заданному расписанию, обрабатывая только новые данные, поступившие с момента последнего обновления.Use these to update summary aggregation tables on a given schedule, processing only new data that has arrived since the last update.