read_pubsub streaming table-valued function

Applies to: Databricks SQL, Databricks Runtime 13.3 LTS and above

Returns a table with records read from a Pub/Sub topic. Only supports streaming queries.

Syntax

read_pubsub( { parameter => value } [, ...])

Arguments

read_pubsub requires named parameter invocation.

The only required arguments are subscriptionId, projectId, and topicId. All other arguments are optional.

For full argument descriptions, see Configure options for Pub/Sub streaming read.

Databricks recommends using secrets when providing authorization options. See secret function.
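For example, each credential can be stored in a secret scope and read back with the secret function. This is a minimal sketch: the scope and key names (app-events, clientEmail) are illustrative, and the stored values typically come from the service account's JSON key file.

-- Read a stored credential with the secret function
-- (the scope 'app-events' and key 'clientEmail' are example names)
> SELECT secret('app-events', 'clientEmail');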

For details on configuring access to Pub/Sub, see Configure access to Pub/Sub.

| Parameter | Type | Description |
| --- | --- | --- |
| subscriptionId | STRING | Required. The unique identifier assigned to a Pub/Sub subscription. |
| projectId | STRING | Required. The Google Cloud project ID associated with the Pub/Sub topic. |
| topicId | STRING | Required. The ID or name of the Pub/Sub topic to subscribe to. |
| clientEmail | STRING | The email address associated with a service account for authentication. |
| clientId | STRING | The client ID associated with the service account for authentication. |
| privateKeyId | STRING | The ID of the private key associated with the service account. |
| privateKey | STRING | The private key associated with the service account for authentication. |

These optional arguments fine-tune reads from Pub/Sub:

| Parameter | Type | Description |
| --- | --- | --- |
| numFetchPartitions | STRING | Optional. The number of parallel Spark tasks that fetch records from a subscription. Defaults to the number of executors. |
| deleteSubscriptionOnStreamStop | BOOLEAN | Optional, default false. If set to true, the subscription passed to the stream is deleted when the streaming job ends. |
| maxBytesPerTrigger | STRING | A soft limit on the batch size processed during each triggered micro-batch. The default is 'none'. |
| maxRecordsPerFetch | STRING | The number of records to fetch per task before processing records. The default is '1000'. |
| maxFetchPeriod | STRING | The time duration for each task to fetch before processing records. The default is '10s'. |
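As a sketch, these options can be combined in a single read. The identifiers and values below are illustrative, with value formats following the defaults above (byte strings such as '10g', durations such as '30s'):

-- Streaming ingestion with tuned fetch parallelism and batch limits
> CREATE STREAMING TABLE testing.tuned_streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                numFetchPartitions => '8',
                maxBytesPerTrigger => '10g',
                maxFetchPeriod => '30s'
);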

Returns

A table of Pub/Sub records with the following schema. The attributes column can be null; all other columns are non-null.

| Name | Data type | Nullable | Description |
| --- | --- | --- | --- |
| messageId | STRING | No | Unique identifier for the Pub/Sub message. |
| payload | BINARY | No | The content of the Pub/Sub message. |
| attributes | STRING | Yes | Key-value pairs representing the attributes of the Pub/Sub message, as a JSON-encoded string. |
| publishTimestampInMillis | BIGINT | No | The timestamp when the message was published, in milliseconds. |
| sequenceNumber | BIGINT | No | The unique identifier of the record within its shard. |
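Because payload is binary and attributes is a JSON-encoded string, downstream queries typically decode both. A minimal sketch, assuming the payload is UTF-8 text and using the streaming table created in the examples below:

-- Decode the message payload and parse the JSON-encoded attributes
> SELECT messageId,
         CAST(payload AS STRING) AS payload_text,
         from_json(attributes, 'MAP<STRING, STRING>') AS attributes_map,
         timestamp_millis(publishTimestampInMillis) AS published_at
  FROM testing.streaming_table;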

Examples

-- Streaming ingestion from Pub/Sub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                clientEmail => secret('app-events', 'clientEmail'),
                clientId => secret('app-events', 'clientId'),
                privateKeyId => secret('app-events', 'privateKeyId'),
                privateKey => secret('app-events', 'privateKey')
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic'
);

The data can now be queried from testing.streaming_table for further analysis.
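For example, a simple aggregation over the ingested records:

-- Count the records ingested so far
> SELECT count(*) FROM testing.streaming_table;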

Erroneous queries:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project'
);

-- Value exceeds the limit for an option (MAX_RECORDS_PER_FETCH_LIMIT error)
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => 'app-events-1234',
                projectId => 'app-events-project',
                topicId => 'app-events-topic',
                maxRecordsPerFetch => '1000001'
);