read_kafka funkcja wartości tabeli

Dotyczy:zaznacz pole wyboru oznaczone jako tak Databricks SQL zaznacz pole wyboru oznaczone jako tak Databricks Runtime 13.3 LTS i nowsze

Odczytuje dane z klastra platformy Apache Kafka i zwraca dane w postaci tabelarycznej.

Może odczytywać dane z co najmniej jednego tematu platformy Kafka. Obsługuje zarówno zapytania wsadowe, jak i pozyskiwanie przesyłania strumieniowego.

Składnia

read_kafka([option_key => option_value ] [, ...])

Argumenty

Ta funkcja wymaga wywołania nazwanego parametru.

  • option_key: nazwa opcji do skonfigurowania. Należy użyć backticks (') dla opcji zawierających kropki (.).
  • option_value: wyrażenie stałe do ustawienia opcji. Akceptuje literały i funkcje skalarne.

Zwraca

Rekordy odczytane z klastra platformy Apache Kafka z następującym schematem:

  • key BINARY: klucz rekordu platformy Kafka.
  • value BINARY NOT NULL: wartość rekordu platformy Kafka.
  • topic STRING NOT NULL: nazwa tematu platformy Kafka, z których jest odczytywany rekord.
  • partition INT NOT NULL: identyfikator partycji platformy Kafka, z których jest odczytywany rekord.
  • offset BIGINT NOT NULL: numer przesunięcia rekordu na platformie Kafka TopicPartition.
  • timestamp TIMESTAMP NOT NULL: wartość znacznika czasu dla rekordu. Kolumna timestampType definiuje, co odpowiada znacznikowi czasu.
  • timestampType INTEGER NOT NULL: typ znacznika czasu określonego w kolumnie timestamp .
  • headers ARRAY<STRUCT<key: STRING, VALUE: BINARY>>: wartości nagłówka podane jako część rekordu (jeśli jest włączone).

Przykłady

-- A batch query to read from a topic.
> SELECT value::string as value
  FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  ) LIMIT 10;

-- A more advanced query with security credentials for Kafka.
> SELECT * FROM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events',
    startingOffsets => 'earliest',
    `kafka.security.protocol` => 'SASL_SSL',
    `kafka.sasl.mechanism` => 'PLAIN',
    `kafka.sasl.jaas.config` =>  'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{USER_NAME}" password="{PASSWORD}";',
  );

-- Streaming ingestion from Kafka with JSON parsing.
> CREATE OR REFRESH STREAMING TABLE catalog.schema.raw_events AS
  SELECT
    value::string:events,                 -- extract the field `events`
    to_timestamp(value::string:ts) as ts  -- extract the field `ts` and cast to timestamp
  FROM STREAM read_kafka(
    bootstrapServers => 'kafka_server:9092',
    subscribe => 'events'
  );

Opcje

Szczegółową listę opcji można znaleźć w dokumentacji platformy Apache Spark.

Wymagane opcje

Podaj poniższą opcję nawiązywania połączenia z klastrem platformy Kafka.

Opcja
bootstrapServers

Typ: String

Rozdzielona przecinkami lista par hostów/portów wskazująca klaster platformy Kafka.

Wartość domyślna: Brak

Podaj tylko jedną z poniższych opcji, aby skonfigurować tematy platformy Kafka do ściągania danych.

Opcja
assign

Typ: String

Ciąg JSON zawierający określone partycje tematu, z których mają być używane. Na przykład w przypadku '{"topicA":[0,1],"topicB":[2,4]}'partycji 0 i 1. tematu będą używane partycje.

Wartość domyślna: Brak
subscribe

Typ: String

Rozdzielona przecinkami lista tematów platformy Kafka do odczytania.

Wartość domyślna: Brak
subscribePattern

Typ: String

Wyrażenie regularne pasujące do tematów do subskrybowania.

Wartość domyślna: Brak

Różne opcje

read_kafka można używać w zapytaniach wsadowych i w zapytaniach przesyłanych strumieniowo. Poniższe opcje określają, do którego typu zapytania mają zastosowanie.

Opcja
endingOffsets

Typ: Typ zapytania: String tylko partia

Przesunięcie do odczytu dla zapytania wsadowego, "latest" aby określić najnowsze rekordy lub ciąg JSON określający przesunięcie końcowe dla każdej części tematu. W formacie JSON jako -1 przesunięcie może służyć do odwoływania się do najnowszej wersji. -2 (najwcześniejsze) jako przesunięcie jest niedozwolone.

Wartość domyślna: "latest"
endingOffsetsByTimestamp

Typ: Typ zapytania: String tylko partia

Ciąg JSON określający znacznik czasu zakończenia do odczytania do każdego elementu TopicPartition. Znaczniki czasu muszą być podane jako długa wartość znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC, na przykład
1686444353000. Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu.
endingOffsetsByTimestamp ma pierwszeństwo przed endingOffsets.

Wartość domyślna: Brak
endingTimestamp

Typ: Typ zapytania: String tylko partia

Wartość ciągu znacznika czasu w milisekundach od
1970-01-01 00:00:00 UTC, na przykład "1686444353000". Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, przesunięcie zostanie ustawione na najnowsze. Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu. Uwaga: endingTimestamp ma pierwszeństwo przed endingOffsetsByTimestamp i
endingOffsets.

