Tutorial: Use Apache Kafka streams API em Azure HDInsight

Saiba como criar uma aplicação que use a API Apache Kafka Streams e execute-a com a Kafka em HDInsight.

A aplicação utilizada neste tutorial é uma contagem de palavras de transmissão em fluxo. Lê os dados do texto de um tópico do Kafka, extrai palavras individuais e, em seguida, armazena as palavras e as contagens noutro tópico.

O processamento de fluxo de Kafka é frequentemente feito usando Apache Spark ou Apache Storm. A versão Kafka 1.1.0 (em HDInsight 3.5 e 3.6) introduziu a API kafka Streams. Esta API permite-lhe transformar fluxos de dados entre tópicos de entrada e de saída. Em alguns casos, esta pode ser uma alternativa à criação de uma solução de transmissão em fluxo no Spark ou no Storm.

Para obter mais informações sobre as Transmissões em Fluxo do Kafka, veja a documentação Intro to Streams (Introdução às Transmissões em Fluxo) em Apache.org.

Neste tutorial, ficará a saber como:

  • Compreender o código
  • Criar e implementar a aplicação
  • Configurar tópicos do Kafka
  • Executar o código

Pré-requisitos

Compreender o código

A aplicação de exemplo está localizada em https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, no subdiretório Streaming. A aplicação é composta por dois ficheiros:

  • pom.xml: este ficheiro define as dependências do projeto, a versão de Java e os métodos de empacotamento.
  • Stream.java: este ficheiro implementa a lógica de transmissão em fluxo.

Pom.xml

Seguem-se os aspetos importantes a compreender em relação ao ficheiro pom.xml:

  • Dependências: este projeto depende das APIs de Streams do Kafka, que são disponibilizadas pelo pacote kafka-clients. Esta dependência é definida pelo seguinte código XML:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
    </dependency>
    

    A entrada ${kafka.version} é declarada na secção <properties>..</properties> de pom.xml e está configurada para a versão de Kafka do cluster do HDInsight.

  • Plug-ins: os plug-ins de Maven proporcionam diversas funcionalidades. Neste projeto, são utilizados os seguintes plug-ins:

    • maven-compiler-plugin: utilizado para definir a versão de Java utilizada pelo projeto para a versão 8. O HDInsight 3.6 requer o Java 8.
    • maven-shade-plugin: Usado para gerar um frasco uber que contém esta aplicação, e quaisquer dependências. Também é usado para definir o ponto de entrada da aplicação, para que você possa executar diretamente o ficheiro Jar sem ter que especificar a classe principal.

Stream.java

O ficheiro Stream.java utiliza a API Streams para implementar uma aplicação de contagem de palavras. Lê os dados de um tópico do Kafka, denominado test, e escreve as contagens de palavras num tópico com o nome wordcounts.

O código seguinte define a aplicação de contagem de palavras:

package com.microsoft.example;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Arrays;
import java.util.Properties;

