Inferencia y evolución de esquemas en el cargador automático

Nota

La compatibilidad con el formato JSON está disponible en Databricks Runtime 8.2 y posteriores; La compatibilidad con el formato CSV está disponible Databricks Runtime versión 8.3 y posteriores. Para obtener más información sobre cada formato, vea Formatos de datos.

Auto Loader puede detectar automáticamente la introducción de nuevas columnas a los datos y reiniciar para que no tenga que administrar el seguimiento y el control de los cambios de esquema usted mismo. Auto Loader también puede "rescate" de datos inesperados (por ejemplo, de tipos de datos diferentes) en una columna de blob JSON, a los que puede optar por acceder más adelante mediante las API de acceso a datos semiestructurados .

Inferencia de esquemas

Para deducir el esquema, Auto Loader muestrea los primeros 50 GB o 1000 archivos que detecta, lo que sea que se cruce primero. Para evitar incurrir en este costo de inferencia en cada inicio de secuencia y poder proporcionar un esquema estable entre reinicios de secuencia, debe establecer la opción cloudFiles.schemaLocation . Auto Loader crea un directorio oculto en _schemas esta ubicación para realizar un seguimiento de los cambios de esquema en los datos de entrada a lo largo del tiempo. Si la secuencia contiene un único cloudFiles origen para ingerir datos, puede proporcionar la ubicación del punto de control como cloudFiles.schemaLocation . De lo contrario, proporcione un directorio único para esta opción. Si los datos de entrada devuelven un esquema inesperado para la secuencia, compruebe que la ubicación del esquema solo la usa un único origen del cargador automático.

Nota

Para cambiar el tamaño del ejemplo que se usa, puede establecer las SQL siguientes:

spark.databricks.cloudFiles.schemaInference.sampleSize.numBytes

(cadena de bytes, por ejemplo 10gb )

y

spark.databricks.cloudFiles.schemaInference.sampleSize.numFiles

(entero)

De forma predeterminada, Auto Loader deduce columnas en formatos de archivo basados en texto como CSV y JSON como string columnas. En los conjuntos de datos JSON, las columnas anidadas también se deducen como string columnas. Dado que los datos JSON y CSV son autodescriptos y pueden admitir muchos tipos de datos, la inferencia de los datos como cadena puede ayudar a evitar problemas de evolución del esquema, como errores de coincidencia de tipos numéricos (enteros, longs, floats). Si desea conservar el comportamiento de inferencia del esquema de Spark original, establezca la opción cloudFiles.inferColumnTypes en true .

Nota

A menos que se habilite la confidencialidad de mayúsculas y minúsculas, las columnas foo , y se consideran la misma FooFOO columna. La selección del caso en el que se representará la columna es arbitraria y depende de los datos muestreados. Puede usar sugerencias de esquema para exigir qué caso se debe usar.

El cargador automático también intenta inferir columnas de partición de la estructura de directorios subyacente de los datos si los datos están dispuestos en la creación de particiones de estilo de Hive. Por ejemplo, una ruta de acceso de archivo como base_path/event=click/date=2021-04-01/f0.json daría lugar a la inferencia de y como columnas de dateevent partición. Los tipos de datos de estas columnas serán cadenas a menos que establezca cloudFiles.inferColumnTypes en true. Si la estructura de directorios subyacente contiene particiones de Hive en conflicto o no contiene particiones de estilo de Hive, se omitirán las columnas de partición. Puede proporcionar la opción como una lista separada por comas de nombres de columna para probar y analizar siempre las columnas especificadas de la ruta de acceso del archivo si estas columnas existen como pares en la estructura cloudFiles.partitionColumnskey=value de directorios.

Cuando el cargador automático deduce el esquema, se agrega automáticamente una columna de datos rescatada al esquema como . Consulte la sección sobre la columna de datos rescatadas y la evolución del esquema para obtener más información.

Nota

Los formatos de archivo binario ( ) y binaryFile tienen esquemas de datos text fijos, pero también admiten la inferencia de columnas de partición. Las columnas de partición se deducen en cada reinicio de secuencia a menos que especifique cloudFiles.schemaLocation . Para evitar posibles errores o pérdida de información, Databricks recomienda establecer o como opciones para estos formatos de archivo, ya que no es una opción necesaria cloudFiles.schemaLocationcloudFiles.partitionColumns para estos cloudFiles.schemaLocation formatos.

