Share via


XML dosyalarını okuma ve yazma

Önemli

Bu özellik Genel Önizlemededir.

Bu makalede XML dosyalarının nasıl okunduğu ve yazıldığı açıklanır.

Genişletilebilir biçimlendirme dili (XML), verileri metin biçiminde biçimlendirmeye, depolamaya ve paylaşmaya yönelik bir işaretleme dilidir. Belgelerden rastgele veri yapılarına kadar değişen verileri seri hale getirmek için bir dizi kural tanımlar.

Yerel XML dosya biçimi desteği, xml verilerini toplu işleme veya akış için alma, sorgulama ve ayrıştırma işlemlerini etkinleştirir. Şema ve veri türlerini otomatik olarak çıkarıp geliştirebilir, gibi from_xmlSQL ifadelerini destekler ve XML belgeleri oluşturabilir. Dış jar gerektirmez ve Otomatik Yükleyici read_files ve COPY INTOile sorunsuz çalışır.

Gereksinimler

Databricks Runtime 14.3 ve üzeri

XML kayıtlarını ayrıştırma

XML belirtimi iyi biçimlendirilmiş bir yapıyı zorunlu kullanır. Ancak, bu belirtim hemen tablosal bir biçimle eşlenemez. bir ile rowTag eşleyen XML öğesini belirtmek için DataFrameRowseçeneğini belirtmeniz gerekir. rowTag öğesi en üst düzey structolur. öğesinin rowTag alt öğeleri, en üst düzey structalanının alanları haline gelir.

Bu kaydın şemasını belirtebilir veya otomatik olarak çıkarılmalarını sağlayabilirsiniz. Ayrıştırıcı yalnızca öğeleri incelediğinden rowTag , DTD ve dış varlıklar filtrelenir.

Aşağıdaki örneklerde, farklı rowTag seçenekler kullanılarak xml dosyasının şema çıkarımı ve ayrıştırılması gösterilmektedir:

Python

xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""

xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString, True)

Scala

val xmlString = """
  <books>
    <book id="bk103">
      <author>Corets, Eva</author>
      <title>Maeve Ascendant</title>
    </book>
    <book id="bk104">
      <author>Corets, Eva</author>
      <title>Oberon's Legacy</title>
    </book>
  </books>"""
val xmlPath = "dbfs:/tmp/books.xml"
dbutils.fs.put(xmlPath, xmlString)

XML dosyasını "kitaplar" seçeneğiyle rowTag okuyun:

Python

df = spark.read.option("rowTag", "books").format("xml").load(xmlPath)
df.printSchema()
df.show(truncate=False)

Scala

val df = spark.read.option("rowTag", "books").xml(xmlPath)
df.printSchema()
df.show(truncate=false)

Çıkış:

root
|-- book: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _id: string (nullable = true)
| | |-- author: string (nullable = true)
| | |-- title: string (nullable = true)

