次の方法で共有


read_kinesis ストリーミングテーブル値関数

適用対象:check marked yes Databricks SQL Databricks Runtime 13.3 LTS 以降

1 つ以上のストリームの Kinesis から読み取られたレコードを含むテーブルを返します。

構文

read_kinesis ( { parameter => value } [, ...] )

引数

read_kinesis には、名前付きパラメーター呼び出しが必要です。

唯一必要な引数は streamName です。 他の引数はすべて省略可能です。

ここでは引数について簡単に説明します。 詳細については、Amazon Kinesis のドキュメント を参照してください。

AWS に接続して認証するための接続オプションにはさまざまなものがあります。 awsAccessKey、および awsSecretKey は、secret 関数を使用して関数の引数で指定するか、引数を使用して手動で設定するか、次に示すように環境変数として構成できます。 roleArnroleExternalIDroleSessionName は、インスタンス プロファイルを使用して AWS で認証するときに使用することもできます。 どれも指定されていない場合は、既定の AWS プロバイダー チェーンが使用されます。

パラメーター 説明
streamName STRING 必須。1 つ以上の kinesis ストリームのコンマ区切りのリスト。
awsAccessKey STRING AWS アクセス キー (存在する場合)。 環境変数 (AWS_ACCESS_KEY_ID) や資格情報プロファイル ファイルなど、AWS の既定の資格情報プロバイダー チェーンでサポートされているさまざまなオプションを使用して指定することもできます。
awsSecretKey STRING アクセス キーに対応する秘密鍵。 引数で指定するか、環境変数 (AWS_SECRET_KEY または AWS_SECRET_ACCESS_KEY) や資格情報プロファイル ファイルなど、AWS の既定の資格情報プロバイダー チェーンでサポートされているさまざまなオプションを使用して指定することができます。
roleArn STRING Kinesis にアクセスするときに想定されるロールの Amazon リソース名。
roleExternalId STRING AWS アカウントへのアクセスを委任するときに使用されます。
roleSessionName STRING AWS ロールセッション名。
stsEndpoint STRING 一時的なアクセス資格情報を要求するためのエンドポイント。
region STRING 指定するストリームのリージョン。 既定値は、ローカルに解決されたリージョンです。
endpoint STRING Kinesis データ ストリームのリージョン エンドポイント。 既定値は、ローカルに解決されたリージョンです。
initialPosition STRING ストリーム内からの読み取りの開始場所。 次のいずれか: 'latest' (既定値)、'trim_horizon'、'earliest'、'at_timestamp'。
consumerMode STRING 次のいずれか: 'polling' (既定値)、または 'EFO' (enhanced-fan-out)。
consumerName STRING コンシューマーの名前。 すべてのコンシューマーにプレフィックス 'databricks_' が付加されます。 既定値は空の文字列です。
registerConsumerTimeoutInterval STRING Kinesis EFO コンシューマーが Kinesis ストリームに登録されるのを待つときの最大タイムアウト。この時間が経過した後、エラーがスローされます。 既定値は '300s' です。
requireConsumerDeregistration BOOLEAN true の場合、クエリの終了時に EFO コンシューマーが登録解除されます。 既定値は false です。
deregisterConsumerTimeoutInterval STRING Kinesis EFO コンシューマーが Kinesis ストリームから登録解除されるのを待つときの最大タイムアウト。この時間が経過した後、エラーがスローされます。 既定値は '300s' です。
consumerRefreshInterval STRING コンシューマーが確認および更新される間隔。 既定値は '300s' です。

次の引数は、Kinesis の読み取りスループットと待機時間の制御に使用されます。

パラメーター 説明
maxRecordsPerFetch INTEGER (>0) 省略可能。Kinesis への API 要求ごとに読み取られるレコード数 (既定では 10,000)。
maxFetchRate STRING シャードごとにデータをプリフェッチする速度。 MB/s で測定された '1.0' と '2.0' の間の値。 既定値は '1.0' です。
minFetchPeriod STRING 連続するプリフェッチ試行間の最大待機時間。 既定値は '400ms' です。
maxFetchDuration STRING プリフェッチされた新しいデータをバッファーする最長期間。 既定値は '10s' です。
fetchBufferSize STRING 次のトリガーのデータ量。 既定値は '20gb' です。
shardsPerTask INTEGER (>0) Spark タスクごとに並行でプリフェッチする Kinesis シャードの数。 既定値は 5 です。
shardFetchinterval STRING 再シャード化のためのポーリング頻度。 既定値は '1s' です。
coalesceThresholdBlockSize INTEGER (>0) 自動結合が発生するしきい値。 既定値は 10,000,000 です。
coalesce BOOLEAN true の場合、プリフェッチされた要求が結合されます。 既定値は、true です。
coalesceBinSize INTEGER (>0) 結合後の概算ブロック サイズ。 既定値は 128,000,000 です。
reuseKinesisClient BOOLEAN true の場合、キャッシュに格納されている Kinesis クライアントが再利用されます。 既定値は true です (PE クラスターを除く)。
clientRetries INTEGER (>0) 再試行シナリオでの再試行回数。 既定値は 5 です。

返品

次のスキーマを持つ Kinesis レコードのテーブル。

名前 データ型 Nullable Standard 説明
partitionKey STRING いいえ ストリームのシャード間でデータを分散するときに使用されるキー。 パーティション キーが同じデータ レコードはすべて、同じシャードから読み取られます。
data BINARY いいえ kinesis データ ペイロード、base-64 エンコード済み。
stream STRING いいえ データ読み取り元のストリームの名前。
shardId STRING いいえ データ読み取り元のシャードの一意識別子。
sequenceNumber BIGINT いいえ シャード内のレコードの一意識別子。
approximateArrivalTimestamp TIMESTAMP いいえ ストリームへのおおよそのレコード挿入時間。

(stream, shardId, sequenceNumber) 列が主キーを構成します。

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');