Partilhar via


Ingerir dados do Apache Kafka no Azure Cosmos DB para Apache Cassandra com o Kafka Connect

APLICA-SE A: Cassandra

As aplicações Cassandra existentes podem funcionar facilmente com o Azure Cosmos DB para Apache Cassandra devido à compatibilidade do controlador CQLv4. Tira partido desta capacidade para integrar em plataformas de transmissão em fluxo como o Apache Kafka e trazer dados para o Azure Cosmos DB.

Os dados no Apache Kafka (tópicos) só são úteis quando consumidos por outras aplicações ou ingeridos noutros sistemas. É possível criar uma solução com as APIs de Produtor/Consumidor do Kafkacom um SDK de cliente e idioma à sua escolha. O Kafka Connect fornece uma solução alternativa. É uma plataforma para transmitir dados entre o Apache Kafka e outros sistemas de forma dimensionável e fiável. Uma vez que o Kafka Connect suporta conectores que incluem o Cassandra, não precisa de escrever código personalizado para integrar o Kafka no Azure Cosmos DB para Apache Cassandra.

Neste artigo, vamos utilizar o conector DataStax do Apache Kafka de código aberto, que funciona sobre a arquitetura do Kafka Connect para ingerir registos de um tópico do Kafka em linhas de uma ou mais tabelas do Cassandra. O exemplo fornece uma configuração reutilizável com o Docker Compose. Isto é bastante conveniente, uma vez que lhe permite iniciar o bootstrap de todos os componentes necessários localmente com um único comando. Estes componentes incluem o Kafka, o Zookeeper, a função de trabalho do Kafka Connect e a aplicação de gerador de dados de exemplo.

Eis uma discriminação dos componentes e das respetivas definições de serviço. Pode consultar o ficheiro completo docker-composeno repositório do GitHub.

  • O Kafka e o Zookeeper utilizam imagens de debezium .
  • Para ser executado como um contentor do Docker, o Conector Do Apache Kafka DataStax é criado sobre uma imagem do Docker existente – debezium/connect-base. Esta imagem inclui uma instalação do Kafka e das respetivas bibliotecas do Kafka Connect, o que torna realmente conveniente adicionar conectores personalizados. Pode consultar o Dockerfile.
  • O data-generator serviço gera dados gerados aleatoriamente (JSON) no tópico do weather-data Kafka. Pode consultar o código e Dockerfileno repositório do GitHub

Pré-requisitos

Criar Keyspace, tabelas e iniciar o pipeline de integração

Com o portal do Azure, crie o Cassandra Keyspace e as tabelas necessárias para a aplicação de demonstração.

Nota

Utilize os mesmos nomes de Keyspace e tabela que abaixo

