Avro 파일

Apache Avro는 데이터 직렬화 시스템입니다. Avro는 다음을 제공합니다.

  • 풍부한 데이터 구조.
  • 작고 빠른 이진 데이터 형식입니다.
  • 영구 데이터를 저장하기 위한 컨테이너 파일입니다.
  • RPC(원격 프로시저 호출)입니다.
  • 동적 언어와의 간단한 통합. 데이터 파일을 읽거나 쓰거나 RPC 프로토콜을 사용하거나 구현하기 위해 코드 생성이 필요하지 않습니다. 선택적인 최적화로서의 코드 생성은 정적으로 유형이 지정된 언어에 대해서만 구현할 가치가 있습니다.

Avro 데이터 원본는 다음을 지원합니다.

  • 스키마 변환: Apache Spark SQL과 Avro 레코드 간의 자동 변환.
  • 분할: 추가 구성 없이 파티션된 데이터를 쉽게 읽고 쓸 수 있습니다.
  • 압축: Avro를 디스크에 쓸 때 사용할 압축입니다. 지원되는 형식은 uncompressed, snappydeflate입니다. 수축 수준을 지정할 수도 있습니다.
  • 레코드 이름: recordNamerecordNamespace를 사용하여 매개 변수 맵을 전달하여 이름 및 네임스페이스를 기록합니다.

스트리밍 Avro 데이터 읽기 및 쓰기도 참조하세요.

구성

다양한 구성 매개 변수를 사용하여 Avro 데이터 원본의 동작을 변경할 수 있습니다.

읽을 때 .avro 확장자가 없는 파일을 무시하려면 Hadoop 구성에서 매개 변수 avro.mapred.ignore.inputs.without.extension를 설정할 수 있습니다. 기본값은 false입니다.

spark
  .sparkContext
  .hadoopConfiguration
  .set("avro.mapred.ignore.inputs.without.extension", "true")

작성 시 압축을 구성하려면 다음 Spark 속성을 설정합니다.

  • 압축 코덱: spark.sql.avro.compression.codec. 지원되는 코덱은 snappydeflate입니다. 기본 코덱은 snappy입니다.
  • 압축 코덱이 deflate인 경우 spark.sql.avro.deflate.level을 사용하여 압축 수준을 설정할 수 있습니다. 기본 수준은 -1입니다.

이러한 속성은 클러스터 Spark 구성에서 또는 런타임에 spark.conf.set()을 사용하여 설정할 수 있습니다. 예시:

spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

Databricks Runtime 9.1 LTS 이상의 경우 파일을 읽을 때 mergeSchema 옵션을 제공하여 Avro의 기본 스키마 유추 동작을 변경할 수 있습니다. mergeSchematrue로 설정하면 단일 파일에서 읽기 스키마를 유추하는 대신 대상 디렉터리의 Avro 파일 집합에서 스키마를 유추하고 병합합니다.

Avro -> Spark SQL 변환에 지원되는 유형

이 라이브러리는 모든 Avro 유형 읽기를 지원합니다. Avro 유형에서 Spark SQL 형식으로의 다음 매핑을 사용합니다.

Avro 유형 Spark SQL 형식
부울 값 BooleanType
int IntegerType
long LongType
float FloatType
double DoubleType
bytes BinaryType
string StringType
녹화 StructType
열거형 StringType
배열 ArrayType
map Maptype
fixed BinaryType
union 공용 구조체 유형을 참조하세요.

공용 구조체 유형

Avro 데이터 원본은 읽기 union 유형을 지원합니다. Avro는 다음 세 가지 유형을 union 유형으로 간주합니다.

  • union(int, long)LongType에 매핑됩니다.
  • union(float, double)DoubleType에 매핑됩니다.
  • union(something, null), 여기서 something은 지원되는 Avro 유형입니다. 이는 something과 동일한 Spark SQL 형식에 매핑되며 nullabletrue로 설정됩니다.

다른 모든 union 유형은 복합 형식입니다. union의 멤버에 따라 필드 이름이 member0, member1 등인 StructType에 매핑됩니다. 이는 Avro와 Parquet 간에 변환할 때의 동작과 일치합니다.

