Вывод и изменение схемы в Auto Loader

Примечание

Поддержка формата JSON доступна в Databricks Runtime 8,2 и более поздних версий. Поддержка формата CSV доступна в Databricks Runtime 8,3 и более поздних версиях. Дополнительные сведения о каждом формате см. в разделе форматы данных.

Автозагрузчик может автоматически обнаруживать ввод новых столбцов в данные и перезапускать, поэтому вам не нужно самостоятельно управлять отслеживанием и обработкой изменений схемы. Автозагрузчик также может «спасти» непредвиденные данные (например, отличающиеся типы данных) в столбце больших двоичных объектов JSON, доступ к которому можно получить позже с помощью частично структурированных API доступа к данным.

Вывод схемы

Чтобы получить схему, загрузчик автоматически определит первые 50 ГБ или 1000 файлов, которые он обнаруживает, в зависимости от того, какое из ограничений пересекается первым. Чтобы избежать этой стоимости вывода при каждом запуске потока и обеспечить стабильную схему для перезапуска потоков, необходимо задать параметр cloudFiles.schemaLocation . Автозагрузчик создает в _schemas этом расположении скрытый каталог для отслеживания изменений схемы во входных данных с течением времени. Если поток содержит один cloudFiles источник для приема данных, можно указать расположение контрольной точки как cloudFiles.schemaLocation . В противном случае укажите уникальный каталог для этого параметра. Если входные данные возвращают непредвиденную схему для потока, убедитесь, что расположение схемы используется только одним источником автозагрузчика.

Примечание

Чтобы изменить размер используемого примера, можно задать конфигурации SQL:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(например, строка байта 10gb )

и

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

цело

По умолчанию автозагрузчик определяет столбцы в текстовых форматах файлов, таких как CSV и JSON, как string столбцы. В наборах данных JSON вложенные столбцы также выводятся в виде string столбцов. Так как данные JSON и CSV являются самоописывающимися и могут поддерживать много типов данных, вывод данных в виде строки может помочь избежать проблем, связанных с развитием схемы, таких как несовпадение числовых типов (целые числа, длинные, плавающие). Если вы хотите хранить исходное поведение вывода схемы Spark, задайте для параметра значение cloudFiles.inferColumnTypes true .

Примечание

Если учет регистра не включен, столбцы foo , Foo и считаются одним и тем FOO же столбцом. Выбор варианта, в котором столбец будет представлен в, является произвольным и зависит от данных выборки. Можно использовать указания схемы , чтобы задать, какой вариант следует использовать.

Автоматически загрузчик также пытается определить столбцы секционирования из базовой структуры каталогов данных, если данные размещаются в секционировании в стиле Hive. Например, путь к файлу, например, приведет к base_path/event=click/date=2021-04-01/f0.json выводу date и в event качестве столбцов секционирования. Типы данных для этих столбцов будут строками, если только не задано значение cloudFiles.inferColumnTypes true. Если базовая структура каталогов содержит конфликтующие секции Hive или не содержит секционирование в стиле Hive, то столбцы секционирования будут пропущены. Можно указать параметр в cloudFiles.partitionColumns виде разделенного запятыми списка имен столбцов, чтобы всегда пытаться проанализировать заданные столбцы из пути к файлу, если эти столбцы существуют как key=value пары в структуре каталогов.

Когда Auto Loader определяет схему, столбец с восстановленными данными автоматически добавляется в схему как _rescued_data . Дополнительные сведения см. в разделе об усовершенствованных столбцах данных и эволюции схем .

Примечание

Двоичный файл ( binaryFile ) и text форматы файлов имеют фиксированные схемы данных, но также поддерживают вывод столбцов секционирования. Столбцы секционирования выводятся при каждом перезапуске потока, если не указано значение cloudFiles.schemaLocation . Чтобы избежать возможных ошибок или потери информации, в модулях данных рекомендуется задавать cloudFiles.schemaLocation или cloudFiles.partitionColumns в качестве параметров для этих форматов файлов, так как cloudFiles.schemaLocation это не является обязательным параметром для этих форматов.

Указания схемы

Выводимые типы данных могут не всегда быть именно тем, что вы ищете. С помощью указания схемы можно доложить сведения, которые вы узнаете и предполагаете, на выводимую схему.