public class Stream
{
    public static void main( String[] args ) {
        Properties streamsConfig = new Properties();
        // The name must be unique on the Kafka cluster
        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-example");
        // Brokers
        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // SerDes for key and values
        streamsConfig.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfig.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Serdes for the word and count
        Serde<String> stringSerde = Serdes.String();
        Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> sentences = builder.stream(stringSerde, stringSerde, "test");
        KStream<String, Long> wordCounts = sentences
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, word) -> new KeyValue<>(word, word))
                .countByKey("Counts")
                .toStream();
        wordCounts.to(stringSerde, longSerde, "wordcounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfig);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Criar e implementar o exemplo

Para compilar e implementar o projeto no cluster do Kafka no HDInsight, siga os passos abaixo:

  1. Desaça o seu diretório atual para a localização do hdinsight-kafka-java-get-started-master\Streaming diretório e, em seguida, use o seguinte comando para criar um pacote de frascos:

    mvn clean package
    

    Este comando cria o pacote em target/kafka-streaming-1.0-SNAPSHOT.jar.

  2. Substitua sshuser pelo utilizador SSH do seu cluster e clustername pelo nome do seu cluster. Utilize o seguinte comando para copiar o ficheiro para o kafka-streaming-1.0-SNAPSHOT.jar seu cluster HDInsight. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar sshuser@clustername-ssh.azurehdinsight.net:kafka-streaming.jar
    

Criar tópicos Apache Kafka

  1. Substitua sshuser pelo utilizador SSH do seu cluster e CLUSTERNAME pelo nome do seu cluster. Abra uma ligação SSH ao cluster, introduzindo o seguinte comando. Se tal lhe for pedido, introduza a palavra-passe da conta de utilizador SSH.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Instale o JQ, um processador JSON de linha de comando. A partir da ligação SSH aberta, insira o seguinte comando para instalar jq:

    sudo apt -y install jq
    
  3. Configurar a variável de senha. Substitua-a PASSWORD pela palavra-passe de login do cluster e, em seguida, introduza o comando:

    export password='PASSWORD'
    
  4. Extrair o nome do cluster corretamente arquivado. O invólucro real do nome do cluster pode ser diferente do que se espera, dependendo da forma como o cluster foi criado. Este comando obterá o invólucro real e, em seguida, armazená-lo-á numa variável. Introduza o seguinte comando:

    export clusterName=$(curl -u admin:$password -sS -G "http://headnodehost:8080/api/v1/clusters" | jq -r '.items[].Clusters.cluster_name')
    

    Nota

    Se você está fazendo este processo de fora do cluster, há um procedimento diferente para armazenar o nome do cluster. Obtenha o nome do cluster em minúsculas a partir do portal Azure. Em seguida, substitua o nome do cluster pelo <clustername> seguinte comando e execute-o: export clusterName='<clustername>'.

  5. Para obter os anfitriões corretores Kafka e os anfitriões do Apache Zookeeper, use os seguintes comandos. Quando lhe for pedido, introduza a palavra-passe para o início de sessão (administrador) do cluster.

    export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2);
    
    export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
    

    Nota

    Estes comandos requerem acesso a Ambari. Se o seu cluster estiver por trás de um NSG, executar estes comandos a partir de uma máquina que pode aceder a Ambari.

  6. Para criar os tópicos que a operação de transmissão em fluxo vai utilizar, utilize os seguintes comandos:

    Nota

    Poderá receber um erro que diz que o tópico test já existe. Esse erro não é problemático, porque o tópico pode já ter sido criado no tutorial da API Producer and Consumer.

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

    Os tópicos são utilizados para os seguintes fins:

    • test: é neste tópico onde os registos são recebidos. A aplicação de transmissão em fluxo lê a partir deste tópico.
    • wordcounts: é neste tópico que a aplicação de transmissão em fluxo armazena a respetiva saída.
    • RekeyedIntermediateTopic: este tópico é utilizado para reparticionar os dados à medida que a contagem é atualizada pelo operador countByKey.
    • wordcount-example-Counts-changelog: este tópico é um arquivo de estado que a operação countByKey utiliza.

    O Kafka no HDInsight também pode ser configurado para criar tópicos automaticamente. Para obter mais informações, veja o documento Configure automatic topic creation (Configurar a criação automática de tópicos).

Executar o código

  1. Para iniciar a aplicação de transmissão em fluxo como um processo em segundo plano, utilize o seguinte comando:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS &
    

    Pode receber um aviso sobre Apache log4j. Pode ignorá-lo.

  2. Para enviar os registos para o tópico test, utilize o seguinte comando para iniciar a aplicação de produtor:

    java -jar kafka-producer-consumer.jar producer test $KAFKABROKERS
    
  3. Após a conclusão do produtor, utilize o seguinte comando par ver as informações armazenada no tópico wordcounts:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
    

    Os parâmetros --property dizem ao consumidor da consola para imprimir a chave (palavra), juntamente com a contagem (valor). Este parâmetro também configura o desserializador que vai ser utilizado para ler esses valores a partir do Kafka.

    O resultado é semelhante ao seguinte texto:

    dwarfs  13635
    ago     13664
    snow    13636
    dwarfs  13636
    ago     13665
    a       13803
    ago     13666
    a       13804
    ago     13667
    ago     13668
    jumped  13640
    jumped  13641
    

    O parâmetro --from-beginning configura o consumidor para começar no início dos registos armazenados no tópico. A contagem aumenta sempre que for encontrada uma palavra, de modo que o tópico contém múltiplas entradas para cada palavra, com uma contagem progressiva.

  4. Utilize Ctrl + C para sair do produtor. Continue a utilizar Ctrl + C para sair da aplicação e do consumidor.

  5. Para eliminar os tópicos utilizados pela operação de streaming, utilize os seguintes comandos:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic test --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcounts --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic RekeyedIntermediateTopic --zookeeper $KAFKAZKHOSTS
    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete --topic wordcount-example-Counts-changelog --zookeeper $KAFKAZKHOSTS
    

Limpar os recursos

Para limpar os recursos criados por este tutorial, pode eliminar o grupo de recursos. Ao eliminar o grupo de recursos também elimina o cluster do HDInsight associado e quaisquer outros recursos associados ao grupo de recursos.

Para remover o grupo de recursos através do Portal do Azure:

  1. No Portal do Azure, expanda o menu no lado esquerdo para abrir o menu de serviços e, em seguida, escolha Grupos de Recursos, para apresentar a lista dos seus grupos de recursos.
  2. Encontre o grupo de recursos a eliminar e, em seguida, clique com o botão direito do rato em Mais (...) no lado direito da lista.
  3. Selecione Eliminar grupo de recursos e, em seguida, confirme.

Passos seguintes

Neste documento, aprendeu a usar a API Apache Kafka Streams com Kafka em HDInsight. Use o seguinte para saber mais sobre trabalhar com Kafka.