Ingerir dados do Apache Kafka no Azure Cosmos DB para Apache Cassandra com o Kafka Connect
APLICA-SE A: Cassandra
As aplicações Cassandra existentes podem funcionar facilmente com o Azure Cosmos DB para Apache Cassandra devido à compatibilidade do controlador CQLv4. Tira partido desta capacidade para integrar em plataformas de transmissão em fluxo como o Apache Kafka e trazer dados para o Azure Cosmos DB.
Os dados no Apache Kafka (tópicos) só são úteis quando consumidos por outras aplicações ou ingeridos noutros sistemas. É possível criar uma solução com as APIs de Produtor/Consumidor do Kafkacom um SDK de cliente e idioma à sua escolha. O Kafka Connect fornece uma solução alternativa. É uma plataforma para transmitir dados entre o Apache Kafka e outros sistemas de forma dimensionável e fiável. Uma vez que o Kafka Connect suporta conectores que incluem o Cassandra, não precisa de escrever código personalizado para integrar o Kafka no Azure Cosmos DB para Apache Cassandra.
Neste artigo, vamos utilizar o conector DataStax do Apache Kafka de código aberto, que funciona sobre a arquitetura do Kafka Connect para ingerir registos de um tópico do Kafka em linhas de uma ou mais tabelas do Cassandra. O exemplo fornece uma configuração reutilizável com o Docker Compose. Isto é bastante conveniente, uma vez que lhe permite iniciar o bootstrap de todos os componentes necessários localmente com um único comando. Estes componentes incluem o Kafka, o Zookeeper, a função de trabalho do Kafka Connect e a aplicação de gerador de dados de exemplo.
Eis uma discriminação dos componentes e das respetivas definições de serviço. Pode consultar o ficheiro completo docker-compose
no repositório do GitHub.
- O Kafka e o Zookeeper utilizam imagens de debezium .
- Para ser executado como um contentor do Docker, o Conector Do Apache Kafka DataStax é criado sobre uma imagem do Docker existente – debezium/connect-base. Esta imagem inclui uma instalação do Kafka e das respetivas bibliotecas do Kafka Connect, o que torna realmente conveniente adicionar conectores personalizados. Pode consultar o Dockerfile.
- O
data-generator
serviço gera dados gerados aleatoriamente (JSON) no tópico doweather-data
Kafka. Pode consultar o código eDockerfile
no repositório do GitHub
Pré-requisitos
Aprovisionar uma conta do Azure Cosmos DB para Apache Cassandra
Instalar o Docker e o Docker Compose
Criar Keyspace, tabelas e iniciar o pipeline de integração
Com o portal do Azure, crie o Cassandra Keyspace e as tabelas necessárias para a aplicação de demonstração.
Nota
Utilize os mesmos nomes de Keyspace e tabela que abaixo
CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};
CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;
Clone o repositório do GitHub:
git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka
Inicie todos os serviços:
docker-compose --project-name kafka-cosmos-cassandra up --build
Nota
Pode demorar algum tempo a transferir e iniciar os contentores: este é apenas um processo único.
Para confirmar se todos os contentores foram iniciados:
docker-compose -p kafka-cosmos-cassandra ps
A aplicação geradora de dados começará a bombear dados para o weather-data
tópico no Kafka. Também pode fazer uma verificação de sanidade rápida para confirmar. Espreite o contentor do Docker com a função de trabalho de ligação do Kafka:
docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash
Depois de largar na shell do contentor, basta iniciar o processo de consumidor habitual da consola kafka e deverá ver os dados meteorológicos (no formato JSON) a fluir.
cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data
Configuração do conector Sink para Cassandra
Copie os conteúdos JSON abaixo para um ficheiro (pode dar-lhe o nome cassandra-sink-config.json
). Terá de atualizá-lo de acordo com a sua configuração e o resto desta secção fornecerá orientações sobre este tópico.
{
"name": "kafka-cosmosdb-sink",
"config": {
"connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
"tasks.max": "1",
"topics": "weather-data",
"contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
"port": 10350,
"loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
"auth.username": "<enter username for cosmosdb account>",
"auth.password": "<enter password for cosmosdb account>",
"ssl.hostnameValidation": true,
"ssl.provider": "JDK",
"ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
"ssl.keystore.password": "changeit",
"datastax-java-driver.advanced.connection.init-query-timeout": 5000,
"maxConcurrentRequests": 500,
"maxNumberOfRecordsInBatch": 32,
"queryExecutionTimeout": 30,
"connectionPoolLocalSize": 4,
"topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"offset.flush.interval.ms": 10000
}
}
Eis um resumo dos atributos:
Conectividade básica
contactPoints
: introduza o ponto de contacto do Cassandra do Azure Cosmos DBloadBalancing.localDc
: introduza a região da conta do Azure Cosmos DB, por exemplo, Sudeste Asiáticoauth.username
: introduza o nome de utilizadorauth.password
: introduza a palavra-passeport
: introduza o valor da porta (isto é10350
, não9042
. deixe-o tal como está)
Configuração SSL
O Azure Cosmos DB impõe conectividade segura através do conector SSL e o Conector kafka também suporta SSL.
ssl.keystore.path
: caminho para o keystore do JDK no contentor –/etc/alternatives/jre/lib/security/cacerts/
ssl.keystore.password
: palavra-passe do keystore JDK (predefinição)ssl.hostnameValidation
: Transformamos a validação do nome de anfitrião do nó própriossl.provider
:JDK
é utilizado como o fornecedor SSL
Parâmetros genéricos
key.converter
: Utilizamos o conversor de cadeiasorg.apache.kafka.connect.storage.StringConverter
value.converter
: uma vez que os dados nos tópicos do Kafka são JSON, utilizamosorg.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable
: uma vez que o nosso payload JSON não tem um esquema associado (para efeitos da aplicação de demonstração), temos de instruir o Kafka Connect a não procurar um esquema ao definir este atributo comofalse
. Não o fazer resultará em falhas.
Instalar o conector
Instale o conector com o ponto final REST do Kafka Connect:
curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors
Para verificar o estado:
curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status
Se tudo correr bem, o conector deve começar a tecer a sua magia. Deve autenticar-se no Azure Cosmos DB e começar a ingerir dados do tópico do Kafka (weather-data
) em tabelas do Cassandra e weather.data_by_state
weather.data_by_station
Agora pode consultar dados nas tabelas. Aceda à portal do Azure, traga a Shell CQL alojada para a sua conta do Azure Cosmos DB.
Consultar dados do Azure Cosmos DB
Verifique as data_by_state
tabelas e data_by_station
. Seguem-se algumas consultas de exemplo para começar:
select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');
select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');
Limpar os recursos
Quando terminar a aplicação e a conta do Azure Cosmos DB, pode eliminar os recursos do Azure que criou para não incorrer em mais custos. Para eliminar os recursos:
Na barra de Pesquisa do portal do Azure, procure e selecione Grupos de recursos.
Na lista, selecione o grupo de recursos que criou para este início rápido.
Na página Descrição geral do grupo de recursos, selecione Eliminar grupo de recursos.
Na janela seguinte, introduza o nome do grupo de recursos a eliminar e, em seguida, selecione Eliminar.