По умолчанию Apache Spark имеет стандартный подход к выведением типа столбцов данных. Например, он выводит вложенные JSON как структуры и целые числа в длину. В отличие от этого, Автоматический загрузчик считает все столбцы строками. Если известно, что столбец относится к определенному типу данных, или если вы хотите выбрать еще более общий тип данных (например, Double вместо целого числа), можно указать произвольное число подсказок для типов данных столбцов следующим образом:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Список поддерживаемых типов данных см. в документации по типам данных .

Если столбец отсутствует в начале потока, можно также использовать указания схемы, чтобы добавить этот столбец в выводимую схему.

Ниже приведен пример выводимой схемы для просмотра поведения с указаниями схемы. Выводимая схема:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Указав следующие указания схемы:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

Вы получите следующее:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Примечание

Указания схемы используются только в том случае, если не предоставлена схема для автозагрузчика. Указания схемы можно использовать независимо от того cloudFiles.inferColumnTypes , включена или отключена.

Развитие схемы

Автоматический загрузчик обнаруживает Добавление новых столбцов при обработке данных. По умолчанию Добавление нового столбца приведет к прекращению работы потоков с UnknownFieldException . Перед тем как поток выдаст эту ошибку, автозагрузчик выполняет вывод схемы в последних микропакетах данных и обновляет расположение схемы с помощью последней схемы. Новые столбцы сливаются в конец схемы. Типы данных существующих столбцов остаются неизменными. Настроив поток автоматических загрузчика в рамках задания Azure Databricks, можно автоматически перезапускать поток после таких изменений схемы.

Auto Loader поддерживает следующие режимы для эволюции схем, которые задаются в параметре cloudFiles.schemaEvolutionMode :

  • addNewColumns: Режим по умолчанию, если для автоматической загрузки схема не предоставлена. Задание потоковой передачи завершится с ошибкой UnknownFieldException . Новые столбцы добавляются в схему. Существующие столбцы не развивают типы данных. addNewColumns не допускается, если указана схема потока. Вместо этого можно предоставить схему как указание схемы, если вы хотите использовать этот режим.
  • failOnNewColumns: Если автоматический загрузчик обнаруживает новый столбец, произойдет сбой потока. Он не будет перезапущен, если предоставленная схема не обновлена или файл данных, вызвавший ошибку, удаляется.
  • rescue: Поток выполняется с очень первой выводимой или предоставленной схемой. Все изменения типа данных или новые добавленные столбцы вносятся в столбец "изменяющие данные ", который автоматически добавляется в схему потока как _rescued_data . В этом режиме поток не будет работать из-за изменений схемы.
  • none: Режим по умолчанию при указании схемы. Не выполняет развитие схемы, новые столбцы пропускаются, и данные не изменяются, если только столбец «копия данных» не указан отдельно.

Столбцы секционирования не учитываются при развитии схемы. Если вы имели исходную структуру каталогов base_path/event=click/date=2021-04-01/f0.json , например, и затем начинаете получать новые файлы как base_path/event=click/date=2021-04-01/hour=01/f1.json , столбец Hour игнорируется. Для записи сведений о новых столбцах секционирования задайте cloudFiles.partitionColumns для значение event,date,hour .

Столбец "копия данных"

Столбец «невыполненные данные» гарантирует, что данные не будут потеряны или пропущены во время извлечения, преобразования и загрузки. Столбец "необработанные данные" содержит все данные, которые не были проанализированы, поскольку они отсутствовали в данной схеме, или из-за несовпадения типов или из-за того, что регистр столбца в записи или файле не соответствует значению в схеме. Столбец "исправленные данные" возвращается в виде большого двоичного объекта JSON, содержащего столбцы, которые были удалены, и путь к исходному файлу записи (путь к исходному файлу доступен в Databricks Runtime 8,3 и более поздних версиях). Столбец "восстановленные данные" является частью схемы, возвращенной автоматическим загрузчиком по _rescued_data умолчанию при выводимой схеме. Можно переименовать столбец или включить его в случаях, когда вы задаете схему, задав параметр rescuedDataColumn .

Поскольку значение по умолчанию cloudFiles.inferColumnTypesfalse , а cloudFiles.schemaEvolutionModeaddNewColumns при выводимой схеме, rescuedDataColumn захватывает только столбцы с регистром, отличным от регистра в схеме.

