Freigeben über


read_kinesis-Streamingtabellenwertfunktionen

Gilt für:Häkchen gesetzt ja Databricks SQL Häkchen gesetzt ja Databricks Runtime ab Version 13.3 LTS

Gibt eine Tabelle mit Datensätzen zurück, die aus Kinesis aus einem oder mehreren Streams gelesen werden.

Syntax

read_kinesis ( { parameter => value } [, ...] )

Argumente

read_kinesis erfordert einen benannten Parameteraufruf.

Das einzige erforderliche Argument ist streamName. Alle anderen Argumente sind optional.

Hier werden die Argumente nur kurz beschrieben. Weitere Details finden Sie in der Dokumentation zu Amazon Kinesis.

Es gibt verschiedene Optionen, um eine Verbindung mit AWS herzustellen und sich bei AWS zu authentifizieren. awsAccessKey und awsSecretKey können entweder mit der Funktion „secret“ in den Funktionsargumenten angegeben, manuell in den Argumenten festgelegt oder wie unten angegeben als Umgebungsvariablen konfiguriert werden. roleArn, roleExternalID und roleSessionName können auch verwendet werden, um sich mithilfe von Instanzprofilen bei AWS zu authentifizieren. Wenn keine dieser Angaben vorhanden ist, wird die AWS-Standardanbieterkette verwendet.

Parameter Typ Beschreibung
streamName STRING Erforderliche, durch Kommas getrennte Liste mit mindestens einem Kinesis-Stream.
awsAccessKey STRING Der AWS-Zugriffsschlüssel, falls vorhanden. Kann auch über die verschiedenen Optionen angegeben werden, die über die AWS-Standardanbieterkette für Anmeldeinformationen unterstützt werden, beispielsweise über Umgebungsvariablen (AWS_ACCESS_KEY_ID) oder eine Datei mit Anmeldeinformationsprofilen.
awsSecretKey STRING Der geheime Schlüssel, der dem Zugriffsschlüssel entspricht. Kann in den Argumenten oder über die verschiedenen Optionen angegeben werden, die über die AWS-Standardanbieterkette für Anmeldeinformationen unterstützt werden, beispielsweise über Umgebungsvariablen (AWS_SECRET_KEYoder AWS_SECRET_ACCESS_KEY) oder eine Datei mit Anmeldeinformationsprofilen.
roleArn STRING Der Amazon-Ressourcenname der Rolle, die beim Zugriff auf Kinesis angenommen werden soll.
roleExternalId STRING Wird beim Delegieren des Zugriffs auf das AWS-Konto verwendet.
roleSessionName STRING Name der AWS-Rollensitzung.
stsEndpoint STRING Ein Endpunkt zum Anfordern temporärer Anmeldeinformationen für den Zugriff.
region STRING Bereich für die anzugebenden Streams. Standardwert: die lokal aufgelöste Region.
endpoint STRING Regionaler Endpunkt für Kinesis-Datenströme. Standardwert: die lokal aufgelöste Region.
initialPosition STRING Startposition für das Lesen aus dem Datenstrom. Einer dieser Werte: „latest“ (Standard), „trim_horizon“, „earliest“, „at_timestamp“.
consumerMode STRING Einer dieser Werte: „polling“ (Standard) oder „EFO“ (enhanced-fan-out).
consumerName STRING Der Name des Consumers. Allen Consumern ist „databricks_“ vorangestellt. Der Standardwert ist eine leere Zeichenfolge.
registerConsumerTimeoutInterval STRING Der maximale Timeoutwert für die Zeitspanne, die abgewartet wird, bis der Kinesis-EFO-Consumer beim Kinesis-Datenstrom registriert wurde, bevor ein Fehler ausgelöst wird. Standardwert: 300 s.
requireConsumerDeregistration BOOLEAN true zum Aufheben der Registrierung des EFO-Consumers bei Beendigung der Abfrage. Der Standardwert ist false.
deregisterConsumerTimeoutInterval STRING Der maximale Timeoutwert für die Zeitspanne, die abgewartet wird, bis die Registrierung des Kinesis-EFO-Consumer beim Kinesis-Datenstrom aufgehoben wurde, bevor ein Fehler ausgelöst wird. Standardwert: 300 s.
consumerRefreshInterval STRING Das Intervall, in dem der Consumer überprüft und aktualisiert wird. Standardwert: 300 s.

Die folgenden Argumente werden verwendet, um den Lesedurchsatz und die Leselatenz für Kinesis zu steuern:

Parameter Typ Beschreibung
maxRecordsPerFetch INTEGER (>0) Optional, mit dem Standardwert von 10 000 Datensätzen, die pro API-Anforderung an Kinesis gelesen werden sollen.
maxFetchRate STRING Wie schnell Daten pro Shard vorab abgerufen werden. Ein Wert zwischen „1,0“ und „2,0“, der in MB/s gemessen wird. Standardwert: 1,0.
minFetchPeriod STRING Die maximale Wartezeit zwischen aufeinanderfolgenden Versuchen eines Vorababrufs. Standardwert: 400 ms.
maxFetchDuration STRING Die maximale Dauer zum Puffern vorab abgerufener neuer Daten. Standardwert: 10 s.
fetchBufferSize STRING Die Datenmenge für den nächsten Trigger. Standardwert: 20 GB.
shardsPerTask INTEGER (>0) Die Anzahl von Kinesis-Shards, die pro Spark-Aufgabe parallel vorab abgerufen werden sollen. Der Standard ist 5.
shardFetchinterval STRING Anzahl der Pollingvorgänge zum Resharding. Standardwert: 1 s.
coalesceThresholdBlockSize INTEGER (>0) Der Schwellenwert, bei dem eine automatische Zusammenfügung erfolgt. Standardwert: 10.000.000.
coalesce BOOLEAN true zum Zusammenfügen vorab abgerufener Anforderungen. Der Standardwert ist true.
coalesceBinSize INTEGER (>0) Die ungefähre Blockgröße nach dem Zusammenfügen. Standardwert: 128.000.000.
reuseKinesisClient BOOLEAN true zum Wiederverwenden des im Cache gespeicherten Kinesis-Clients. Standardwert: true, außer für einen PE-Cluster.
clientRetries INTEGER (>0) Die Anzahl der Wiederholungsversuche im Wiederholungsszenario. Der Standard ist 5.

Gibt zurück

Eine Tabelle mit Kinesis-Datensätzen mit dem folgenden Schema:

Name Datentyp Nullable Standard BESCHREIBUNG
partitionKey STRING Nein Ein Schlüssel, der zum Verteilen von Daten auf die Shards eines Datenstroms verwendet wird. Alle Datensätze mit demselben Partitionsschlüssel werden aus demselben Shard gelesen.
data BINARY Nein Die Kinesis-Datenpayload, Base64-codiert.
stream STRING Nein Der Name des Datenstroms, aus dem die Daten gelesen wurden.
shardId STRING Nein Ein eindeutiger Bezeichner für den Shard, aus dem die Daten gelesen wurden.
sequenceNumber BIGINT Nein Der eindeutige Bezeichner des Datensatzes innerhalb des Shards.
approximateArrivalTimestamp TIMESTAMP Nein Die ungefähre Zeit, zu der der Datensatz in den Datenstrom eingefügt wurde.

Die Spalten (stream, shardId, sequenceNumber) stellen einen Primärschlüssel dar.

Beispiele

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');