Sugerencias de esquema

Es posible que los tipos de datos que se deducen no siempre sean exactamente lo que busca. Mediante el uso de sugerencias de esquema, puede superponer la información que conoce y espera en un esquema deducido.

De forma predeterminada, Apache Spark un enfoque estándar para deducir el tipo de columnas de datos. Por ejemplo, deduce JSON anidado como structs y enteros como longs. Por el contrario, Auto Loader considera todas las columnas como cadenas. Cuando sepa que una columna es de un tipo de datos específico o si desea elegir un tipo de datos aún más general (por ejemplo, un valor double en lugar de un entero), puede proporcionar un número arbitrario de sugerencias para los tipos de datos de columnas de la siguiente manera:

.option("cloudFiles.schemaHints", "tags map<string,string>, version int")

Consulte la documentación sobre los tipos de datos para obtener la lista de tipos de datos admitidos.

Si una columna no está presente al principio de la secuencia, también puede usar sugerencias de esquema para agregar esa columna al esquema deducido.

Este es un ejemplo de un esquema deducido para ver el comportamiento con sugerencias de esquema. Esquema deducido:

|-- date: string
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string
|-- purchase_options: struct
|    |-- delivery_address: string

Especificando las siguientes sugerencias de esquema:

.option("cloudFiles.schemaHints", "date DATE, user_info.dob DATE, purchase_options MAP<STRING,STRING>, time TIMESTAMP")

recibirá lo siguiente:

|-- date: string -> date
|-- quantity: int
|-- user_info: struct
|    |-- id: string
|    |-- name: string
|    |-- dob: string -> date
|-- purchase_options: struct -> map<string,string>
|-- time: timestamp

Nota

La compatibilidad con sugerencias de esquema de matriz y asignación está disponible en Databricks Runtime 9.1 LTS y Databricks Runtime 9.1 LTS Photon y posteriores.

Este es un ejemplo de un esquema deducido con tipos de datos complejos para ver el comportamiento con sugerencias de esquema. Esquema deducido:

|-- products: array<string>
|-- locations: array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string>
|-- names: map<string,string>
|-- prices: map<string,string>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int

Especificando las siguientes sugerencias de esquema:

.option("cloudFiles.schemaHints", "products ARRAY<INT>, locations.element STRING, users.element.id INT, ids MAP<STRING,INT>, names.key INT, prices.value INT, discounts.key.id INT, descriptions.value.content STRING")

recibirá lo siguiente:

|-- products: array<string> -> array<int>
|-- locations: array<int> -> array<string>
|-- users: array<struct>
|    |-- users.element: struct
|    |    |-- id: string -> int
|    |    |-- name: string
|    |    |-- dob: string
|-- ids: map<string,string> -> map<string,int>
|-- names: map<string,string> -> map<int,string>
|-- prices: map<string,string> -> map<string,int>
|-- discounts: map<struct,string>
|    |-- discounts.key: struct
|    |    |-- id: string -> int
|    |-- discounts.value: string
|-- descriptions: map<string,struct>
|    |-- descriptions.key: string
|    |-- descriptions.value: struct
|    |    |-- content: int -> string

Nota

Las sugerencias de esquema solo se usan si no se proporciona un esquema al cargador automático. Puede usar sugerencias de esquema si cloudFiles.inferColumnTypes está habilitada o deshabilitada.

Evolución del esquema

Auto Loader detecta la adición de nuevas columnas a medida que procesa los datos. De forma predeterminada, la adición de una nueva columna hará que las secuencias se detengan con UnknownFieldException . Antes de que la secuencia produce este error, El cargador automático realiza la inferencia de esquemas en el micro lote de datos más reciente y actualiza la ubicación del esquema con el esquema más reciente. Las columnas nuevas se combinan al final del esquema. Los tipos de datos de las columnas existentes permanecen sin cambios. Al establecer la secuencia del cargador automático dentro de Azure Databricks trabajo, puede hacer que la secuencia se reinicie automáticamente después de estos cambios de esquema.

