Change Data Capture (CDC) is a technique used to track row-level changes in database tables in response to create, update, and delete operations. Debezium is a distributed platform that builds on top of Change Data Capture features available in different databases (for example, logical decoding in PostgreSQL). It provides a set of Kafka Connect connectors that tap into row-level changes in database tables and convert them into event streams that are then sent to Apache Kafka.
This tutorial walks you through how to set up a change data capture based system on Azure using Event Hubs (for Kafka), Azure Database for PostgreSQL and Debezium. It uses the Debezium PostgreSQL connector to stream database modifications from PostgreSQL to Kafka topics in Event Hubs.
Note
This article contains references to a term that Microsoft no longer uses. When the term is removed from the software, we'll remove it from this article.
In this tutorial, you take the following steps:

Create an Event Hubs namespace
Set up and configure Azure Database for PostgreSQL
Configure and run Kafka Connect with the Debezium PostgreSQL connector
Test change data capture
(Optional) Consume change data events with a FileStreamSink connector

To complete this walkthrough, you need:
An Event Hubs namespace is required to send and receive from any Event Hubs service. See Creating an event hub for instructions to create a namespace and an event hub. Get the Event Hubs connection string and fully qualified domain name (FQDN) for later use. For instructions, see Get an Event Hubs connection string.
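If you prefer the command line, here's a minimal sketch using the Azure CLI to create the namespace and retrieve its connection string. The resource group, namespace, and location names are placeholders you'd replace with your own.

# Placeholder names - replace the resource group, namespace, and location with your own
az eventhubs namespace create --resource-group my-rg --name my-eventhubs-ns --location eastus
# Retrieve the connection string used later in connect-distributed.properties and kafkacat.conf
az eventhubs namespace authorization-rule keys list --resource-group my-rg --namespace-name my-eventhubs-ns --name RootManageSharedAccessKey --query primaryConnectionString -o tsv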
Azure Database for PostgreSQL is a relational database service based on the community version of open-source PostgreSQL database engine, and is available in three deployment options: Single Server, Flexible Server, and Cosmos DB for PostgreSQL. Follow these instructions to create an Azure Database for PostgreSQL server using the Azure portal.
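As an alternative to the portal, the following is a rough sketch using the Azure CLI to create a Flexible Server instance; the server name, admin credentials, resource group, and location are placeholders. Debezium relies on PostgreSQL logical decoding, which requires wal_level to be set to logical; on Flexible Server that is a server parameter change followed by a restart.

# Placeholder names - replace resource group, server name, admin user/password, and location
az postgres flexible-server create --resource-group my-rg --name my-postgres --location eastus --admin-user testuser --admin-password '<choose-a-strong-password>'
# Logical decoding needs wal_level=logical; changing it requires a server restart
az postgres flexible-server parameter set --resource-group my-rg --server-name my-postgres --name wal_level --value logical
az postgres flexible-server restart --resource-group my-rg --name my-postgres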
This section covers how to set up the Debezium connector, configure Kafka Connect for Event Hubs, and start a Kafka Connect cluster with the Debezium PostgreSQL connector.
Follow the latest instructions in the Debezium documentation to download and set up the connector.
For example, to download version 1.2.0 of the connector, use this link - https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.2.0.Final/debezium-connector-postgres-1.2.0.Final-plugin.tar.gz

Minimal reconfiguration is necessary when redirecting Kafka Connect throughput from Kafka to Event Hubs. The following connect-distributed.properties sample illustrates how to configure Connect to authenticate and communicate with the Kafka endpoint on Event Hubs:
bootstrap.servers={YOUR.EVENTHUBS.FQDN}:9093 # e.g. namespace.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{YOUR.EVENTHUBS.CONNECTION.STRING}";
plugin.path={KAFKA.DIRECTORY}/libs # path to the libs directory within the Kafka release
Important
Replace {YOUR.EVENTHUBS.CONNECTION.STRING} with the connection string for your Event Hubs namespace. For instructions on getting the connection string, see Get an Event Hubs connection string. Here's an example configuration:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=XXXXXXXXXXXXXXXX";
In this step, a Kafka Connect worker is started locally in distributed mode, using Event Hubs to maintain cluster state.

Save the connect-distributed.properties file locally. Be sure to replace all values in braces.
Run ./bin/connect-distributed.sh /PATH/TO/connect-distributed.properties and wait for the cluster to start.

Note
Kafka Connect uses the Kafka AdminClient API to automatically create topics with recommended configurations, including compaction. A quick check of the namespace in the Azure portal reveals that the Connect worker's internal topics have been created automatically.
Kafka Connect internal topics must use compaction. The Event Hubs team isn't responsible for fixing improper configurations if internal Connect topics are incorrectly configured.
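Optionally, before creating the connector, you can confirm that the worker is up and that the Debezium plugin was picked up from plugin.path by querying the Kafka Connect REST API (the default port 8083 is assumed):

# Worker metadata (version, Kafka cluster ID) - confirms the REST API is reachable
curl -s http://localhost:8083/
# Installed connector plugins - io.debezium.connector.postgresql.PostgresConnector should be listed
curl -s http://localhost:8083/connector-plugins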
Create a configuration file (pg-source-connector.json) for the PostgreSQL source connector - replace the values as per your Azure PostgreSQL instance.
{
"name": "todo-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "<replace with Azure PostgreSQL instance name>.postgres.database.azure.com",
"database.port": "5432",
"database.user": "<replace with database user name>",
"database.password": "<replace with database password>",
"database.dbname": "postgres",
"database.server.name": "my-server",
"plugin.name": "wal2json",
"table.whitelist": "public.todos"
}
}
Tip
The database.server.name attribute is a logical name that identifies and provides a namespace for the particular PostgreSQL database server/cluster being monitored.
To create an instance of the connector, use the Kafka Connect REST API endpoint:
curl -X POST -H "Content-Type: application/json" --data @pg-source-connector.json http://localhost:8083/connectors
To check the status of the connector:
curl -s http://localhost:8083/connectors/todo-connector/status
To see change data capture in action, you need to create/update/delete records in the Azure PostgreSQL database.
Start by connecting to your Azure PostgreSQL database (the following example uses psql).
psql -h <POSTGRES_INSTANCE_NAME>.postgres.database.azure.com -p 5432 -U <POSTGRES_USER_NAME> -W -d <POSTGRES_DB_NAME> --set=sslmode=require
For example:
psql -h my-postgres.postgres.database.azure.com -p 5432 -U testuser@my-postgres -W -d postgres --set=sslmode=require
Create a table and insert records
CREATE TABLE todos (id SERIAL, description VARCHAR(50), todo_status VARCHAR(12), PRIMARY KEY(id));
INSERT INTO todos (description, todo_status) VALUES ('setup postgresql on azure', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('setup kafka connect', 'complete');
INSERT INTO todos (description, todo_status) VALUES ('configure and install connector', 'in-progress');
INSERT INTO todos (description, todo_status) VALUES ('start connector', 'pending');
The connector should now spring into action and send change data events to an Event Hubs topic with the name my-server.public.todos, assuming you have my-server as the value for database.server.name and public.todos is the table whose changes you're tracking (as per the table.whitelist configuration).
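If you want to confirm that the topic was created, one way (assuming the Azure CLI and the placeholder resource names used earlier) is to list the event hubs in the namespace:

# Placeholder resource group and namespace names - my-server.public.todos should appear in the output
az eventhubs eventhub list --resource-group my-rg --namespace-name my-eventhubs-ns --query "[].name" -o tsv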
Check Event Hubs topic
Let's introspect the contents of the topic to make sure everything is working as expected. The following example uses kafkacat, but you can also create a consumer using any of the options listed here.

Create a file named kafkacat.conf with the following contents:
metadata.broker.list=<enter event hubs namespace>.servicebus.windows.net:9093
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=$ConnectionString
sasl.password=<enter event hubs connection string>
Note
Update the metadata.broker.list and sasl.password attributes in kafkacat.conf as per your Event Hubs information.
In a different terminal, start a consumer:
export KAFKACAT_CONFIG=kafkacat.conf
export BROKER=<enter event hubs namespace>.servicebus.windows.net:9093
export TOPIC=my-server.public.todos
kafkacat -b $BROKER -t $TOPIC -o beginning
You should see the JSON payloads representing the change data events generated in PostgreSQL in response to the rows you added to the todos table. Here's a snippet of the payload:
{
    "schema": {...},
    "payload": {
        "before": null,
        "after": {
            "id": 1,
            "description": "setup postgresql on azure",
            "todo_status": "complete"
        },
        "source": {
            "version": "1.2.0.Final",
            "connector": "postgresql",
            "name": "fulfillment",
            "ts_ms": 1593018069944,
            "snapshot": "last",
            "db": "postgres",
            "schema": "public",
            "table": "todos",
            "txId": 602,
            "lsn": 184579736,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1593018069947,
        "transaction": null
    }
}
The event consists of the payload along with its schema (omitted for brevity). In the payload section, notice how the create operation ("op": "c") is represented: "before": null means that it was a newly INSERTed row, after provides values for the columns in the row, source provides the PostgreSQL instance metadata from where this event was picked up, and so on.
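If you only care about those fields, a quick way to trim the consumer output (assuming jq is installed, and reusing the BROKER and TOPIC variables exported earlier) is to pipe it through a filter:

# Read the topic from the beginning, exit at end of partition (-e), and keep only op/before/after
kafkacat -b $BROKER -t $TOPIC -o beginning -e | jq '.payload | {op, before, after}'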
You can try the same with update or delete operations as well and introspect the change data events. For example, to update the task status for configure and install connector (assuming its id is 3):
UPDATE todos SET todo_status = 'complete' WHERE id = 3;
Now that all the todos table changes are being captured in the Event Hubs topic, you can use the FileStreamSink connector (which is available by default in Kafka Connect) to consume these events.
Create a configuration file (file-sink-connector.json) for the connector - replace the file attribute as per your file system.
{
"name": "cdc-file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": "1",
"topics": "my-server.public.todos",
"file": "<enter full path to file e.g. /Users/foo/todos-cdc.txt>"
}
}
To create the connector and check its status:
curl -X POST -H "Content-Type: application/json" --data @file-sink-connector.json http://localhost:8083/connectors
curl http://localhost:8083/connectors/cdc-file-sink/status
Insert/update/delete database records and monitor the records in the configured output sink file:
tail -f /Users/foo/todos-cdc.txt
Kafka Connect creates Event Hubs topics to store configurations, offsets, and status that persist even after the Kafka Connect cluster has been taken down. Unless this persistence is desired, we recommend that you delete these topics. You might also want to delete the my-server.public.todos event hub that was created during this walkthrough.
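A minimal cleanup sketch, assuming the connector names used in this walkthrough and the placeholder Azure resource names from earlier, could look like this:

# Remove the connectors from the Kafka Connect worker
curl -X DELETE http://localhost:8083/connectors/todo-connector
curl -X DELETE http://localhost:8083/connectors/cdc-file-sink
# Delete the change data event hub (placeholder resource group/namespace names)
az eventhubs eventhub delete --resource-group my-rg --namespace-name my-eventhubs-ns --name my-server.public.todos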
To learn more about Event Hubs for Kafka, see the following articles: