read_pulsar strömmande tabellvärdesfunktion

Gäller för:check marked yes Databricks SQL check marked yes Databricks Runtime 14.1 och senare

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Returnerar en tabell med poster som lästs från Pulsar.

Den här tabellvärdesfunktionen stöder endast direktuppspelning och inte batchfråga.

Syntax

read_pulsar ( { option_key => option_value } [, ...] )

Argument

Den här funktionen kräver namngivna parameteranrop för alternativnycklarna.

Alternativen serviceUrl och topic är obligatoriska.

Beskrivningarna av argumenten är korta här. Mer information finns i dokumentationen om strukturerade pulsarströmmar .

Alternativ Typ Standardvärde beskrivning
serviceUrl STRÄNG Obligatorisk URI:n för Pulsar-tjänsten.
Avsnitt STRÄNG Obligatorisk Ämnet att läsa från.
predefinedSubscription STRÄNG Ingen Det fördefinierade prenumerationsnamnet som används av anslutningsappen för att spåra spark-programmets förlopp.
subscriptionPrefix STRÄNG Ingen Ett prefix som används av anslutningsappen för att generera en slumpmässig prenumeration för att spåra spark-programförloppet.
pollTimeoutMs LÅNG 120000 Tidsgränsen för att läsa meddelanden från Pulsar i millisekunder.
failOnDataLoss BOOLEAN true Styr om en fråga ska misslyckas när data går förlorade (till exempel ämnen tas bort eller meddelanden tas bort på grund av kvarhållningsprincip).
startingOffsets STRÄNG senaste Startpunkten när en fråga startas, antingen tidigaste, senaste eller en JSON-sträng som anger en specifik förskjutning. Om den senaste är läser läsaren de senaste posterna när den börjar köras. Om det är tidigast läser läsaren från den tidigaste förskjutningen. Användaren kan också ange en JSON-sträng som anger en specifik förskjutning.
startingTime STRÄNG Ingen När det anges läser Pulsar-källan meddelanden från positionen för den angivna startingTime.

Följande argument används för autentisering av pulsar-klienten:

Alternativ Typ Standardvärde beskrivning
pulsarClientAuthPluginClassName STRÄNG Ingen Namnet på plugin-programmet för autentisering.
pulsarClientAuthParams STRÄNG Ingen Parametrar för plugin-programmet för autentisering.
pulsarClientUseKeyStoreTls STRÄNG Ingen Om du vill använda KeyStore för tls-autentisering.
pulsarClientTlsTrustStoreType STRÄNG Ingen TrustStore-filtyp för tls-autentisering.
pulsarClientTlsTrustStorePath STRÄNG Ingen TrustStore-filsökväg för tls-autentisering.
pulsarClientTlsTrustStorePassword STRÄNG Ingen TrustStore-lösenord för tls-autentisering.

Dessa argument används för konfiguration och autentisering av pulsar-antagningskontroll, pulsaradministratörskonfiguration krävs endast när antagningskontroll är aktiverad (när maxBytesPerTrigger har angetts)

Alternativ Typ Standardvärde beskrivning
maxBytesPerTrigger BIGINT Ingen En mjuk gräns för det maximala antalet byte som vi vill bearbeta per mikrobatch. Om detta anges måste även admin.url anges.
adminUrl STRÄNG Ingen Pulsar-tjänstenHttpUrl-konfigurationen. Behövs bara när maxBytesPerTrigger har angetts.
pulsarAdminAuthPlugin STRÄNG Ingen Namnet på plugin-programmet för autentisering.
pulsarAdminAuthParams STRÄNG Ingen Parametrar för plugin-programmet för autentisering.
pulsarClientUseKeyStoreTls STRÄNG Ingen Om du vill använda KeyStore för tls-autentisering.
pulsarAdminTlsTrustStoreType STRÄNG Ingen TrustStore-filtyp för tls-autentisering.
pulsarAdminTlsTrustStorePath STRÄNG Ingen TrustStore-filsökväg för tls-autentisering.
pulsarAdminTlsTrustStorePassword STRÄNG Ingen TrustStore-lösenord för tls-autentisering.

Returer

En tabell med pulsarposter med följande schema.

  • __key STRING NOT NULL: Pulsar-meddelandenyckel.

  • value BINARY NOT NULL: Pulsar-meddelandevärde.

    Obs! För ämnen med Avro- eller JSON-schema expanderas innehållet i stället för att läsa in innehåll i ett binärt värdefält för att bevara fältnamnen och fälttyperna för Pulsar-ämnet.

  • __topic STRING NOT NULL: Pulsar ämnesnamn.

  • __messageId BINARY NOT NULL: Pulsar-meddelande-ID.

  • __publishTime TIMESTAMP NOT NULL: Publiceringstid för Pulsar-meddelande.

  • __eventTime TIMESTAMP NOT NULL: Pulsar meddelandehändelsetid.

  • __messageProperties MAP<STRING, STRING>: Pulsar-meddelandeegenskaper.

Exempel

-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
      serviceUrl => 'pulsar://broker.example.com:6650',
      startingOffsets => 'earliest',
      topic => 'my-topic');

-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pulsar(
        serviceUrl => 'pulsar://broker.example.com:6650',
        startingOffsets => 'earliest',
        topic => 'my-topic',
        pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
        pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
        );

The data can now to be queried from the testing.streaming_table for further analysis.