Auto Loader admite los siguientes modos para la evolución del esquema, que se establecen en la opción cloudFiles.schemaEvolutionMode :

  • addNewColumns: modo predeterminado cuando no se proporciona un esquema al cargador automático. Se producirá un error en el trabajo de streaming con UnknownFieldException . Se agregan nuevas columnas al esquema. Las columnas existentes no evolucionan tipos de datos. addNewColumns no se permite cuando se proporciona el esquema de la secuencia. En su lugar, puede proporcionar el esquema como una sugerencia de esquema si desea usar este modo.
  • failOnNewColumns: si el cargador automático detecta una nueva columna, se producirá un error en la secuencia. No se reiniciará a menos que se actualice el esquema proporcionado o se quite el archivo de datos ofensido.
  • rescue: la secuencia se ejecuta con el primer esquema inferido o proporcionado. Los cambios de tipo de datos o las columnas nuevas que se agregan se rescató en la columna de datos rescatadas que se agrega automáticamente al esquema del flujo como . En este modo, la secuencia no producirá un error debido a cambios de esquema.
  • none: modo predeterminado cuando se proporciona un esquema. No evoluciona el esquema, se omiten las nuevas columnas y los datos no se rescaten a menos que la columna de datos rescatadas se proporciona por separado como opción.

Las columnas de partición no se tienen en cuenta para la evolución del esquema. Si tuviera una estructura de directorios inicial como y, a continuación, empezara a recibir nuevos archivos como , se omite la columna base_path/event=click/date=2021-04-01/f0.jsonbase_path/event=click/date=2021-04-01/hour=01/f1.json de hora. Para capturar información para nuevas columnas de partición, establezca cloudFiles.partitionColumns en event,date,hour .

Columna de datos rescatadas

La columna de datos rescatadas garantiza que nunca se pierden ni se pierden datos durante el ETL. La columna de datos rescatadas contiene los datos que no se analizaron, ya sea porque faltaba en el esquema dado, porque había un error de coincidencia de tipos o porque el uso de mayúsculas y minúsculas de la columna en el registro o archivo no coincide con ese en el esquema. La columna de datos rescatadas se devuelve como un blob JSON que contiene las columnas que se han rescatado y la ruta de acceso del archivo de origen del registro (la ruta de acceso del archivo de origen está disponible en Databricks Runtime 8.3 y posteriores). La columna de datos rescatadas forma parte del esquema devuelto por Auto Loader como de forma _rescued_data predeterminada cuando se infere el esquema. Puede cambiar el nombre de la columna o incluirla en los casos en los que proporcione un esquema estableciendo la opción rescuedDataColumn .

Puesto que el valor predeterminado de es y es cuando se infere el esquema, captura solo las columnas que tienen un caso diferente al cloudFiles.inferColumnTypesfalse del cloudFiles.schemaEvolutionModeaddNewColumnsrescuedDataColumn esquema.

Los analizadores JSON y CSV admiten tres modos al analizar registros: PERMISSIVEDROPMALFORMED , y FAILFAST . Cuando se usa junto con , las discrepancias de tipos de datos no hacen que los registros se desasoyen en modo rescuedDataColumn o que se produce un error en el DROPMALFORMEDFAILFAST modo. Solo se descartan o producen errores los registros dañados, es decir, JSON o CSV incompletos o con un formato incorrecta. Si usa al analizar JSON o CSV, las discrepancias de tipos de datos no se consideran como registros no badRecordsPath válidos al usar rescuedDataColumn . Solo los registros JSON o CSV incompletos y con formato incorrecta se almacenan en badRecordsPath .

Formatos de datos

Limitaciones

  • La evolución del esquema no se admite en las aplicaciones de Python Databricks Runtime 8.2 y 8.3 que usan foreachBatch . En su lugar, foreachBatch puede usar en Scala.

Casos de uso de ejemplo

Habilitación de ETL sencillo

Una manera fácil de obtener los datos en Delta Lake sin perder ningún dato es usar el siguiente patrón y habilitar la inferencia de esquemas con el cargador automático. Databricks recomienda ejecutar el código siguiente en un trabajo Azure Databricks para que reinicie automáticamente la secuencia cuando cambie el esquema de los datos de origen. De forma predeterminada, el esquema se deduce como tipos de cadena, los errores de análisis (no debería haber ninguno si todo permanece como una cadena) irán a y las columnas nuevas producirán un error en la secuencia y evolucionarán el _rescued_data esquema.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", "<path_to_schema_location>")
  .load("<path_to_source_data>")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Evitar la pérdida de datos en datos bien estructurados