Wartość domyślna: Brak
includeHeaders

Typ: Typ zapytania: Boolean przesyłanie strumieniowe i wsadowe

Określa, czy w wierszu mają być uwzględniane nagłówki platformy Kafka.

Wartość domyślna: false
kafka.<consumer_option>

Typ: Typ zapytania: String przesyłanie strumieniowe i wsadowe

Wszystkie opcje specyficzne dla konsumentów platformy Kafka można przekazać za pomocą prefiksu kafka. . Te opcje muszą być otoczone przez backticks po podaniu, w przeciwnym razie zostanie wyświetlony błąd analizatora. Opcje można znaleźć w dokumentacji platformy Kafka.

Uwaga: nie należy ustawiać następujących opcji za pomocą tej funkcji:
key.deserializer, , value.deserializer, , bootstrap.serversgroup.id

Wartość domyślna: Brak
maxOffsetsPerTrigger

Typ: Typ zapytania: Long tylko przesyłanie strumieniowe

Limit szybkości maksymalnej liczby przesunięć lub wierszy przetworzonych na interwał wyzwalacza. Określona łączna liczba przesunięć zostanie proporcjonalnie podzielona między elementy TopicPartitions.

Wartość domyślna: Brak
startingOffsets

Typ: Typ zapytania: String przesyłanie strumieniowe i wsadowe

Punkt początkowy podczas uruchamiania zapytania, "earliest" czyli od najwcześniejszych przesunięć, "latest" czyli od najnowszych przesunięć, lub ciągu JSON określającego przesunięcie początkowe dla każdej części tematu. W formacie JSON jako -2 przesunięcie można odwoływać się najwcześniej do najnowszej -1 .

Uwaga: w przypadku zapytań wsadowych najnowsza wersja (niejawnie lub przy użyciu -1 w formacie JSON) jest niedozwolona. W przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania przesyłane strumieniowo będą kontynuowane z przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej.

Wartość domyślna: "latest" dla przesyłania strumieniowego dla "earliest" partii
startingOffsetsByTimestamp

Typ: Typ zapytania: String przesyłanie strumieniowe i wsadowe

Ciąg JSON określający znacznik czasu rozpoczęcia dla każdego elementu TopicPartition. Znaczniki czasu muszą być podane jako długa wartość znacznika czasu w milisekundach od 1970-01-01 00:00:00 UTC, na przykład 1686444353000. Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu. Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, zachowanie będzie zgodne z wartością opcji startingOffsetsByTimestampStrategy.
startingOffsetsByTimestamp ma pierwszeństwo przed startingOffsets.

Uwaga: w przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania przesyłane strumieniowo będą kontynuowane z przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej.

Wartość domyślna: Brak
startingOffsetsByTimestampStrategy

Typ: Typ zapytania: String przesyłanie strumieniowe i wsadowe

Ta strategia jest używana, gdy określone przesunięcie początkowe według sygnatury czasowej (globalnej lub na partycję) nie jest zgodne z zwróconym przesunięciem platformy Kafka. Dostępne strategie to:

* "error": niepowodzenie zapytania
* "latest": przypisuje najnowsze przesunięcie dla tych partycji, aby platforma Spark mogła odczytywać nowsze rekordy z tych partycji w kolejnych mikrosadach.

Wartość domyślna: "error"
startingTimestamp

Typ: Typ zapytania: String przesyłanie strumieniowe i wsadowe

Wartość ciągu znacznika czasu w milisekundach od
1970-01-01 00:00:00 UTC, na przykład "1686444353000". Zobacz uwagę poniżej, aby uzyskać szczegółowe informacje o zachowaniu ze znacznikami czasu. Jeśli platforma Kafka nie zwróci dopasowanego przesunięcia, zachowanie będzie zgodne z wartością opcji startingOffsetsByTimestampStrategy.
startingTimestamp ma pierwszeństwo przed startingOffsetsByTimestamp i startingOffsets.

Uwaga: w przypadku zapytań przesyłanych strumieniowo ma to zastosowanie tylko wtedy, gdy zostanie uruchomione nowe zapytanie. Ponownie uruchomione zapytania przesyłane strumieniowo będą kontynuowane z przesunięć zdefiniowanych w punkcie kontrolnym zapytania. Nowo odnalezione partycje podczas zapytania będą uruchamiane najwcześniej.

Wartość domyślna: Brak

Uwaga

Zwrócone przesunięcie dla każdej partycji jest najwcześniejszym przesunięciem, którego sygnatura czasowa jest większa lub równa podanemu znacznikowi czasu w odpowiedniej partycji. Zachowanie różni się w zależności od opcji, jeśli platforma Kafka nie zwraca dopasowanego przesunięcia — sprawdź opis każdej opcji.

Platforma Spark po prostu przekazuje informacje znacznika czasu do KafkaConsumer.offsetsForTimeselementu i nie interpretuje ani nie rozumuje wartości. Aby uzyskać więcej informacji na temat KafkaConsumer.offsetsForTimesusługi , zapoznaj się z dokumentacją. Ponadto znaczenie znacznika czasu w tym miejscu może się różnić w zależności od konfiguracji platformy Kafka (log.message.timestamp.type). Aby uzyskać szczegółowe informacje, zobacz dokumentację platformy Apache Kafka.