Configuración de inferencia y evolución de esquemas en Auto Loader

Puede configurar Auto Loader para detectar automáticamente el esquema de los datos cargados, lo que le permite inicializar tablas sin declarar explícitamente el esquema de datos y evolucionar el esquema de tabla a medida que se introducen nuevas columnas. Esto elimina la necesidad de realizar un seguimiento manual y aplicar los cambios de esquema a lo largo del tiempo.

El cargador automático también puede "rescatar" datos inesperados (por ejemplo, con tipos de datos diferentes) en una columna de blob JSON, a los que puede optar por acceder más adelante mediante las API de acceso a datos semiestructurados.

Se admiten los siguientes formatos para la inferencia y evolución de esquemas:

Formato de archivo Versiones compatibles
JSON Todas las versiones
CSV Todas las versiones
XML Databricks Runtime 14.3 LTS y versiones posteriores
Avro Databricks Runtime 10.4 LTS y versiones posteriores
Parquet Databricks Runtime 11.3 LTS y versiones posteriores
ORC No admitidas
Text No aplicable (esquema fijo)
Binaryfile No aplicable (esquema fijo)

Sintaxis de la inferencia y evolución del esquema

Especificar un directorio de destino para la opción cloudFiles.schemaLocation habilita la inferencia de esquema y la evolución. Puede optar por usar el mismo directorio que especifique para checkpointLocation. Si usa Delta Live Tables, Azure Databricks administra automáticamente la ubicación del esquema y otra información de punto de comprobación.

Nota:

Si tiene más de una ubicación de datos de origen que se carga en la tabla de destino, cada carga de trabajo de ingesta de Auto Loader requiere un punto de comprobación de streaming independiente.

En el ejemplo siguiente se usa parquet para cloudFiles.format. Use csv, avro o json para otros orígenes de archivos. Todas las demás opciones de lectura y escritura permanecen iguales para los comportamientos predeterminados para cada formato.

Python

(spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  # The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")
)

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  // The schema location directory keeps track of your data schema over time
  .option("cloudFiles.schemaLocation", "<path-to-checkpoint>")
  .load("<path-to-source-data>")
  .writeStream
  .option("checkpointLocation", "<path-to-checkpoint>")
  .start("<path_to_target")

¿Cómo funciona la inferencia de esquemas de Auto Loader?

Para inferir el esquema la primera vez que se leen los datos, Auto Loader muestrea los primeros 50 GB o 1000 archivos que detecta, según qué límite se supere primero. Auto Loader almacena la información de esquema en un directorio _schemas en el cloudFiles.schemaLocation configurado para realizar un seguimiento de los cambios de esquema en los datos de entrada a lo largo del tiempo.

Nota:

Para cambiar el tamaño de la muestra que se usa, puede establecer las configuraciones de SQL siguientes:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(cadena de bytes, por ejemplo, 10gb)

y

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(entero)

De manera predeterminada, la inferencia de esquemas de Auto Loader busca evitar problemas de evolución del esquema debido a errores de coincidencia de tipos. Para los formatos que no codifican tipos de datos (JSON, CSV y XML), el cargador automático deduce todas las columnas como cadenas (incluidos los campos anidados en archivos JSON). En el caso de los formatos con esquema con tipo (Parquet y Avro), Auto Loader muestrea un subconjunto de archivos y combina los esquemas de los archivos individuales. Este comportamiento se resume en la tabla siguiente:

Formato de archivo Tipo de datos inferido predeterminado
JSON String
CSV String
XML String
Avro Tipos codificados en el esquema de Avro
Parquet Tipos codificados en el esquema de Parquet

DataFrameReader de Apache Spark usa un comportamiento diferente para la inferencia de esquemas, seleccionando tipos de datos para columnas en orígenes JSON, CSV y XML basados en datos de ejemplo. Para habilitar este comportamiento con Auto Loader, establezca la opción cloudFiles.inferColumnTypes en true.

