Read and write streaming Avro data

Important

This feature is in Public Preview.

Apache Avro is a commonly used data serialization system in the streaming world. A typical solution is to put data in Avro format in Apache Kafka, metadata in Confluent Schema Registry, and then run queries with a streaming framework that connects to both Kafka and Schema Registry.

Azure Databricks supports the from_avro and to_avro functions to build streaming pipelines with Avro data in Kafka and metadata in Schema Registry. The function to_avro encodes a column as binary in Avro format and from_avro decodes Avro binary data into a column. Both functions transform one column to another column, and the input/output SQL data type can be a complex type or a primitive type.

Note

The from_avro and to_avro functions:

  • Are available in Python, Scala, and Java.
  • Can be passed to SQL functions in both batch and streaming queries.

Also see Avro file data source.

Basic example

Similar to from_json and to_json, you can use from_avro and to_avro with any binary column, but you must specify the Avro schema manually.

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema example

You can also specify a schema as a JSON string. For example, if /tmp/user.avsc is:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

You can create a JSON string:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Then use the schema in from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))
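
Because from_avro takes the schema as a plain string, a malformed or unexpected schema file tends to surface only when the query runs. The following is a minimal sketch, using only the Python standard library, of inspecting the schema text before handing it to from_avro. The helper name summarize_record_schema is hypothetical, and the schema is inlined here (rather than read from /tmp/user.avsc) so that the snippet is self-contained.

```python
import json

# Inlined copy of the /tmp/user.avsc example so the snippet runs on its own.
json_format_schema = """
{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}
"""


def summarize_record_schema(schema_text):
    """Hypothetical helper: parse an Avro record schema and list its fields.

    Returns the full record name and a dict mapping each field name to
    whether its type is a union that includes "null".
    """
    schema = json.loads(schema_text)  # raises ValueError on malformed JSON
    if schema.get("type") != "record":
        raise ValueError("expected an Avro record schema")
    namespace = schema.get("namespace")
    full_name = f"{namespace}.{schema['name']}" if namespace else schema["name"]
    nullable = {
        field["name"]: isinstance(field["type"], list) and "null" in field["type"]
        for field in schema["fields"]
    }
    return full_name, nullable


full_name, nullable = summarize_record_schema(json_format_schema)
print(full_name)
print(nullable)
```

In this schema, favorite_color is a union that includes null, so decoded rows can carry a null in that field, while name cannot.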

Example with Schema Registry

If your cluster has a Schema Registry service, from_avro can work with it so that you don’t need to specify the Avro schema manually.

Note

Integration with Schema Registry is available only in Scala and Java.

import org.apache.spark.sql.avro.functions._

// Read a Kafka topic "t", assuming the key and value are already
// registered in Schema Registry as subjects "t-key" and "t-value" of type
// string and int. The binary key and value columns are turned into string
// and int type with Avro and Schema Registry. The schema of the resulting DataFrame
// is: <key: string, value: int>.
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))
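
Messages produced by Confluent's Schema Registry serializers are not bare Avro. As documented by Confluent, each message starts with a one-byte magic value of 0 and a 4-byte big-endian schema ID, followed by the Avro payload. The sketch below, in standard-library Python, splits that header from a message to show what the framing looks like. The function name split_confluent_header and the sample bytes are made up for illustration, and this is not a description of how from_avro is implemented.

```python
import struct


def split_confluent_header(message: bytes):
    """Hypothetical helper: split Confluent framing from the Avro payload.

    Returns (schema_id, avro_payload). Raises ValueError if the message
    is too short or does not start with the expected magic byte.
    """
    if len(message) < 5:
        raise ValueError("message too short for a Confluent header")
    # ">bI": big-endian, 1 signed byte (magic) + 4-byte unsigned int (schema ID).
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


# Made-up sample: schema ID 42, then the Avro encoding of the int 1 (0x02).
sample = b"\x00" + (42).to_bytes(4, "big") + b"\x02"
schema_id, payload = split_confluent_header(sample)
print(schema_id, payload)
```

A check like this can help when debugging a topic by hand, for example to confirm whether its messages carry the registry header at all.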

For to_avro, the default output Avro schema might not match the schema of the target subject in the Schema Registry service for the following reasons:

  • The mapping from Spark SQL type to Avro schema is not one-to-one. See Supported types for Spark SQL -> Avro conversion.
  • If the converted output Avro schema is of record type, the record name is topLevelRecord and there is no namespace by default.
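
The second point can be checked mechanically by comparing the record name and namespace of two schema strings. The following standard-library Python sketch does that. The helper record_identity is hypothetical, and default_like_schema is a hand-written stand-in for a default output schema (only its name topLevelRecord and its missing namespace come from the text above), not output captured from to_avro.

```python
import json


def record_identity(schema_text):
    """Hypothetical helper: return (namespace, name) of an Avro record schema."""
    schema = json.loads(schema_text)
    return schema.get("namespace"), schema["name"]


# Stand-in for a default output schema: record named topLevelRecord, no namespace.
default_like_schema = json.dumps({
    "type": "record",
    "name": "topLevelRecord",
    "fields": [{"name": "name", "type": "string"}],
})

# Stand-in for the schema registered under the target subject.
target_schema = json.dumps({
    "type": "record",
    "namespace": "example.avro",
    "name": "User",
    "fields": [{"name": "name", "type": "string"}],
})

names_match = record_identity(default_like_schema) == record_identity(target_schema)
print(names_match)
```

Here the two identities differ, which corresponds to the case where the default output schema does not match the target subject.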

If the default output schema of to_avro matches the schema of the target subject, you can do the following:

import org.apache.spark.sql.functions.lit

// The converted data is written to the Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

Otherwise, you must provide the schema of the target subject in the to_avro function:

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()