Búferes de protocolo de lectura y escritura

Azure Databricks proporciona compatibilidad nativa con la serialización y deserialización entre estructuras de Apache Spark y búferes de protocolo (protobuf). La compatibilidad con protobuf se implementa como un transformador dataFrame de Apache Spark y se puede usar con Structured Streaming o para operaciones por lotes.

Deserialización y serialización de búferes de protocolo

En Databricks Runtime 12.2 LTS y versiones posteriores, puede usar las funciones from_protobuf y to_protobuf para serializar y deserializar datos. La serialización de protobuf se usa normalmente en cargas de trabajo de streaming.

La sintaxis básica de las funciones protobuf es similar para las funciones de lectura y escritura. Debe importar estas funciones antes de usarlas.

from_protobuf convierte una columna binaria en un struct y to_protobuf convierte una columna struct en binaria. Debe proporcionar un registro de esquema especificado con el argumento options o un archivo descriptor identificado por el argumento descFilePath.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

En los ejemplos siguientes se muestra cómo procesar registros protobuf binarios con from_protobuf() y convertir la estructura de Spark SQL en protobuf binario con to_protobuf().

Uso de protobuf con el registro de esquema de Confluent

Azure Databricks admite el uso del Registro de esquemas de Confluent para definir Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Autenticación en un registro de esquema de Confluent externo

Para autenticarse en un registro de esquema de Confluent externo, actualice las opciones del registro de esquema para incluir credenciales de autenticación y claves de API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Uso del almacén de confianza y los archivos de almacén de claves en volúmenes de Unity Catalog

En Databricks Runtime 14.3 LTS y posteriores, puede usar archivos de almacén de confianza y almacén de claves en volúmenes de Unity Catalog para autenticarse en un registro de esquemas de Confluent. Actualice las opciones del registro de esquema según el ejemplo siguiente:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Uso de protobuf con un archivo descriptor

También puede hacer referencia a un archivo descriptor protobuf que está disponible para el clúster de proceso. Asegúrese de que tiene los permisos adecuados para leer el archivo, en función de su ubicación.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Opciones admitidas en las funciones Protobuf

Las siguientes opciones se admiten en las funciones Protobuf.

  • modo: Determina cómo se controlan los errores al deserializar los registros Protobuf. Los errores pueden deberse a varios tipos de registros con formato incorrecto, incluido un error de coincidencia entre el esquema real del registro y el esquema esperado proporcionado en from_protobuf().
    • Valores:
      • FAILFAST(valor predeterminado): se produce un error cuando se encuentra un registro con formato incorrecto y se produce un error en la tarea.
      • PERMISSIVE: se devuelve un valor NULL para los registros con formato incorrecto. Use esta opción cuidadosamente, ya que puede provocar la eliminación de muchos registros. Esto resulta útil cuando una pequeña fracción de los registros del origen es incorrecta.
  • recursive.fields.max.depth: Agrega soporte con campos recursivos. Los esquemas SQL de Spark no admiten campos recursivos. Cuando no se especifica esta opción, no se permiten campos recursivos. Para admitir campos recursivos en Protobufs, deben expandirse a una profundidad especificada.
    • Valores:

      • -1 (valor predeterminado): No se permiten campos recursivos.

      • 0: Se anulan los campos recursivos.

      • 1: Permite un único nivel de recursividad.

      • [2-10]: Especifique un umbral para varias recursividades, hasta 10 niveles.

        Si se establece un valor en mayor de 0, se pueden expandir los campos recursivos a la profundidad configurada. No se permiten valores mayores de 10 para evitar la creación involuntaria de esquemas muy grandes. Si un mensaje Protobuf tiene profundidad más allá del límite configurado, la estructura de Spark devuelta se trunca después del límite de recursividad.

    • Ejemplo: Considere un Protobuf con el siguiente campo recursivo:

      message Person { string name = 1; Person friend = 2; }
      

      A continuación se lista el esquema final con diferentes valores para esta configuración:

      • Opción establecida en 1: STRUCT<name: STRING>
      • Opción establecida en 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Opción establecida en 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: esta opción habilita la conversión de los campos Protobuf Cualquier en JSON. Esta característica debe habilitarse cuidadosamente. La conversión y el procesamiento de JSON son ineficaces. Además, el campo de cadena JSON pierde la seguridad del esquema Protobuf, lo que hace que el procesamiento de bajada sea propenso a errores.
    • Valores:

      • False ( predeterminado): en tiempo de ejecución, estos campos con caracteres comodín pueden contener mensajes Protobuf arbitrarios como datos binarios. De manera predeterminada, estos campos se controlan como un mensaje Protobuf normal. Tiene dos campos con el esquema (STRUCT<type_url: STRING, value: BINARY>). De manera predeterminada, el campo binario value no se interpreta de ninguna manera. Pero es posible que los datos binarios no sean cómodos en la práctica para trabajar en algunas aplicaciones.
      • True: establecer este valor en True permite convertir Any campos en cadenas JSON en tiempo de ejecución. Con esta opción, el binario se analiza y el mensaje Protobuf se deserializa en una cadena JSON.
    • Ejemplo: Considere dos tipos Protobuf definidos de la manera siguiente:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Con esta opción habilitada, el esquema de from_protobuf("col", messageName ="ProtoWithAny") sería: STRUCT<event_name: STRING, details: STRING>.

      En tiempo de ejecución, si details el campo contiene Person el mensaje Protobuf, el valor devuelto es similar al siguiente: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Requisitos:

      • Las definiciones de todos los posibles tipos de Protobuf que se usan en Any campos deben estar disponibles en el archivo descriptor Protobuf pasado a from_protobuf().
      • Si Any no se encuentra Protobuf, se producirá un error para ese registro.
      • Esta característica no se admite actualmente con el registro de esquema.
  • emit.default.values: Habilita los campos de representación con cero valores al deserializar Protobuf en una estructura de Spark. Esta opción debe usarse con moderación. Normalmente no es aconsejable depender de estas diferencias más finas en la semántica.
    • Valores

      • False (valor predeterminado): cuando un campo está vacío en el Protobuf serializado, el campo resultante en la estructura de Spark es null de manera predeterminada. Es más fácil no habilitar esta opción y tratar null como el valor predeterminado.
      • True: cuando esta opción está habilitada, estos campos se rellenan con los valores predeterminados correspondientes.
    • Ejemplo: considere el siguiente Protobuf con el Protobuf construido como Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Con esta opción establecida en False, la estructura de Spark después de llamar a from_protobuf() sería todos los valores null: {"name": null, "age": null, "middle_name": "", "salary": null}. Aunque dos campos (age y middle_name) tenían valores establecidos, Protobuf no los incluye en formato de conexión, ya que son valores predeterminados.
      • Con esta opción establecida en True, la estructura de Spark después de llamar a from_protobuf() sería: {"name": "", "age": 0, "middle_name": "", "salary": null}. El campo salary permanece null, ya que se declara explícitamente optional y no se establece en el registro de entrada.
  • enums.as.ints: cuando está habilitado, los campos de enumeración de Protobuf se representan como campos enteros en Spark.
    • Valores

      • False (valor predeterminado)
      • True: cuando está habilitado, los campos de enumeración de Protobuf se representan como campos enteros en Spark.
    • Ejemplo: tenga en cuenta lo siguiente:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Dado un mensaje Protobuf como Person(job = ENGINEER):

      • Con esta opción deshabilitada, la estructura de Spark correspondiente sería {"job": "ENGINEER"}.
      • Con esta opción habilitada, la estructura de Spark correspondiente sería {"job": 1}.

      Observe que el esquema de estos campos es diferente en cada caso (entero en lugar de cadena predeterminada). Este cambio puede afectar al esquema de las tablas de bajada.