Nota:

Al deducir el esquema de los datos CSV, Auto Loader supone que los archivos contienen encabezados. Si los archivos CSV no contienen encabezados, proporcione la opción .option("header", "false"). Además, Auto Loader combina los esquemas de todos los archivos del ejemplo para crear un esquema global. Luego, Auto Loader puede leer cada archivo según su encabezado y analizar correctamente el archivo CSV.

Nota:

Cuando una columna tiene tipos de datos diferentes en dos archivos Parquet, Auto Loader elige el tipo más ancho. Puede usar schemaHints para invalidar esta opción. Cuando se especifican sugerencias de esquema, Auto Loader no convierte la columna en el tipo especificado, sino que indica al lector de Parquet que lea la columna como el tipo especificado. En el caso de un error de coincidencia, la columna se rescata en la columna de datos de .

¿Cómo funciona la evolución del esquema de Auto Loader?

El cargador automático detecta la adición de nuevas columnas a medida que procesa los datos. Cuando Auto Loader detecta una nueva columna, la secuencia se detiene con un UnknownFieldException. Antes de que la secuencia genere este error, Auto Loader realiza la inferencia de esquemas en el microlote de datos más reciente y actualiza la ubicación del esquema con el esquema más reciente combinando las columnas nuevas al final del esquema. Los tipos de datos de las columnas existentes permanecen sin cambios.

Databricks recomienda configurar las secuencias de Auto Loader con flujos de trabajo para reiniciarse automáticamente después de estos cambios de esquema.

El cargador automático admite los siguientes modos para la evolución del esquema, que se establecen en la opción cloudFiles.schemaEvolutionMode:

Modo Comportamiento al leer una columna nueva
addNewColumns (valor predeterminado) Se produce un error en la secuencia. Se agregan nuevas columnas al esquema. Las columnas existentes no evolucionan tipos de datos.
rescue El esquema nunca evoluciona y la secuencia no genera errores debido a cambios de esquema. Todas las columnas nuevas se registran en la columna de datos rescatados.
failOnNewColumns Se produce un error en la secuencia. La secuencia no se reinicia a menos que se actualice el esquema proporcionado o se quite el archivo de datos incorrecto.
none No evoluciona el esquema, las nuevas columnas se omiten y los datos no se rescaten a menos que se establezca la opción rescuedDataColumn. No se produce un error en la secuencia debido a cambios de esquema.

¿Cómo funcionan las particiones con Auto Loader?

Auto Loader intenta inferir columnas de partición a partir de la estructura de directorios subyacente de los datos si estos están dispuestos en la creación de particiones de estilo Hive. Por ejemplo, la ruta de acceso de archivo base_path/event=click/date=2021-04-01/f0.json da lugar a la inferencia de date y event como columnas de partición. Si la estructura de directorios subyacente contiene particiones de Hive en conflicto o no contiene particiones de estilo Hive, se omiten las columnas de partición.

Los formatos de archivo text y archivo binario (binaryFile) tienen esquemas de datos fijos, pero admiten la inferencia de columnas de partición. Databricks recomienda establecer cloudFiles.schemaLocation para estos formatos de archivo. Esto evita posibles errores o pérdida de información y evita la inferencia de columnas de particiones cada vez que se inicia una instancia de Auto Loader.

Las columnas de partición no se tienen en cuenta para la evolución del esquema. Si tenía una estructura de directorios inicial como base_path/event=click/date=2021-04-01/f0.json y empieza a recibir archivos nuevos como base_path/event=click/date=2021-04-01/hour=01/f1.json, Auto Loader omite la columna de hora. Para capturar información para nuevas columnas de partición, establezca cloudFiles.partitionColumns en event,date,hour.

Nota:

La opción cloudFiles.partitionColumns toma una lista de nombres de columna separados por comas. Solo se analizan las columnas que existen como pares key=value en la estructura de directorios.

