共用方式為


使用 Kafka Connect 將資料從 Apache Kafka 擷取至 Azure Cosmos DB for Apache Cassandra

適用於: Cassandra

現有的 Cassandra 應用程式可以輕易地使用 Azure Cosmos DB for Apache Cassandra,因為其 CQLv4 驅動程式相容性。 您可以利用此功能來與串流平台 (例如 Apache Kafka) 整合,並且將資料帶入 Azure Cosmos DB。

Apache Kafka (主題) 的資料僅適用於由其他應用程式使用或擷取到其他系統時。 您可以使用 Kafka 生產者/取用者 API,使用您選擇的語言和用戶端 SDK,來建置解決方案。 Kafka Connect 提供替代解決方案。 這個平台能透過可調整且可靠的方式,在 Apache Kafka 與其他系統之間串流資料。 因為 Kafka Connect 支援現成的連接器 (包括 Cassandra),您不需要撰寫自訂程式碼來整合 Kafka 與 Azure Cosmos DB for Apache Cassandra。

在本文中,我們會使用開放原始碼 DataStax Apache Kafka 連接器,此連接器可以在 Kafka Connect 架構的基礎上運作,將記錄從 Kafka 主題擷取至一或多個 Cassandra 資料表的資料列。 此範例會使用 Docker Compose 提供可重複使用的設定。 這相當方便,因為可讓您使用單一命令,在本機啟動所有必要的元件。 這些元件包括 Kafka、Zookeeper、Kafka Connect 背景工作角色和範例資料產生器應用程式。

以下是元件及其服務定義的明細,您可以參考 GitHub 存放庫中的完整 docker-compose 檔案。

  • Kafka 和 Zookeeper 會使用 debezium 映像。
  • 為了以 Docker 容器的形式執行,DataStax Apache Kafka 連接器是架設在現有 Docker 映像的基礎上 - debezium/connect-base。 此映像包括 Kafka 及其 Kafka Connect 程式庫的安裝,因此讓新增自訂連接器更為便利。 您可以參考 Dockerfile
  • data-generator 服務會將隨機產生 (JSON) 資料植入 weather-data Kafka 主題。 您可以參考 GitHub 存放庫 中的程式碼和 Dockerfile

必要條件

建立 Keyspace、資料表和啟動整合管線

使用 Azure 入口網站建立示範應用程式所需的 Cassandra Keyspace 和資料表。

注意

使用如下的相同 Keyspace 和資料表名稱

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

複製 GitHub 存放庫:

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

啟動所有服務:

docker-compose --project-name kafka-cosmos-cassandra up --build

注意

下載並啟動容器可能需要一段時間:這只是一次性程序。

若要確認是否已啟動所有容器:

docker-compose -p kafka-cosmos-cassandra ps

資料產生器應用程式會開始將資料提取到 Kafka 中的 weather-data 主題。 您也可以進行快速的健全性檢查來確認。 查看執行 Kafka Connect 背景工作角色的 Docker 容器:

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

一旦您進入容器殼層,只要啟動一般的 Kafka 主控台取用者程序,您應該會看到天氣資料 (JSON 格式) 流入。

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Cassandra 接收器連接器設定

將以下的 JSON 內容複製到檔案 (您可以將其命名為 cassandra-sink-config.json)。 您需要根據您的設定來更新,本節的其餘部分將會提供本主題的指引。

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000
    }
}

以下是屬性的摘要:

基本連線

  • contactPoints:輸入 Azure Cosmos DB Cassandra 的聯絡點
  • loadBalancing.localDc:輸入 Azure Cosmos DB 帳戶的區域,例如東南亞
  • auth.username:輸入使用者名稱
  • auth.password:輸入密碼
  • port:輸入連接埠值,(此為 10350,而非9042。請保持原狀)

SSL 組態

Azure Cosmos DB 會強制執行透過 SSL 的安全連線,Kafka Connect 連接器也支援 SSL。

  • ssl.keystore.path:容器中 JDK 金鑰儲存區的路徑 - /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password:JDK 金鑰儲存區 (預設) 密碼
  • ssl.hostnameValidation:我們會開啟自己的節點主機名稱驗證
  • ssl.providerJDK 是用來作為 SSL 提供者

泛型參數

  • key.converter:我們會使用字串轉換器 org.apache.kafka.connect.storage.StringConverter
  • value.converter:因為 Kafka 主題中的資料是 JSON,所以我們會使用 org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable:因為我們的 JSON 承載沒有與其相關聯的結構描述 (針對示範應用程式的目的),我們必須藉由將此屬性設定為 false,指示 Kafka Connect 不要尋找結構描述。 若未這麼做,將會導致失敗。

安裝連接器

使用 Kafka Connect REST 端點來安裝連接器:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

若要檢查狀態:

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

如果一切順利,連接器應該會開始施展魔術。 應該會向 Azure Cosmos DB 進行驗證,並且開始將資料從 Kafka 主題 (weather-data) 擷取至 Cassandra 資料表 - weather.data_by_stateweather.data_by_station

您現在可以查詢資料表中的資料。 移至 Azure 入口網站,為您的 Azure Cosmos DB 帳戶啟動裝載的 CQL Shell。

Open CQLSH

從 Azure Cosmos DB 查詢資料

檢查 data_by_statedata_by_station 資料表。 以下是一些可協助您開始使用的範例查詢:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

清除資源

完成您的應用程式和 Azure Cosmos DB 帳戶之後,您可以將建立的 Azure 資源刪除,以免產生更多費用。 若要刪除資源:

  1. 在 Azure 入口網站的 [搜尋] 列中,搜尋並選取 [資源群組]

  2. 在該清單中,選取您在本快速入門中建立的資源群組。

    Select the resource group to delete

  3. 在 [資源群組] 的 [概觀] 頁面中,選取 [刪除資源群組]

    Delete the resource group

  4. 在下個視窗中輸入要刪除的資源群組名稱,然後選取 [刪除]

下一步