Share via


Ler e gravar arquivos XML

Importante

Esta funcionalidade está em Pré-visualização Pública.

Este artigo descreve como ler e gravar arquivos XML.

Extensible Markup Language (XML) é uma linguagem de marcação para formatação, armazenamento e compartilhamento de dados em formato textual. Ele define um conjunto de regras para serializar dados que variam de documentos a estruturas de dados arbitrárias.

O suporte nativo ao formato de arquivo XML permite a ingestão, consulta e análise de dados XML para processamento em lote ou streaming. Ele pode inferir e evoluir automaticamente esquemas e tipos de dados, suporta expressões SQL como from_xml, e pode gerar documentos XML. Ele não requer frascos externos e funciona perfeitamente com Auto Loader, read_files e COPY INTO.

Requisitos

Databricks Runtime 14.3 e superior

Analisar registros XML

A especificação XML exige uma estrutura bem formada. No entanto, essa especificação não é mapeada imediatamente para um formato tabular. Você deve especificar a rowTag opção para indicar o elemento XML que mapeia para um DataFrameRowarquivo . O rowTag elemento torna-se o nível structsuperior. Os elementos filho de rowTag tornar-se os campos do nível structsuperior.

Você pode especificar o esquema para esse registro ou deixá-lo ser inferido automaticamente. Como o analisador examina apenas os elementos, o DTD e as rowTag entidades externas são filtrados.

Os exemplos a seguir ilustram a inferência de esquema e a análise de um arquivo XML usando diferentes rowTag opções:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

Leia o arquivo XML com rowTag opção como "livros":

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Resultado:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

Leia o arquivo XML com rowTag como "livro":

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Resultado:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Opções de fonte de dados

As opções de fonte de dados para XML podem ser especificadas das seguintes maneiras:

Para obter uma lista de opções, consulte Opções do carregador automático.

Suporte a XSD

Opcionalmente, você pode validar cada registro XML de nível de linha por uma definição de esquema XML (XSD). O arquivo XSD é especificado na rowValidationXSDPath opção. O XSD não afeta o esquema fornecido ou inferido. Um registro que falha na validação é marcado como "corrompido" e tratado com base na opção de modo de tratamento de registro corrompido descrita na seção de opções.

Você pode usar XSDToSchema para extrair um esquema Spark DataFrame de um arquivo XSD. Ele suporta apenas tipos simples, complexos e de sequência, e suporta apenas a funcionalidade XSD básica.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

A tabela a seguir mostra a conversão de tipos de dados XSD em tipos de dados do Spark:

Tipos de dados XSD Tipos de dados do Spark
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, nonPositiveInteger, positiveInteger, unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

Analisar XML aninhado

Os dados XML em uma coluna com valor de cadeia de caracteres em um DataFrame existente podem ser analisados e schema_of_xmlfrom_xml isso retorna o esquema e os resultados analisados como novas struct colunas. Os dados XML passados como um argumento para schema_of_xml e from_xml devem ser um único registro XML bem formado.

schema_of_xml

Sintaxe

schema_of_xml(xmlStr [, options] )

Argumentos

  • xmlStr: Uma expressão STRING especificando um único registro XML bem formado.
  • options: Um literal opcional MAP<STRING,STRING> especificando diretivas.

Devoluções

Um STRING que contém uma definição de um struct com n campos de strings onde os nomes de coluna são derivados do elemento XML e nomes de atributo. Os valores de campo contêm os tipos SQL formatados derivados.

from_xml

Sintaxe

from_xml(xmlStr, schema [, options])

Argumentos

  • xmlStr: Uma expressão STRING especificando um único registro XML bem formado.
  • schema: Uma expressão STRING ou invocação da schema_of_xml função.
  • options: Um literal opcional MAP<STRING,STRING> especificando diretivas.

Devoluções

Uma estrutura com nomes de campo e tipos correspondentes à definição de esquema. O esquema deve ser definido como nome de coluna separado por vírgulas e pares de tipos de dados, conforme usado, por exemplo, CREATE TABLEem . A maioria das opções mostradas nas opções da fonte de dados é aplicável com as seguintes exceções:

  • rowTag: Como há apenas um registro XML, a rowTag opção não é aplicável.
  • mode (padrão: PERMISSIVE): Permite um modo para lidar com registros corrompidos durante a análise.
    • PERMISSIVE: Quando ele encontra um registro corrompido, coloca a cadeia de caracteres malformada em um campo configurado por columnNameOfCorruptRecord, e define campos malformados como null. Para manter registros corrompidos, você pode definir um campo de tipo de cadeia de caracteres nomeado columnNameOfCorruptRecord em um esquema definido pelo usuário. Se um esquema não tiver o campo, ele descartará registros corrompidos durante a análise. Ao inferir um esquema, ele adiciona implicitamente um columnNameOfCorruptRecord campo em um esquema de saída.
    • FAILFAST: Lança uma exceção quando ela atende a registros corrompidos.

Conversão de estruturas

Devido às diferenças de estrutura entre DataFrame e XML, existem algumas regras de conversão de dados XML de e DataFrame para DataFrame dados XML. Observe que a manipulação de atributos pode ser desabilitada com a opção excludeAttribute.

Conversão de XML para DataFrame

Atributos: Os atributos são convertidos como campos com o prefixo attributePrefixdo título .

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