Opciones del Registro de esquema

Las siguientes opciones del Registro de esquema son relevantes al usar el registro de esquema con funciones Protobuf.

  • schema.registry.subject
    • Obligatorio
    • Especifica el asunto del esquema en el Registro de esquemas, como "client-event"
  • schema.registry.address
    • Obligatorio
    • Dirección URL del registro de esquema, comohttps://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Opcionales
    • Predeterminado: <NONE>.
    • Una entrada del Registro de esquema para un sujeto puede contener varias definiciones de Protobuf, al igual que un solo archivoproto. Cuando no se especifica esta opción, se usa el primer Protobuf para el esquema. Especifique el nombre del mensaje Protobuf cuando no sea el primero de la entrada. Por ejemplo, considere una entrada con dos definiciones de Protobuf: “Persona” y “Ubicación” en ese orden. Si la secuencia corresponde a “Ubicación” en lugar de “Persona”, establezca esta opción en “Ubicación” (o su nombre completo, incluido el paquete “com.example.protos.Location”).
  • schema.registry.schema.evolution.mode
    • Valor predeterminado: “reiniciar”.
    • Modos admitidos:
      • “Reiniciar”
      • “Ninguno”
    • Esta opción establece el modo de evolución del esquema para from_protobuf(). Al principio de una consulta, Spark registra el identificador de esquema más reciente para el asunto especificado. Esto determina el esquema de from_protobuf(). Es posible que se publique un nuevo esquema en el registro de esquema después de que se inicie la consulta. Cuando se observa un identificador de esquema más reciente en un registro entrante, indica un cambio en el esquema. Esta opción determina cómo se controla este cambio en el esquema:
      • reiniciar (valor predeterminado): Desencadena un UnknownFieldException cuando se observa un identificador de esquema más reciente. Esto finaliza la consulta. Databricks recomienda configurar flujos de trabajo para reiniciarse en caso de error de consulta para recoger los cambios de esquema.
      • ninguno: se omiten los cambios del identificador de esquema. Los registros con el identificador de esquema más reciente se analizan con el mismo esquema que se observó al principio de la consulta. Se espera que las nuevas definiciones de Protobuf sean compatibles con versiones anteriores, y los nuevos campos se omiten.
  • confluent.schema.registry.<schema-registy-client-option>
    • Opcionales
    • Registro de esquemas se conecta a registro de esquemas Confluent mediante el cliente del Registro de esquemas de Confluent. Las opciones de configuración admitidas por el cliente se pueden especificar con el prefijo "confluent.schema.registry". Por ejemplo, las dos opciones siguientes proporcionan credenciales de autenticación de "USER_INFO":
      • "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"