Bufory protokołu odczytu i zapisu

Usługa Azure Databricks zapewnia natywną obsługę serializacji i deserializacji między strukturami platformy Apache Spark i buforami protokołu (protobuf). Obsługa protobuf jest implementowana jako transformator ramki danych platformy Apache Spark i może być używana z przesyłaniem strumieniowym ze strukturą lub operacjami wsadowymi.

Jak deserializować i serializować bufory protokołu

W środowisku Databricks Runtime 12.2 LTS i nowszym można używać from_protobuf funkcji i to_protobuf do serializacji i deserializacji danych. Serializacja Protobuf jest często używana w obciążeniach przesyłania strumieniowego.

Podstawowa składnia funkcji protobuf jest podobna dla funkcji odczytu i zapisu. Przed użyciem należy zaimportować te funkcje.

from_protobuf rzutuje kolumnę binarną na strukturę i to_protobuf rzutuje kolumnę struktury na binarne. Musisz podać rejestr schematów określony z argumentem options lub plikiem deskryptora descFilePath zidentyfikowanym przez argument.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

W poniższych przykładach przedstawiono przetwarzanie binarnych rekordów protobuf za pomocą from_protobuf() i konwertowanie struktury Spark SQL na binarne protobuf za pomocą polecenia to_protobuf().

Używanie narzędzia protobuf z rejestrem schematów platformy Confluent

Usługa Azure Databricks obsługuje definiowanie narzędzia Protobuf przy użyciu rejestru schematów confluent.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Uwierzytelnianie w zewnętrznym rejestrze schematów confluent

Aby uwierzytelnić się w zewnętrznym rejestrze schematów confluent, zaktualizuj opcje rejestru schematów, aby uwzględnić poświadczenia uwierzytelniania i klucze interfejsu API.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Używanie plików magazynu zaufania i magazynu kluczy w woluminach wykazu aparatu Unity

W środowisku Databricks Runtime 14.3 LTS i nowszym można użyć plików magazynu zaufania i magazynu kluczy w woluminach wykazu aparatu Unity, aby uwierzytelnić się w rejestrze schematów platformy Confluent. Zaktualizuj opcje rejestru schematów zgodnie z poniższym przykładem:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Używanie narzędzia Protobuf z plikiem deskryptora

Możesz również odwołać się do pliku deskryptora protobuf dostępnego dla klastra obliczeniowego. Upewnij się, że masz odpowiednie uprawnienia do odczytu pliku, w zależności od jego lokalizacji.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Obsługiwane opcje w funkcjach Protobuf