¿Qué es la columna de datos rescatados?

Cuando Auto Loader infiere el esquema, se agrega automáticamente una columna de datos rescatados al esquema como _rescued_data. Puede cambiar el nombre de la columna o incluirla en los casos en los que proporcione un esquema mediante la opción rescuedDataColumn.

La columna de datos rescatados garantiza que las columnas que no coincidan con el esquema se rescaten en lugar de quitarse. La columna de datos rescatados contiene los datos que no se analizan por los siguientes motivos:

  • Falta la columna del esquema.
  • Errores de coincidencia de tipos.
  • Errores de coincidencia de mayúsculas y minúsculas.

La columna de datos rescatados contiene un JSON que contiene las columnas rescatadas y la ruta de acceso del archivo de origen del registro.

Nota:

Los analizadores JSON y CSV admiten tres modos al analizar registros: PERMISSIVE, DROPMALFORMED y FAILFAST. Cuando se usan junto con rescuedDataColumn, las discrepancias de tipo de datos no hacen que los registros se eliminen en el modo DROPMALFORMED ni que se genere un error en el modo FAILFAST. Únicamente se quitan o producen errores dañados, como JSON o CSV incompletos o con formato incorrecto. Si usa badRecordsPath al analizar JSON o CSV, las discrepancias de tipo de datos no se consideran como registros no válidos al usar rescuedDataColumn. Solo los registros JSON o CSV incompletos y con formato incorrecto se almacenan en badRecordsPath.

Cambio del comportamiento que distingue mayúsculas de minúsculas

A menos que se habilite la distinción entre mayúsculas y minúsculas, las columnas abc, Abc y ABC se consideran la misma para los propósitos de inferencia del esquema. El caso de mayúsculas y minúsculas es arbitraria y depende de los datos muestreados. Puede usar sugerencias de esquema para exigir el uso de mayúsculas o de minúsculas. Una vez realizada una selección y se deduce el esquema, Auto Loader no tiene en cuenta las variantes de mayúsculas y minúsculas que no se han selecionado de forma coherente con el esquema.

Cuando la columna de datos rescatados está habilitada, los campos denominados en un caso distinto del esquema se cargan en la columna _rescued_data. Cambiaa este comportamiento al establezcer la opción readerCaseSensitive en false, en cuyo caso el cargador automático lee los datos de una manera que no distingue mayúsculas de minúsculas.

Invalidación de la inferencia de esquemas con sugerencias de esquema

Puede usar sugerencias de esquema para aplicar la información del esquema que conoce y espera en un esquema inferido. Si sabe que una columna es de un tipo de datos específico o si quiere elegir un tipo de datos aún más general (por ejemplo, un valor double en lugar de un integer), puede proporcionar un número arbitrario de sugerencias para los tipos de datos de columnas como una cadena mediante la sintaxis de especificación de esquemas de SQL de la siguiente manera:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Consulte en la documentación sobre los tipos de datos una lista de los tipos de datos admitidos.

Si una columna no está presente al principio de la secuencia, puede usar también sugerencias de esquema para agregar dicha columna al esquema inferido.

A continuación se muestra un ejemplo de un esquema inferido para ver el comportamiento con sugerencias de esquema.

Esquema inferido:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Si especifica las siguientes sugerencias de esquema:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

obtendrá:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Nota:

La compatibilidad con sugerencias de esquema de matriz y asignación está disponible en Databricks Runtime 9.1 LTS y versiones posteriores.

A continuación se muestra un ejemplo de un esquema inferido con tipos de datos complejos para ver el comportamiento con sugerencias de esquema.

Esquema inferido:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Si especifica las siguientes sugerencias de esquema:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

obtendrá:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Nota:

Las sugerencias de esquema se usan únicamente si no se proporciona un esquema al cargador automático. Puede usar sugerencias de esquema tanto si cloudFiles.inferColumnTypes está habilitado como deshabilitado.