+------------------------------------------------------------------------------+
|book                                                                          |
+------------------------------------------------------------------------------+
|[{bk103, Corets, Eva, Maeve Ascendant}, {bk104, Corets, Eva, Oberon's Legacy}]|
+------------------------------------------------------------------------------+

XML dosyasını rowTag "book" olarak okuyun:

Python

df = spark.read.option("rowTag", "book").format("xml").load(xmlPath)
# Infers three top-level fields and parses `book` in separate rows:

Scala

val df = spark.read.option("rowTag", "book").xml(xmlPath)
// Infers three top-level fields and parses `book` in separate rows:

Çıkış:

root
|-- _id: string (nullable = true)
|-- author: string (nullable = true)
|-- title: string (nullable = true)

+-----+-----------+---------------+
|_id  |author     |title          |
+-----+-----------+---------------+
|bk103|Corets, Eva|Maeve Ascendant|
|bk104|Corets, Eva|Oberon's Legacy|
+-----+-----------+---------------+

Veri kaynağı seçenekleri

XML için veri kaynağı seçenekleri aşağıdaki yollarla belirtilebilir:

  • Aşağıdaki .option/.options yöntemleri:
    • DataFrameReader
    • DataFrameWriter
    • DataStreamReader
    • DataStreamWriter
  • Aşağıdaki yerleşik işlevler:
  • OPTIONS CREATE TABLE USING DATA_SOURCE yan tümcesi

Seçeneklerin listesi için bkz . Otomatik Yükleyici seçenekleri.

XSD desteği

İsteğe bağlı olarak her satır düzeyi XML kaydını bir XML Şema Tanımı (XSD) ile doğrulayabilirsiniz. Seçeneğinde rowValidationXSDPath XSD dosyası belirtilir. XSD, sağlanan veya çıkarılmış şemayı başka şekilde etkilemez. Doğrulamada başarısız olan bir kayıt "bozuk" olarak işaretlenir ve seçenek bölümünde açıklanan bozuk kayıt işleme modu seçeneğine göre işlenir.

Bir XSD dosyasından Spark DataFrame şeması ayıklamak için kullanabilirsiniz XSDToSchema . Yalnızca basit, karmaşık ve sıra türlerini destekler ve yalnızca temel XSD işlevlerini destekler.

import org.apache.spark.sql.execution.datasources.xml.XSDToSchema
import org.apache.hadoop.fs.Path

val xsdPath = "dbfs:/tmp/books.xsd"
val xsdString = """<?xml version="1.0" encoding="UTF-8" ?>
  <xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
    <xs:element name="book">
      <xs:complexType>
        <xs:sequence>
          <xs:element name="author" type="xs:string" />
          <xs:element name="title" type="xs:string" />
          <xs:element name="genre" type="xs:string" />
          <xs:element name="price" type="xs:decimal" />
          <xs:element name="publish_date" type="xs:date" />
          <xs:element name="description" type="xs:string" />
        </xs:sequence>
        <xs:attribute name="id" type="xs:string" use="required" />
      </xs:complexType>
    </xs:element>
  </xs:schema>"""

dbutils.fs.put(xsdPath, xsdString, true)

val schema1 = XSDToSchema.read(xsdString)
val schema2 = XSDToSchema.read(new Path(xsdPath))

Aşağıdaki tabloda XSD veri türlerinin Spark veri türlerine dönüştürülmesi gösterilmektedir:

XSD Veri Türleri Spark Veri Türleri
boolean BooleanType
decimal DecimalType
unsignedLong DecimalType(38, 0)
double DoubleType
float FloatType
byte ByteType
short, unsignedByte ShortType
integer, negativeInteger, nonNegativeInteger, , nonPositiveInteger, positiveInteger, unsignedShort IntegerType
long, unsignedInt LongType
date DateType
dateTime TimestampType
Others StringType

İç içe XML ayrıştırma

Mevcut DataFrame'deki dize değerli bir sütundaki XML verileri ile schema_of_xml ayrıştırılabilir ve from_xml şemayı ve ayrıştırılan sonuçları yeni struct sütunlar olarak döndürür. ve bağımsız schema_of_xmlfrom_xml değişkeni olarak geçirilen XML verileri tek bir iyi biçimlendirilmiş XML kaydı olmalıdır.

schema_of_xml

Söz dizimi

schema_of_xml(xmlStr [, options] )

Bağımsız Değişkenler

  • xmlStr: İyi biçimlendirilmiş tek bir XML kaydı belirten STRING ifadesi.
  • options: İsteğe bağlı MAP<STRING,STRING> değişmez değer belirtme yönergeleri.

İadeler

Sütun adlarının XML öğesinden ve öznitelik adlarından türetildiği n dize alanı içeren bir yapının tanımını tutan STRING. Alan değerleri türetilmiş biçimlendirilmiş SQL türlerini barındırır.

from_xml

Söz dizimi

from_xml(xmlStr, schema [, options])

Bağımsız Değişkenler

  • xmlStr: İyi biçimlendirilmiş tek bir XML kaydı belirten STRING ifadesi.
  • schema: BIR STRING ifadesi veya işlevi çağırma schema_of_xml .
  • options: İsteğe bağlı MAP<STRING,STRING> değişmez değer belirtme yönergeleri.

İadeler

Şema tanımıyla eşleşen alan adlarını ve türlerini içeren bir yapı. Şema, içinde kullanıldığı gibi virgülle ayrılmış sütun adı ve veri türü çiftleri olarak tanımlanmalıdır. Örneğin, CREATE TABLE. Veri kaynağı seçeneklerinde gösterilen çoğu seçenek aşağıdaki özel durumlar için geçerlidir:

  • rowTag: Yalnızca bir XML kaydı olduğundan seçeneği rowTag geçerli değildir.
  • mode (varsayılan: PERMISSIVE): Ayrıştırma sırasında bozuk kayıtlarla ilgilenmek için bir moda izin verir.
    • PERMISSIVE: Bozuk bir kaydı karşıladığında, hatalı biçimlendirilmiş dizeyi tarafından columnNameOfCorruptRecordyapılandırılan bir alana yerleştirir ve hatalı biçimlendirilmiş alanları olarak nullayarlar. Bozuk kayıtları tutmak için, kullanıcı tanımlı şemada adlı columnNameOfCorruptRecord bir dize türü alanı ayarlayabilirsiniz. Bir şemanın alanı yoksa, ayrıştırma sırasında bozuk kayıtları bırakır. Şema çıkarıldığında, bir çıktı şemasına örtük olarak bir columnNameOfCorruptRecord alan ekler.
    • FAILFAST: Bozuk kayıtları karşıladığında bir özel durum oluşturur.

Yapı dönüştürme

DataFrame ile XML arasındaki yapı farklılıkları nedeniyle, XML verilerinden XML verilerine ve DataFrame XML verilerine DataFrame bazı dönüştürme kuralları vardır. Öznitelikleri işlemenin seçeneğiyle excludeAttributedevre dışı bırakılabildiğini unutmayın.

XML'den DataFrame'e dönüştürme

Öznitelikler: Öznitelikler, başlık ön ekine attributePrefixsahip alanlar olarak dönüştürülür.

<one myOneAttrib="AAAA">
  <two>two</two>
  <three>three</three>
</one>

aşağıdaki şemayı oluşturur:

root
|-- _myOneAttrib: string (nullable = true)
|-- two: string (nullable = true)
|-- three: string (nullable = true)

Öznitelik veya alt öğe içeren bir öğedeki karakter verileri: Bunlar alana ayrıştırılır valueTag . Karakter verilerinin birden çok tekrarı varsa, valueTag alan bir array türe dönüştürülür.

<one>
  <two myTwoAttrib="BBBBB">two</two>
  some value between elements
  <three>three</three>
  some other value between elements
</one>

aşağıdaki şemayı oluşturur:

root
 |-- _VALUE: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- two: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
| |-- _myTwoAttrib: string (nullable = true)
|-- three: string (nullable = true)

DataFrame'den XML'ye dönüştürme

Dizideki bir dizi olarak öğe: Öğesiyle birlikte bir alana ArrayType sahip bir DataFrame XML dosyası yazma, öğesi ArrayType için ek bir iç içe alan olması gibi. Bu durum XML verilerini okurken ve yazarken değil, diğer kaynaklardan okuma yazarken DataFrame meydana gelir. Bu nedenle, XML dosyalarını okuma ve yazmada gidiş dönüş aynı yapıya sahiptir, ancak başka kaynaklardan okuma yazmak DataFrame farklı bir yapıya sahip olabilir.

Aşağıdaki şemaya sahip DataFrame:

|-- a: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)

ve aşağıdaki verilerle:

+------------------------------------+
| a|
+------------------------------------+
|[WrappedArray(aa), WrappedArray(bb)]|
+------------------------------------+

aşağıda bir XML dosyası oluşturur:

<a>
  <item>aa</item>
</a>
<a>
  <item>bb</item>
</a>

içindeki DataFrame adlandırılmamış dizinin öğe adı seçeneğiyle arrayElementName belirtilir (Varsayılan: item).

Kurtarılan veri sütunu

Kurtarılan veri sütunu, ETL sırasında verileri asla kaybetmenizi veya kaçırmamanızı sağlar. Bir kayıttaki bir veya daha fazla alanda aşağıdaki sorunlardan biri olduğundan, kurtarılan veri sütununu ayrıştırılmayan verileri yakalamak için etkinleştirebilirsiniz:

  • Sağlanan şemada yok
  • Sağlanan şemanın veri türüyle eşleşmiyor
  • Sağlanan şemadaki alan adlarıyla büyük/küçük harf uyuşmazlığı var

Kurtarılan veri sütunu, kurtarılan sütunları ve kaydın kaynak dosya yolunu içeren bir JSON belgesi olarak döndürülür. Kurtarılan veri sütunundan kaynak dosya yolunu kaldırmak için aşağıdaki SQL yapılandırmasını ayarlayabilirsiniz:

Python

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")

Scala

spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false").

Ile gibi _rescued_dataspark.read.option("rescuedDataColumn", "_rescued_data").format("xml").load(<path>)verileri okurken seçeneğini rescuedDataColumn bir sütun adı olarak ayarlayarak kurtarılan veri sütununu etkinleştirebilirsiniz.

XML ayrıştırıcısı kayıtları ayrıştırırken üç modu destekler: PERMISSIVE, DROPMALFORMEDve FAILFAST. ile rescuedDataColumnbirlikte kullanıldığında, veri türü uyuşmazlıkları kayıtların modda DROPMALFORMED bırakılmasına veya modda hata FAILFAST oluşturmasına neden olmaz. Yalnızca bozuk kayıtlar (eksik veya yanlış biçimlendirilmiş XML) bırakılır veya hata oluşturur.

Otomatik Yükleyici'de şema çıkarımı ve evrimi

Bu konu başlığı ve geçerli seçenekler hakkında ayrıntılı bilgi için bkz . Otomatik Yükleyici'de şema çıkarımı ve evrimini yapılandırma. Otomatik Yükleyici'yi, yüklenen XML verilerinin şemasını otomatik olarak algılayarak veri şemasını açıkça bildirmeden tabloları başlatmanıza ve yeni sütunlar kullanıma sunulduğunda tablo şemasını geliştirmenize olanak tanıyacak şekilde yapılandırabilirsiniz. Bu, şema değişikliklerini zaman içinde el ile izleme ve uygulama gereksinimini ortadan kaldırır.

Varsayılan olarak, Otomatik Yükleyici şema çıkarımı, tür uyuşmazlıkları nedeniyle şema evrimi sorunlarından kaçınmayı arar. Veri türlerini (JSON, CSV ve XML) kodlamamış biçimler için Otomatik Yükleyici, XML dosyalarındaki iç içe yerleştirilmiş alanlar da dahil olmak üzere tüm sütunları dize olarak çıkarsar. Apache Spark DataFrameReader şema çıkarımı için farklı bir davranış kullanır ve örnek verilere göre XML kaynaklarındaki sütunlar için veri türlerini seçer. Otomatik Yükleyici ile bu davranışı etkinleştirmek için seçeneğini cloudFiles.inferColumnTypes olarak trueayarlayın.

Otomatik Yükleyici, verilerinizi işlerken yeni sütunların eklenmesini algılar. Otomatik Yükleyici yeni bir sütun algıladığında akış ile UnknownFieldExceptiondurdurulur. Akışınız bu hatayı oluşturmadan önce, Otomatik Yükleyici en son mikro veri toplu işleminde şema çıkarımını gerçekleştirir ve şemanın sonuna yeni sütunları birleştirerek şema konumunu en son şemayla güncelleştirir. Mevcut sütunların veri türleri değişmeden kalır. Otomatik Yükleyici, seçeneğinde cloudFiles.schemaEvolutionModeayarladığınız şema evrimi için farklı modları destekler.

Bildiğiniz ve çıkarım yapılan bir şemada beklediğiniz şema bilgilerini zorunlu kılmak için şema ipuçlarını kullanabilirsiniz. Sütunun belirli bir veri türünde olduğunu biliyorsanız veya daha genel bir veri türü (örneğin, tamsayı yerine çift) seçmek istiyorsanız, SQL şema belirtimi söz dizimini kullanarak sütun veri türleri için dize olarak rastgele sayıda ipucu sağlayabilirsiniz. Kurtarılan veri sütunu etkinleştirildiğinde, şemanın dışında bir durumda adlandırılan alanlar sütuna _rescued_data yüklenir. Seçeneğini olarak ayarlayarak readerCaseSensitivefalsebu davranışı değiştirebilirsiniz. Bu durumda Otomatik Yükleyici verileri büyük/küçük harfe duyarlı olmayan bir şekilde okur.

Örnekler

Bu bölümdeki örneklerde Apache Spark GitHub deposunda indirilebilen bir XML dosyası kullanılmaktadır.

XML okuma ve yazma

Python

df = (spark.read
  .format('xml')
  .options(rowTag='book')
  .load(xmlPath))  # books.xml

selected_data = df.select("author", "_id")
(selected_data.write
  .options(rowTag='book', rootTag='books')
  .xml('newbooks.xml'))

Scala

val df = spark.read
  .option("rowTag", "book")
  .xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write
  .option("rootTag", "books")
  .option("rowTag", "book")
  .xml("newbooks.xml")

R

df <- loadDF("books.xml", source = "xml", rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

Verileri okurken şemayı el ile belirtebilirsiniz:

Python

from pyspark.sql.types import StructType, StructField, StringType, DoubleType

custom_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("author", StringType(), True),
    StructField("description", StringType(), True),
    StructField("genre", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("publish_date", StringType(), True),
    StructField("title", StringType(), True)
])
df = spark.read.options(rowTag='book').xml('books.xml', schema = customSchema)

selected_data = df.select("author", "_id")
selected_data.write.options(rowTag='book', rootTag='books').xml('newbooks.xml')

Scala

import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}