Cuando conoce el esquema, pero quiere saber cada vez que recibe datos inesperados, Databricks recomienda usar rescuedDataColumn .

Python

spark.readStream.format("cloudFiles") \
  .schema(expected_schema) \
  .option("cloudFiles.format", "json") \
  # will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue") \
  .load("<path_to_source_data>") \
  .writeStream \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .schema(expected_schema)
  .option("cloudFiles.format", "json")
  // will collect all new fields as well as data type mismatches in _rescued_data
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load("<path_to_source_data>")
  .writeStream
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Si desea que la secuencia deje de procesarse si se introduce un nuevo campo que no coincide con el esquema, puede agregar:

.option("cloudFiles.schemaEvolutionMode", "failOnNewColumns")

Habilitación de canalizaciones de datos semiestructuradas flexibles

Cuando recibe datos de un proveedor que introduce nuevas columnas en la información que proporcionan, es posible que no tenga en cuenta exactamente cuándo lo hacen o que no tenga el ancho de banda para actualizar la canalización de datos. Ahora puede aprovechar la evolución del esquema para reiniciar la secuencia y permitir que el cargador automático actualice automáticamente el esquema deducido. También puede aprovechar schemaHints algunos de los campos "sin esquema" que puede proporcionar el proveedor.

Python

spark.readStream.format("cloudFiles") \
  .option("cloudFiles.format", "json") \
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT") \
  .load("/api/requests") \
  .writeStream \
  .option("mergeSchema", "true") \
  .option("checkpointLocation", "<path_to_checkpoint>") \
  .start("<path_to_target")

Scala

spark.readStream.format("cloudFiles")
  .option("cloudFiles.format", "json")
  // will ensure that the headers column gets processed as a map
  .option("cloudFiles.schemaHints",
          "headers map<string,string>, statusCode SHORT")
  .load("/api/requests")
  .writeStream
  .option("mergeSchema", "true")
  .option("checkpointLocation", "<path_to_checkpoint>")
  .start("<path_to_target")

Preguntas más frecuentes (P+F)

¿Cómo deduce el esquema el cargador automático?

Cuando se define el dataframe por primera vez, el cargador automático muestra el directorio de origen y elige los archivos más recientes (por tiempo de modificación de archivos) de 50 GB o 1000, y los usa para deducir el esquema de datos.

Auto Loader también deduce columnas de partición examinando la estructura de directorios de origen y busca rutas de acceso de archivo que contengan la /key=value/ estructura. Si el directorio de origen tiene una estructura incoherente, por ejemplo:

base/path/partition=1/date=2020-12-31/file1.json
// inconsistent because date and partition directories are in different orders
base/path/date=2020-12-31/partition=2/file2.json
// inconsistent because the date directory is missing
base/path/partition=3/file3.json

Auto Loader deduce las columnas de partición como vacías. Use cloudFiles.partitionColumns para analizar explícitamente columnas de la estructura de directorios.

¿Cómo se comporta Auto Loader cuando la carpeta de origen está vacía?

Si el directorio de origen está vacío, el cargador automático requiere que proporcione un esquema, ya que no hay datos para realizar la inferencia.

¿Cuándo deduce autocargador el esquema? ¿Evoluciona automáticamente después de cada micro lote?

El esquema se deduce cuando dataframe se define por primera vez dentro del código. Durante cada micro lote, los cambios de esquema se evalúan sobre la marcha; por lo tanto, no es necesario preocuparse por los aciertos de rendimiento. Cuando se reinicia la secuencia, recoge el esquema evolucionado de la ubicación del esquema y comienza a ejecutarse sin sobrecarga de inferencia.

¿Cuál es el impacto en el rendimiento al ingerir los datos al usar la inferencia de esquema del cargador automático?

Debe esperar que la inferencia de esquema tarde un par de minutos en directorios de origen muy grandes durante la inferencia inicial del esquema. De lo contrario, no se deben observar aciertos de rendimiento significativos durante la ejecución de la secuencia. Si ejecuta el código en un cuaderno de Azure Databricks, puede ver actualizaciones de estado que especifican cuándo el cargador automático enumerará el directorio para el muestreo y la inferencia del esquema de datos.

Debido a un error, un archivo no está en buen estado ha cambiado drásticamente el esquema. ¿Qué debo hacer para revertir un cambio de esquema?

Póngase en contacto con el soporte técnico de Databricks para obtener ayuda.