Синтаксические анализаторы JSON и CSV поддерживают три режима при анализе записей: PERMISSIVE , DROPMALFORMED и FAILFAST . Если используется вместе с rescuedDataColumn , несоответствия типов данных не приводят к удалению записей в DROPMALFORMED режиме или вызывают ошибку в FAILFAST режиме. Отбрасываются или выдаются ошибки только поврежденные записи, т. е. неполный или неправильно сформированный формат JSON или CSV. Если вы используете badRecordsPath при синтаксическом АНАЛИЗЕ JSON или CSV, несоответствия типов данных не считаются неправильными записями при использовании rescuedDataColumn . В не сохраняются только неполные и неправильно сформированные записи JSON или CSV badRecordsPath .

Форматы данных

Ограничения

  • Эволюция схем не поддерживается в приложениях Python, работающих на Databricks Runtime 8,2 и 8,3, использующих foreachBatch . foreachBatchВместо этого можно использовать в Scala.

Примеры вариантов использования

Включение простого ETL

Простой способ получить данные в разностной версии Lake без потери данных — использовать следующий шаблон и включить вывод схемы с автозагрузчиком. Модуль данных рекомендует запускать следующий код в задании Azure Databricks для автоматического перезапуска потока при изменении схемы исходных данных. По умолчанию схема выводится как строковые типы, любые ошибки синтаксического анализа (если все остаются в виде строки) _rescued_data , а все новые столбцы не будут работать в потоке и развивать схему.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_checkpoint>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Предотвращение потери данных в хорошо структурированных данных

Если вы знакомы со схемой, но хотите иметь представление о получении непредвиденных данных, модуль данных рекомендует использовать rescuedDataColumn .

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Если вы хотите, чтобы поток перестает обрабатываться, если новое поле не соответствует схеме, можно добавить:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Обеспечение гибких конвейеров с частично структурированными данными

При получении данных от поставщика, который вводит новые столбцы в предоставляемые им сведения, вы можете не знать точно, когда они это делают, или у вас может не быть пропускной способности для обновления конвейера данных. Теперь можно использовать развитие схемы для перезапуска потока и автоматического обновления выводимой схемы автоматически загрузчиком. Можно также использовать schemaHints для некоторых полей "независимо от схемы", которые может предоставить поставщик.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Часто задаваемые вопросы (FAQ)

Как автоматически выгружает схему?

Когда таблица данных определена первым, автоматически загружается исходный каталог и выбираются последние (по времени изменения файлов) 50 ГБ или 1000, а также используются для получения схемы.

Автоматически загрузчик также выводит столбцы секционирования, изучая исходную структуру каталогов и выполнит поиск путей к файлам, содержащих /key=value/ структуру. Если исходный каталог имеет непоследовательную структуру, например:

base/path/partition=1/date=2020-12-31/file1.json
// inconsistent because date and partition directories are in different orders
base/path/date=2020-12-31/partition=2/file2.json
// inconsistent because the date directory is missing
base/path/partition=3/file3.json

Автоматический загрузчик выводит столбцы секционирования как пустые. Используйте cloudFiles.partitionColumns для явного анализа столбцов из структуры каталогов.

Как работает автоматический загрузчик, если исходная папка пуста?

Если исходный каталог пуст, то при автоматическом загрузчике требуется указать схему, так как нет данных для выполнения вывода.

Когда автозагрузчик выводит схему? Будет ли он автоматически развиваться после каждого микропакетного пакета?

Схема выводится при первом определении кадра данных в коде. Во время каждого микропакетного пакета изменения схемы оцениваются на лету; Поэтому вам не нужно беспокоиться о попадании в производительность. Когда поток перезапускается, он выбирает сохранимую схему из расположения схемы и начинает выполнение без каких бы то ни было издержек при выводе.

Каково влияние на производительность при приеме данных при использовании вывода схемы автоматического загрузчика?

Необходимо, чтобы вывод схемы занимал несколько минут для очень больших исходных каталогов во время первоначального вывода схемы. В противном случае при выполнении потока не следует наблюдать за значительным количеством результатов производительности. При выполнении кода в записной книжке Azure Databricks можно просмотреть обновления состояния, указывающие, когда в качестве каталога для выборки и выведения схемы данных будет указан автоматический загрузчик.

Из-за ошибки поврежденный файл радикально изменил свою схему. Как выполнить откат изменений схемы?

Обратитесь за помощью в службу поддержки блоков связи.