val customSchema = StructType(Array(
  StructField("_id", StringType, nullable = true),
  StructField("author", StringType, nullable = true),
  StructField("description", StringType, nullable = true),
  StructField("genre", StringType, nullable = true),
  StructField("price", DoubleType, nullable = true),
  StructField("publish_date", StringType, nullable = true),
  StructField("title", StringType, nullable = true)))
val df = spark.read.option("rowTag", "book").schema(customSchema).xml(xmlPath) // books.xml

val selectedData = df.select("author", "_id")
selectedData.write.option("rootTag", "books").option("rowTag", "book").xml("newbooks.xml")

R

customSchema <- structType(
  structField("_id", "string"),
  structField("author", "string"),
  structField("description", "string"),
  structField("genre", "string"),
  structField("price", "double"),
  structField("publish_date", "string"),
  structField("title", "string"))

df <- loadDF("books.xml", source = "xml", schema = customSchema, rowTag = "book")
# In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
saveDF(df, "newbooks.xml", "xml", "overwrite")

SQL API'si

XML veri kaynağı veri türlerini çıkarsayabilir:

DROP TABLE IF EXISTS books;
CREATE TABLE books
USING XML
OPTIONS (path "books.xml", rowTag "book");
SELECT * FROM books;

DDL'de sütun adlarını ve türlerini de belirtebilirsiniz. Bu durumda şema otomatik olarak çıkarılmaz.