CREATE KEYSPACE weather WITH REPLICATION = {'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1};

CREATE TABLE weather.data_by_state (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (state, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

CREATE TABLE weather.data_by_station (station_id text, temp int, state text, ts timestamp, PRIMARY KEY (station_id, ts)) WITH CLUSTERING ORDER BY (ts DESC) AND cosmosdb_cell_level_timestamp=true AND cosmosdb_cell_level_timestamp_tombstones=true AND cosmosdb_cell_level_timetolive=true;

Clone o repositório do GitHub:

git clone https://github.com/Azure-Samples/cosmosdb-cassandra-kafka
cd cosmosdb-cassandra-kafka

Inicie todos os serviços:

docker-compose --project-name kafka-cosmos-cassandra up --build

Nota

Pode demorar algum tempo a transferir e iniciar os contentores: este é apenas um processo único.

Para confirmar se todos os contentores foram iniciados:

docker-compose -p kafka-cosmos-cassandra ps

A aplicação geradora de dados começará a bombear dados para o weather-data tópico no Kafka. Também pode fazer uma verificação de sanidade rápida para confirmar. Espreite o contentor do Docker com a função de trabalho de ligação do Kafka:

docker exec -it kafka-cosmos-cassandra_cassandra-connector_1 bash

Depois de largar na shell do contentor, basta iniciar o processo de consumidor habitual da consola kafka e deverá ver os dados meteorológicos (no formato JSON) a fluir.

cd ../bin
./kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic weather-data

Configuração do conector Sink para Cassandra

Copie os conteúdos JSON abaixo para um ficheiro (pode dar-lhe o nome cassandra-sink-config.json). Terá de atualizá-lo de acordo com a sua configuração e o resto desta secção fornecerá orientações sobre este tópico.

{
    "name": "kafka-cosmosdb-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "1",
        "topics": "weather-data",
        "contactPoints": "<cosmos db account name>.cassandra.cosmos.azure.com",
        "port": 10350,
        "loadBalancing.localDc": "<cosmos db region e.g. Southeast Asia>",
        "auth.username": "<enter username for cosmosdb account>",
        "auth.password": "<enter password for cosmosdb account>",
        "ssl.hostnameValidation": true,
        "ssl.provider": "JDK",
        "ssl.keystore.path": "/etc/alternatives/jre/lib/security/cacerts/",
        "ssl.keystore.password": "changeit",
        "datastax-java-driver.advanced.connection.init-query-timeout": 5000,
        "maxConcurrentRequests": 500,
        "maxNumberOfRecordsInBatch": 32,
        "queryExecutionTimeout": 30,
        "connectionPoolLocalSize": 4,
        "topic.weather-data.weather.data_by_state.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "topic.weather-data.weather.data_by_station.mapping": "station_id=value.stationid, temp=value.temp, state=value.state, ts=value.created",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "offset.flush.interval.ms": 10000
    }
}

Eis um resumo dos atributos:

Conectividade básica

  • contactPoints: introduza o ponto de contacto do Cassandra do Azure Cosmos DB
  • loadBalancing.localDc: introduza a região da conta do Azure Cosmos DB, por exemplo, Sudeste Asiático
  • auth.username: introduza o nome de utilizador
  • auth.password: introduza a palavra-passe
  • port: introduza o valor da porta (isto é 10350, não 9042. deixe-o tal como está)

Configuração SSL

O Azure Cosmos DB impõe conectividade segura através do conector SSL e o Conector kafka também suporta SSL.

  • ssl.keystore.path: caminho para o keystore do JDK no contentor – /etc/alternatives/jre/lib/security/cacerts/
  • ssl.keystore.password: palavra-passe do keystore JDK (predefinição)
  • ssl.hostnameValidation: Transformamos a validação do nome de anfitrião do nó próprio
  • ssl.provider: JDK é utilizado como o fornecedor SSL

Parâmetros genéricos

  • key.converter: Utilizamos o conversor de cadeias org.apache.kafka.connect.storage.StringConverter
  • value.converter: uma vez que os dados nos tópicos do Kafka são JSON, utilizamos org.apache.kafka.connect.json.JsonConverter
  • value.converter.schemas.enable: uma vez que o nosso payload JSON não tem um esquema associado (para efeitos da aplicação de demonstração), temos de instruir o Kafka Connect a não procurar um esquema ao definir este atributo como false. Não o fazer resultará em falhas.

Instalar o conector

Instale o conector com o ponto final REST do Kafka Connect:

curl -X POST -H "Content-Type: application/json" --data @cassandra-sink-config.json http://localhost:8083/connectors

Para verificar o estado:

curl http://localhost:8080/connectors/kafka-cosmosdb-sink/status

Se tudo correr bem, o conector deve começar a tecer a sua magia. Deve autenticar-se no Azure Cosmos DB e começar a ingerir dados do tópico do Kafka (weather-data) em tabelas do Cassandra e weather.data_by_stateweather.data_by_station

Agora pode consultar dados nas tabelas. Aceda à portal do Azure, traga a Shell CQL alojada para a sua conta do Azure Cosmos DB.

Abrir CQLSH

Consultar dados do Azure Cosmos DB

Verifique as data_by_state tabelas e data_by_station . Seguem-se algumas consultas de exemplo para começar:

select * from weather.data_by_state where state = 'state-1';
select * from weather.data_by_state where state IN ('state-1', 'state-2');
select * from weather.data_by_state where state = 'state-3' and ts > toTimeStamp('2020-11-26');

select * from weather.data_by_station where station_id = 'station-1';
select * from weather.data_by_station where station_id IN ('station-1', 'station-2');
select * from weather.data_by_station where station_id IN ('station-2', 'station-3') and ts > toTimeStamp('2020-11-26');

Limpar os recursos

Quando terminar a aplicação e a conta do Azure Cosmos DB, pode eliminar os recursos do Azure que criou para não incorrer em mais custos. Para eliminar os recursos:

  1. Na barra de Pesquisa do portal do Azure, procure e selecione Grupos de recursos.

  2. Na lista, selecione o grupo de recursos que criou para este início rápido.

    Selecione o grupo de recursos a eliminar

  3. Na página Descrição geral do grupo de recursos, selecione Eliminar grupo de recursos.

    Eliminar o grupo de recursos

  4. Na janela seguinte, introduza o nome do grupo de recursos a eliminar e, em seguida, selecione Eliminar.

Passos seguintes