read_pulsar funkcja przesyłania strumieniowego o wartości tabeli

Dotyczy:check marked yes Databricks SQL check marked yes Databricks Runtime 14.1 i nowsze

Ważne

Ta funkcja jest dostępna w publicznej wersji zapoznawczej.

Zwraca tabelę z rekordami odczytanymi z pulsaru.

Ta funkcja z wartością tabeli obsługuje tylko przesyłanie strumieniowe, a nie zapytanie wsadowe.

Składnia

read_pulsar ( { option_key => option_value } [, ...] )

Argumenty

Ta funkcja wymaga wywołania nazwanego parametru dla kluczy opcji.

Opcje serviceUrl i topic są obowiązkowe.

Opisy argumentów są krótkie tutaj. Aby uzyskać rozszerzone opisy, zobacz ustrukturyzowaną dokumentację pulsaru przesyłania strumieniowego.

Opcja Typ Domyślny opis
serviceUrl CIĄG Obowiązkowy Identyfikator URI usługi Pulsar.
topic CIĄG Obowiązkowy Temat do odczytania.
wstępnie zdefiniowanesubskrypcja CIĄG Brak Wstępnie zdefiniowana nazwa subskrypcji używana przez łącznik do śledzenia postępu aplikacji platformy Spark.
subscriptionPrefix CIĄG Brak Prefiks używany przez łącznik do generowania losowej subskrypcji do śledzenia postępu aplikacji platformy Spark.
pollTimeoutMs DŁUGI 120000 Limit czasu odczytywania komunikatów z Pulsar w milisekundach.
failOnDataLoss BOOLEAN prawda Określa, czy zapytanie nie powiodło się w przypadku utraty danych (na przykład tematy są usuwane lub komunikaty są usuwane z powodu zasad przechowywania).
startOffsets CIĄG latest Punkt początkowy, gdy zapytanie jest uruchamiane, najwcześniejsze, najnowsze lub ciąg JSON, który określa określone przesunięcie. Jeśli jest najnowsza, czytelnik odczytuje najnowsze rekordy po uruchomieniu. Jeśli najwcześniej, czytnik odczytuje od najwcześniejszego przesunięcia. Użytkownik może również określić ciąg JSON, który określa określone przesunięcie.
startTime CIĄG Brak Po określeniu źródło Pulsar odczytuje komunikaty rozpoczynające się od pozycji określonego czasu rozpoczęcia.

Następujące argumenty są używane do uwierzytelniania klienta pulsar:

Opcja Typ Domyślny opis
pulsarClientAuthPluginClassName CIĄG Brak Nazwa wtyczki uwierzytelniania.
pulsarClientAuthParams CIĄG Brak Parametry wtyczki uwierzytelniania.
pulsarClientUseKeyStoreTls CIĄG Brak Czy używać magazynu kluczy do uwierzytelniania tls.
pulsarClientTlsTrustStoreType CIĄG Brak Typ pliku TrustStore na potrzeby uwierzytelniania tls.
pulsarClientTlsTrustStorePath CIĄG Brak Ścieżka pliku TrustStore na potrzeby uwierzytelniania tls.
pulsarClientTlsTrustStorePassword CIĄG Brak Hasło trustStore na potrzeby uwierzytelniania tls.

Te argumenty są używane do konfiguracji i uwierzytelniania pulsar kontroli dostępu, konfiguracja administratora pulsar jest wymagana tylko w przypadku włączenia kontroli dostępu (gdy parametr maxBytesPerTrigger jest ustawiony)

Opcja Typ Domyślny opis
maxBytesPerTrigger BIGINT Brak Miękki limit maksymalnej liczby bajtów, które chcemy przetworzyć na mikrobajt. Jeśli jest to określone, należy również określić adres admin.url.
adminUrl CIĄG Brak Konfiguracja pulsar serviceHttpUrl. Wymagane tylko w przypadku określenia parametru maxBytesPerTrigger.
pulsar Administracja AuthPlugin CIĄG Brak Nazwa wtyczki uwierzytelniania.
pulsar Administracja AuthParams CIĄG Brak Parametry wtyczki uwierzytelniania.
pulsarClientUseKeyStoreTls CIĄG Brak Czy używać magazynu kluczy do uwierzytelniania tls.
pulsar Administracja TlsTrustStoreType CIĄG Brak Typ pliku TrustStore na potrzeby uwierzytelniania tls.
pulsar Administracja TlsTrustStorePath CIĄG Brak Ścieżka pliku TrustStore na potrzeby uwierzytelniania tls.
pulsar Administracja TlsTrustStorePassword CIĄG Brak Hasło trustStore na potrzeby uwierzytelniania tls.

Powroty

Tabela rekordów pulsarnych z następującym schematem.

  • __key STRING NOT NULL: Klucz komunikatu pulsar.

  • value BINARY NOT NULL: wartość komunikatu pulsu.

    Uwaga: w przypadku tematów ze schematem Avro lub JSON zamiast ładowania zawartości do pola wartości binarnej zawartość zostanie rozszerzona w celu zachowania nazw pól i typów pól tematu Pulsar.

  • __topic STRING NOT NULL: Pulsar nazwa tematu.

  • __messageId BINARY NOT NULL: identyfikator komunikatu pulsu.

  • __publishTime TIMESTAMP NOT NULL: Czas publikowania komunikatu pulsar.

  • __eventTime TIMESTAMP NOT NULL: czas zdarzenia komunikatu pulsu.

  • __messageProperties MAP<STRING, STRING>: właściwości komunikatu pulsu.

Przykłady

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.