DROP TABLE IF EXISTS books;

CREATE TABLE books (author string, description string, genre string, _id string,
price double, publish_date string, title string)
USING XML
OPTIONS (path "books.xml", rowTag "book");

COPY INTO kullanarak XML yükleme

DROP TABLE IF EXISTS books;
CREATE TABLE IF NOT EXISTS books;

COPY INTO books
FROM "/FileStore/xmltestDir/input/books.xml"
FILEFORMAT = XML
FORMAT_OPTIONS ('mergeSchema' = 'true', 'rowTag' = 'book')
COPY_OPTIONS ('mergeSchema' = 'true');

Satır doğrulama ile XML okuma

Python

df = (spark.read
    .format("xml")
    .option("rowTag", "book")
    .option("rowValidationXSDPath", xsdPath)
    .load(inputPath))
df.printSchema()

Scala

val df = spark.read
  .option("rowTag", "book")
  .option("rowValidationXSDPath", xsdPath)
  .xml(inputPath)
df.printSchema

İç içe XML ayrıştırma (from_xml ve schema_of_xml)

Python

from pyspark.sql.functions import from_xml, schema_of_xml, lit, col

xml_data = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>
"""

df = spark.createDataFrame([(8, xml_data)], ["number", "payload"])
schema = schema_of_xml(df.select("payload").limit(1).collect()[0][0])
parsed = df.withColumn("parsed", from_xml(col("payload"), schema))
parsed.printSchema()
parsed.show()

Scala

import org.apache.spark.sql.functions.{from_xml,schema_of_xml,lit}

val xmlData = """
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>""".stripMargin

val df = Seq((8, xmlData)).toDF("number", "payload")
val schema = schema_of_xml(xmlData)
val parsed = df.withColumn("parsed", from_xml($"payload", schema))
parsed.printSchema()
parsed.show()

SQL API ile from_xml ve schema_of_xml

SELECT from_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>',
  schema_of_xml('
  <book id="bk103">
    <author>Corets, Eva</author>
    <title>Maeve Ascendant</title>
    <genre>Fantasy</genre>
    <price>5.95</price>
    <publish_date>2000-11-17</publish_date>
  </book>')
);

Otomatik Yükleyici ile XML Yükleme

Python

query = (spark
  .readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "xml")
    .option("rowTag", "book")
    .option("cloudFiles.inferColumnTypes", True)
    .option("cloudFiles.schemaLocation", schemaPath)
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .load(inputPath)
    .writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", checkPointPath)
    .trigger(Trigger.AvailableNow()))

query = query.start(outputPath).awaitTermination()
df = spark.read.format("delta").load(outputPath)
df.show()

Scala

val query = spark
.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "xml")
  .option("rowTag", "book")
  .option("cloudFiles.inferColumnTypes", true)
  .option("cloudFiles.schemaLocation", schemaPath)
  .option("cloudFiles.schemaEvolutionMode", "rescue")
  .load(inputPath)
  .writeStream
  .format("delta")
  .option("mergeSchema", "true")
  .option("checkpointLocation", checkPointPath)
  .trigger(Trigger.AvailableNow())

query.start(outputPath).awaitTermination()
val df = spark.read.format("delta").load(outputPath)
df.show()

Ek kaynaklar

Spark-xml kitaplığını kullanarak XML verilerini okuma ve yazma