Następujące opcje są obsługiwane w funkcjach Protobuf.

  • mode: określa, jak błędy podczas deserializacji rekordów Protobuf są obsługiwane. Błędy mogą być spowodowane przez różne typy źle sformułowanych rekordów, w tym niezgodność między rzeczywistym schematem rekordu a oczekiwanym schematem podanym w programie from_protobuf().
    • Wartości:
      • FAILFAST(ustawienie domyślne): w przypadku napotkania źle sformułowanego rekordu występuje błąd i zadanie kończy się niepowodzeniem.
      • PERMISSIVE: Zwracana jest wartość NULL dla nieprawidłowo sformułowanych rekordów. Użyj tej opcji ostrożnie, ponieważ może to spowodować usunięcie wielu rekordów. Jest to przydatne, gdy niewielki ułamek rekordów w źródle jest niepoprawny.
  • recursive.fields.max.depth: dodaje obsługę pól cyklicznych. Schematy Spark SQL nie obsługują pól cyklicznych. Jeśli ta opcja nie zostanie określona, pola cykliczne nie są dozwolone. Aby obsługiwać pola cyklicznego w protobufs, muszą one być rozszerzane do określonej głębokości.
    • Wartości:

      • -1 (wartość domyślna): Pola cykliczne nie są dozwolone.

      • 0: Pola cykliczne są porzucane.

      • 1: Umożliwia pojedynczy poziom rekursji.

      • [2–10]: Określ próg dla wielu rekursji, maksymalnie 10 poziomów.

        Ustawienie wartości większej niż 0 umożliwia cykliczne pola przez rozszerzenie zagnieżdżonych pól do skonfigurowanej głębokości. Wartości większe niż 10 nie są dozwolone, aby uniknąć przypadkowo tworzenia bardzo dużych schematów. Jeśli komunikat Protobuf przekracza skonfigurowany limit, zwrócona struktura platformy Spark zostanie obcięta po limicie rekursji.

    • Przykład: Rozważ użycie narzędzia Protobuf z następującym polem cyklisywnym:

      message Person { string name = 1; Person friend = 2; }
      

      Poniżej wymieniono schemat końcowy z różnymi wartościami dla tego ustawienia:

      • Opcja ustawiona na 1: STRUCT<name: STRING>
      • Opcja ustawiona na 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • Opcja ustawiona na 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: ta opcja umożliwia konwertowanie pól Protobuf Any na format JSON. Ta funkcja powinna być dokładnie włączona. Konwersja i przetwarzanie JSON są nieefektywne. Ponadto pole ciągu JSON traci bezpieczeństwo schematu Protobuf, co sprawia, że przetwarzanie podrzędne jest podatne na błędy.
    • Wartości:

      • Fałsz (wartość domyślna): w czasie wykonywania takie pola wieloznaczne mogą zawierać dowolne komunikaty Protobuf jako dane binarne. Domyślnie takie pola są obsługiwane jak normalny komunikat Protobuf. Zawiera dwa pola ze schematem (STRUCT<type_url: STRING, value: BINARY>). Domyślnie pole binarne value nie jest interpretowane w żaden sposób. Jednak dane binarne mogą nie być wygodne w praktyce do pracy w niektórych aplikacjach.
      • Prawda: ustawienie tej wartości na True umożliwia konwertowanie Any pól na ciągi JSON w czasie wykonywania. Dzięki tej opcji plik binarny jest analizowany, a komunikat Protobuf jest deserializowany w ciągu JSON.
    • Przykład: Rozważ dwa typy Protobuf zdefiniowane w następujący sposób:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Po włączeniu tej opcji schemat będzie from_protobuf("col", messageName ="ProtoWithAny") następujący: STRUCT<event_name: STRING, details: STRING>.

      W czasie wykonywania, jeśli details pole zawiera Person komunikat Protobuf, zwrócona wartość wygląda następująco: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Wymagania:

      • Definicje wszystkich możliwych typów Protobuf używanych w Any polach powinny być dostępne w pliku deskryptora Protobuf przekazanym do from_protobuf().
      • Jeśli Any nie można odnaleźć narzędzia Protobuf, spowoduje to błąd dla tego rekordu.
      • Ta funkcja nie jest obecnie obsługiwana w przypadku rejestru schematów.
  • emit.default.values: włącza renderowanie pól z wartościami zerowymi podczas deserializacji protobuf do struktury spark. Ta opcja powinna być używana oszczędnie. Zwykle nie zaleca się, aby zależeć od takich drobnszych różnic w semantyce.
    • Wartości

      • Fałsz (wartość domyślna): jeśli pole jest puste w serializowanym protobuf, pole wynikowe w strukturę spark jest domyślnie zerowe. Łatwiej jest włączyć tę opcję i traktować null ją jako wartość domyślną.
      • Prawda: po włączeniu tej opcji takie pola są wypełniane odpowiednimi wartościami domyślnymi.
    • Przykład: rozważ następujące elementy Protobuf z konstrukcją Protobuf, na przykład Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Jeśli ta opcja ma wartość False, struktura platformy Spark po wywołaniu from_protobuf() będzie miała wartość null: {"name": null, "age": null, "middle_name": "", "salary": null}. Mimo że dwa pola (age i middle_name) miały ustawione wartości, protobuf nie uwzględnia ich w formacie przewodowym, ponieważ są to wartości domyślne.
      • Jeśli ta opcja ma wartość True, struktura platformy Spark po wywołaniu from_protobuf() będzie następująca: {"name": "", "age": 0, "middle_name": "", "salary": null}. Pole salary pozostaje puste, ponieważ jest jawnie zadeklarowane optional i nie jest ustawione w rekordzie wejściowym.
  • enums.as.ints: po włączeniu pola wyliczenia w protobuf są renderowane jako pola liczb całkowitych na platformie Spark.
    • Wartości

      • False (domyślnie)
      • Prawda: po włączeniu pola wyliczenia w narzędziu Protobuf są renderowane jako pola liczb całkowitych na platformie Spark.
    • Przykład: Rozważ następujące elementy Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Nadaj komunikatowi Protobuf, takiego jak Person(job = ENGINEER):

      • Po wyłączeniu tej opcji odpowiednia struktura platformy Spark to {"job": "ENGINEER"}.
      • Po włączeniu tej opcji odpowiednia struktura platformy Spark to {"job": 1}.

      Zwróć uwagę, że schemat dla tych pól różni się w każdym przypadku (liczba całkowita, a nie ciąg domyślny). Taka zmiana może mieć wpływ na schemat tabel podrzędnych.