논리적 형식

Avro 데이터 원본은 다음 Avro 논리 유형 읽기를 지원합니다.

Avro 논리 유형 Avro 유형 Spark SQL 형식
날짜 int DateType
timestamp-millis long TimestampType
timestamp-micros long TimestampType
decimal fixed DecimalType
decimal bytes DecimalType

참고 항목

Avro 데이터 원본은 Avro 파일에 있는 문서, 별칭 및 기타 속성을 무시합니다.

Spark SQL에 지원되는 유형 -> Avro 변환

이 라이브러리는 Avro에 모든 Spark SQL 형식 쓰기를 지원합니다. 대부분의 유형에서 Spark 유형에서 Avro 유형으로의 매핑은 간단합니다(예: IntegerTypeint로 변환됨). 다음은 몇 가지 특별한 경우의 목록입니다.

Spark SQL 형식 Avro 유형 Avro 논리 유형
ByteType int
ShortType int
BinaryType bytes
DecimalType fixed decimal
TimestampType long timestamp-micros
DateType int 날짜

Spark SQL 형식을 다른 Avro 유형으로 변환할 수 있도록 avroSchema 옵션을 사용하여 전체 출력 Avro 스키마를 지정할 수도 있습니다. 다음 변환은 기본적으로 적용되지 않으며 사용자 지정 Avro 스키마가 필요합니다.

Spark SQL 형식 Avro 유형 Avro 논리 유형
ByteType fixed
StringType 열거형
DecimalType bytes decimal
TimestampType long timestamp-millis

예제

이 예에서는 episodes.avro 파일을 사용합니다.

Scala

// The Avro records are converted to Spark types, filtered, and
// then written back out as Avro records

val df = spark.read.format("avro").load("/tmp/episodes.avro")
df.filter("doctor > 5").write.format("avro").save("/tmp/output")

이 예에서는 사용자 지정 Avro 스키마를 보여 줍니다.

import org.apache.avro.Schema

val schema = new Schema.Parser().parse(new File("episode.avsc"))

spark
  .read
  .format("avro")
  .option("avroSchema", schema.toString)
  .load("/tmp/episodes.avro")
  .show()

이 예에서는 Avro 압축 옵션을 보여 줍니다.

// configuration to use deflate compression
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.conf.set("spark.sql.avro.deflate.level", "5")

val df = spark.read.format("avro").load("/tmp/episodes.avro")

// writes out compressed Avro records
df.write.format("avro").save("/tmp/output")

이 예는 분할된 Avro 레코드를 보여 줍니다.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()

val df = spark.createDataFrame(
  Seq(
    (2012, 8, "Batman", 9.8),
    (2012, 8, "Hero", 8.7),
    (2012, 7, "Robot", 5.5),
    (2011, 7, "Git", 2.0))
  ).toDF("year", "month", "title", "rating")

df.toDF.write.format("avro").partitionBy("year", "month").save("/tmp/output")

이 예에서는 레코드 이름과 네임스페이스를 보여 줍니다.

val df = spark.read.format("avro").load("/tmp/episodes.avro")

val name = "AvroTest"
val namespace = "org.foo"
val parameters = Map("recordName" -> name, "recordNamespace" -> namespace)

df.write.options(parameters).format("avro").save("/tmp/output")

Python

# Create a DataFrame from a specified directory
df = spark.read.format("avro").load("/tmp/episodes.avro")

#  Saves the subset of the Avro records read in
subset = df.where("doctor > 5")
subset.write.format("avro").save("/tmp/output")

SQL

SQL에서 Avro 데이터를 쿼리하려면 데이터 파일을 테이블 또는 임시 보기로 등록합니다.

CREATE TEMPORARY VIEW episodes
USING avro
OPTIONS (path "/tmp/episodes.avro")

SELECT * from episodes

Notebook 예제: Avro 파일 읽기 및 쓰기

다음 Notebook은 Avro 파일을 읽고 쓰는 방법을 보여 줍니다.

Avro 파일 Notebook 읽기 및 쓰기

전자 필기장 가져오기