produz um esquema abaixo:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Dados de caracteres em um elemento que contém atributo(s) ou elemento(s) filho(s): eles são analisados no valueTag campo. Se houver várias ocorrências de dados de caracteres, o valueTag campo será convertido em um array tipo.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

produz um esquema abaixo:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

Conversão de DataFrame para XML

Elemento como uma matriz em uma matriz: Escrever um arquivo XML de DataFrame ter um campo ArrayType com seu elemento como ArrayType teria um campo aninhado adicional para o elemento. Isso não aconteceria na leitura e gravação de dados XML, mas na escrita de uma DataFrame leitura de outras fontes. Portanto, ida e volta na leitura e gravação de arquivos XML tem a mesma estrutura, mas escrever uma DataFrame leitura de outras fontes é possível ter uma estrutura diferente.

DataFrame com um esquema abaixo:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

e com os dados abaixo:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

produz um arquivo XML abaixo:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

O nome do elemento da matriz sem nome no DataFrame é especificado pela opção arrayElementName (Padrão: item).

Coluna de dados resgatados

A coluna de dados resgatados garante que você nunca perca ou perca dados durante o ETL. Você pode habilitar a coluna de dados resgatados para capturar quaisquer dados que não foram analisados porque um ou mais campos em um registro têm um dos seguintes problemas:

  • Ausente do esquema fornecido
  • Não corresponde ao tipo de dados do esquema fornecido
  • Tem uma incompatibilidade de maiúsculas e minúsculas com os nomes de campo no esquema fornecido

A coluna de dados resgatados é retornada como um documento JSON contendo as colunas que foram resgatadas e o caminho do arquivo de origem do registro. Para remover o caminho do arquivo de origem da coluna de dados resgatados, você pode definir a seguinte configuração SQL:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Você pode habilitar a coluna de dados resgatados definindo a opção rescuedDataColumn como um nome de coluna ao ler dados, como _rescued_data com spark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>).

O analisador XML suporta três modos ao analisar registros: PERMISSIVE, DROPMALFORMEDe FAILFAST. Quando usado em conjunto com rescuedDataColumno , as incompatibilidades de tipo de dados não fazem com que os registros sejam descartados no DROPMALFORMED modo ou gerem um erro no FAILFAST modo. Somente registros corrompidos (XML incompleto ou malformado) são descartados ou geram erros.

Inferência de esquema e evolução no Auto Loader

Para obter uma discussão detalhada deste tópico e das opções aplicáveis, consulte Configurar inferência e evolução de esquema no Auto Loader. Você pode configurar o Auto Loader para detetar automaticamente o esquema de dados XML carregados, permitindo inicializar tabelas sem declarar explicitamente o esquema de dados e evoluir o esquema de tabela à medida que novas colunas são introduzidas. Isso elimina a necessidade de rastrear e aplicar manualmente as alterações de esquema ao longo do tempo.

Por padrão, a inferência de esquema do Auto Loader procura evitar problemas de evolução do esquema devido a incompatibilidades de tipo. Para formatos que não codificam tipos de dados (JSON, CSV e XML), o Auto Loader infere todas as colunas como cadeias de caracteres, incluindo campos aninhados em arquivos XML. O Apache Spark DataFrameReader usa um comportamento diferente para inferência de esquema, selecionando tipos de dados para colunas em fontes XML com base em dados de exemplo. Para habilitar esse comportamento com o Auto Loader, defina a opção cloudFiles.inferColumnTypes como true.

O Auto Loader deteta a adição de novas colunas à medida que processa os seus dados. Quando o Auto Loader deteta uma nova coluna, o fluxo para com um UnknownFieldExceptionarquivo . Antes de o fluxo lançar esse erro, o Auto Loader executa a inferência de esquema no microlote de dados mais recente e atualiza o local do esquema com o esquema mais recente mesclando novas colunas no final do esquema. Os tipos de dados das colunas existentes permanecem inalterados. Auto Loader suporta diferentes modos para a evolução do esquema, que você define na opção cloudFiles.schemaEvolutionMode.

Você pode usar dicas de esquema para impor as informações de esquema que você conhece e espera em um esquema inferido. Quando você souber que uma coluna é de um tipo de dados específico, ou se quiser escolher um tipo de dados mais geral (por exemplo, um duplo em vez de um inteiro), poderá fornecer um número arbitrário de dicas para tipos de dados de coluna como uma cadeia de caracteres usando a sintaxe de especificação do esquema SQL. Quando a coluna de dados resgatados é habilitada, os campos nomeados em um caso diferente do do esquema são carregados na _rescued_data coluna. Você pode alterar esse comportamento definindo a opção readerCaseSensitive como false, caso em que o Auto Loader lê dados de forma que não diferencia maiúsculas de minúsculas.

Exemplos

Os exemplos nesta seção usam um arquivo XML disponível para download no repositório Apache Spark GitHub.

Ler e escrever XML

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Você pode especificar manualmente o esquema ao ler dados:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

API de SQL

A fonte de dados XML pode inferir tipos de dados:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

Você também pode especificar nomes e tipos de colunas em DDL. Neste caso, o esquema não é inferido automaticamente.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

Carregue XML usando COPY INTO

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Ler XML com validação de linha

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

Analisar XML aninhado (from_xml e schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

from_xml e schema_of_xml com a API SQL

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Carregar XML com o carregador automático

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Recursos adicionais

Ler e gravar dados XML usando a biblioteca spark-xml