Opcje rejestru schematów

Poniższe opcje rejestru schematów są istotne podczas korzystania z rejestru schematów z funkcjami Protobuf.

  • schema.registry.subject
    • Wymagania
    • Określa temat schematu w rejestrze schematów, na przykład "client-event"
  • schema.registry.address
    • Wymagania
    • Adres URL rejestru schematów, taki jak https://schema-registry.example.com:8081
  • schema.registry.protobuf.name
    • Opcjonalnie
    • Wartość domyślna: <NONE>.
    • Wpis rejestru schematów dla tematu może zawierać wiele definicji Protobuf, podobnie jak pojedynczy proto plik. Jeśli ta opcja nie zostanie określona, pierwszy protobuf jest używany dla schematu. Określ nazwę komunikatu Protobuf, jeśli nie jest to pierwszy w wpisie. Rozważmy na przykład wpis z dwiema definicjami Protobuf: "Person" i "Location" w tej kolejności. Jeśli strumień odpowiada wartości "Location", a nie "Person", ustaw tę opcję na "Location" (lub jej pełną nazwę, w tym pakiet "com.example.protos.Location").
  • schema.registry.schema.evolution.mode
    • Ustawienie domyślne: "uruchom ponownie".
    • Obsługiwane tryby:
      • "Uruchom ponownie"
      • "brak"
    • Ta opcja ustawia tryb ewolucji schematu dla elementu from_protobuf(). Na początku zapytania platforma Spark rejestruje najnowszy identyfikator schematu dla danego tematu. Określa schemat dla elementu from_protobuf(). Po uruchomieniu zapytania można opublikować nowy schemat w rejestrze schematów. Po zauważeniu nowszego identyfikatora schematu w rekordzie przychodzącym wskazuje zmianę schematu. Ta opcja określa sposób obsługi takiej zmiany schematu:
      • restart (ustawienie domyślne): wyzwala UnknownFieldException element po zauważeniu nowszego identyfikatora schematu. Spowoduje to zakończenie zapytania. Usługa Databricks zaleca skonfigurowanie przepływów pracy w celu ponownego uruchomienia w przypadku niepowodzenia wykonywania zapytań w celu pobrania zmian schematu.
      • brak: zmiany identyfikatora schematu są ignorowane. Rekordy z nowszym identyfikatorem schematu są analizowane przy użyciu tego samego schematu, który zaobserwowano na początku zapytania. Oczekuje się, że nowsze definicje Protobuf będą zgodne z poprzednimi wersjami, a nowe pola są ignorowane.
  • confluent.schema.registry.<schema-registy-client-option>
    • Opcjonalnie
    • Rejestr schematów łączy się z rejestrem schematów Confluent przy użyciu klienta rejestru schematów confluent. Wszelkie opcje konfiguracji obsługiwane przez klienta można określić z prefiksem "confluent.schema.registry". Na przykład następujące dwa ustawienia zawierają poświadczenia uwierzytelniania "USER_INFO":
      • "confluent.schema.registry.basic.auth.credentials.source": "USER_INFO"
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"