Share via


Avro dosyası

Apache Avro bir veri serileştirme sistemidir. Avro şu bilgileri sağlar:

  • Zengin veri yapıları.
  • Kompakt, hızlı, ikili veri biçimi.
  • Kalıcı verileri depolamak için bir kapsayıcı dosyası.
  • Uzaktan yordam çağrısı (RPC).
  • Dinamik dillerle basit tümleştirme. Kod oluşturma, veri dosyalarını okumak veya yazmak ya da RPC protokollerini kullanmak veya uygulamak için gerekli değildir. İsteğe bağlı bir iyileştirme olarak kod oluşturma, yalnızca statik olarak yazılan diller için uygulamaya değer.

Avro veri kaynağı aşağıdakileri destekler:

  • Şema dönüştürme: Apache Spark SQL ve Avro kayıtları arasında otomatik dönüştürme.
  • Bölümleme: Ek yapılandırma olmadan bölümlenmiş verileri kolayca okuma ve yazma.
  • Sıkıştırma: Diske Avro yazarken kullanılacak sıkıştırma. Desteklenen türler , snappyve deflate'lerdiruncompressed. Ayrıca, kullanımdan kaldırma düzeyini de belirtebilirsiniz.
  • Kayıt adları: ve ile recordName parametre eşlemesi geçirerek adı ve recordNamespacead alanını kaydedin.

Ayrıca bkz . Akış Avro verilerini okuma ve yazma.

Yapılandırma

Çeşitli yapılandırma parametrelerini kullanarak Avro veri kaynağının davranışını değiştirebilirsiniz.

Okurken uzantı olmadan .avro dosyaları yoksaymak için Hadoop yapılandırmasında parametresini avro.mapred.ignore.inputs.without.extension ayarlayabilirsiniz. Varsayılan değer: false.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

Yazarken sıkıştırmayı yapılandırmak için aşağıdaki Spark özelliklerini ayarlayın:

  • Sıkıştırma codec'i: spark.sql.avro.compression.codec. Desteklenen codec'ler ve deflate'tirsnappy. Varsayılan codec bileşenidir snappy.
  • Sıkıştırma codec'i ise deflate, sıkıştırma düzeyini şu şekilde ayarlayabilirsiniz: spark.sql.avro.deflate.level. Varsayılan düzey: -1.

Bu özellikleri kullanarak küme Spark yapılandırmasında veya çalışma zamanında spark.conf.set()ayarlayabilirsiniz. Örneğin:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Databricks Runtime 9.1 LTS ve üzeri için, dosyaları okurken seçeneğini sağlayarak mergeSchema Avro'daki varsayılan şema çıkarımı davranışını değiştirebilirsiniz. ayarı mergeSchematrue , hedef dizindeki bir Avro dosyaları kümesinden şema çıkartır ve tek bir dosyadan okuma şemasını çıkarsamak yerine bunları birleştirir.

Avro için desteklenen türler -> Spark SQL dönüşümü

Bu kitaplık tüm Avro türlerinin okunmasını destekler. Avro türlerinden Spark SQL türlerine aşağıdaki eşlemeyi kullanır:

Avro türü Spark SQL türü
boolean BooleanType
int IntegerType
uzun LongType
kayan noktalı sayı FloatType
çift DoubleType
bayt BinaryType
Dize StringType
Kayıt StructType
enum StringType
dizi ArrayType
map MapType
Sabit BinaryType
birleşim Bkz. Birleşim türleri.

Birleşim türleri

Avro veri kaynağı okuma union türlerini destekler. Avro, aşağıdaki üç türü tür olarak union kabul eder:

  • union(int, long)LongTypeile eşler.
  • union(float, double)DoubleTypeile eşler.
  • union(something, null), burada something desteklenen herhangi bir Avro türüdür. Bu, ile aynı Spark SQL türüne somethingeşler nullable ve olarak ayarlanır true.

Diğer union tüm türler karmaşık türlerdir. Alan adlarının, üyelerine unionuygun olarak , member1vb. olduğu yere eşlenirStructType.member0 Bu, Avro ile Parquet arasında dönüştürme sırasındaki davranışla tutarlıdır.

Mantıksal türler

Avro veri kaynağı aşağıdaki Avro mantıksal türlerinin okunmasını destekler:

Avro mantıksal türü Avro türü Spark SQL türü
tarih int Datetype
zaman damgası milis uzun Zaman Damgası Türü
zaman damgası mikroları uzun Zaman Damgası Türü
ondalık Sabit Ondalık Türü
ondalık bayt Ondalık Türü

Dekont

Avro veri kaynağı, Avro dosyasında bulunan belgeleri, diğer adları ve diğer özellikleri yoksayar.

Spark SQL için desteklenen türler -> Avro dönüştürme

Bu kitaplık, tüm Spark SQL türlerinin Avro'ya yazılma özelliğini destekler. Çoğu tür için Spark türlerinden Avro türlerine eşleme basittir (örneğin IntegerType , öğesine dönüştürülür int); aşağıdaki birkaç özel durum listesidir:

Spark SQL türü Avro türü Avro mantıksal türü
ByteType int
ShortType int
BinaryType bayt
Ondalık Türü Sabit ondalık
Zaman Damgası Türü uzun zaman damgası mikroları
Datetype int tarih

Spark SQL türlerinin diğer Avro türlerine dönüştürülebilmesi için çıkış Avro şemasının tamamını seçeneğiyle avroSchemade belirtebilirsiniz. Aşağıdaki dönüştürmeler varsayılan olarak uygulanmaz ve kullanıcı tarafından belirtilen Avro şemasını gerektirir:

Spark SQL türü Avro türü Avro mantıksal türü
ByteType Sabit
StringType enum
Ondalık Türü bayt ondalık
Zaman Damgası Türü uzun zaman damgası milis

Örnekler

Bu örneklerde episodes.avro dosyası kullanılır.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

Bu örnekte özel bir Avro şeması gösterilmektedir:

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

Bu örnekte Avro sıkıştırma seçenekleri gösterilmektedir:

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

Bu örnekte bölümlenmiş Avro kayıtları gösterilmektedir:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

Bu örnekte kayıt adı ve ad alanı gösterilmektedir:

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

SQL'de Avro verilerini sorgulamak için veri dosyasını tablo veya geçici görünüm olarak kaydedin:

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Not defteri örneği: Avro dosyalarını okuma ve yazma

Aşağıdaki not defteri Avro dosyalarının nasıl okunduğunu ve yazılıp yazılamını gösterir.

Avro dosyaları not defterini okuma ve yazma

Not defterini alma