Tutorial: Use the Apache Kafka Producer and Consumer APIs

Learn how to use the Apache Kafka Producer and Consumer APIs with Kafka on HDInsight.

The Kafka Producer API allows applications to send streams of data to the Kafka cluster. The Kafka Consumer API allows applications to read streams of data from the cluster.

In this tutorial, you learn how to:

  • Prerequisites
  • Understand the code
  • Build and deploy the application
  • Run the application on the cluster

For more information on the APIs, see Apache documentation on the Producer API and Consumer API.

Prerequisites

Understand the code

The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Producer-Consumer subdirectory. The application consists primarily of four files:

  • pom.xml: This file defines the project dependencies, Java version, and packaging methods.
  • Producer.java: This file sends random sentences to Kafka using the producer API.
  • Consumer.java: This file uses the consumer API to read data from Kafka and emit it to STDOUT.
  • Run.java: The command-line interface used to run the producer and consumer code.

Pom.xml

The important things to understand in the pom.xml file are:

  • Dependencies: This project relies on the Kafka producer and consumer APIs, which are provided by the kafka-clients package. The following XML code defines this dependency:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    

    Note

    The ${kafka.version} entry is declared in the <properties>..</properties> section of pom.xml, and is configured to the Kafka version of the HDInsight cluster.

  • Plugins: Maven plugins provide various capabilities. In this project, the following plugins are used:

    • maven-compiler-plugin: Used to set the Java version used by the project to 8. This is the version of Java used by HDInsight 3.6.
    • maven-shade-plugin: Used to generate an uber jar that contains this application as well as any dependencies. It is also used to set the entry point of the application, so that you can directly run the Jar file without having to specify the main class.

Producer.java

The producer communicates with the Kafka broker hosts (worker nodes) and sends data to a Kafka topic. The following code snippet is from the Producer.java file from the GitHub repository and shows how to set the producer properties:

Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

Consumer.java

The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. The following code snippet from the Consumer.java file sets the consumer properties:

KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");

consumer = new KafkaConsumer<>(properties);

In this code, the consumer is configured to read from the start of the topic (auto.offset.reset is set to earliest.)

Run.java

The Run.java file provides a command-line interface that runs either the producer or consumer code. You must provide the Kafka broker host information as a parameter. You can optionally include a group ID value, which is used by the consumer process. If you create multiple consumer instances using the same group ID, they will load balance reading from the topic.

Build and deploy the example

  1. Download and extract the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Set your current directory to the location of the hdinsight-kafka-java-get-started\Producer-Consumer directory and use the following command:

    mvn clean package
    

    This command creates a directory named target, that contains a file named kafka-producer-consumer-1.0-SNAPSHOT.jar.

  3. Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Enter the following command to copy the kafka-producer-consumer-1.0-SNAPSHOT.jar file to your HDInsight cluster. When prompted enter the password for the SSH user.

    scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

Run the example

  1. Replace sshuser with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. Open an SSH connection to the cluster, by entering the following command. If prompted, enter the password for the SSH user account.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Install jq, a command-line JSON processor. From the open SSH connection, enter following command to install jq:

    sudo apt -y install jq
    
  3. Set up environment variables. Replace PASSWORD and CLUSTERNAME with the cluster login password and cluster name respectively, then enter the command:

    export password='PASSWORD'
    export clusterNameA='CLUSTERNAME'
    
  4. Extract correctly cased cluster name. The actual casing of the cluster name may be different than you expect, depending on how the cluster was created. This command will obtain the actual casing, store it in a variable, and then display the correctly cased name, and the name you provided earlier. Enter the following command:

    export clusterName=$(curl -u admin:$password -sS -G "https://$clusterNameA.azurehdinsight.net/api/v1/clusters" \
    | jq -r '.items[].Clusters.cluster_name')
    echo $clusterName, $clusterNameA
    
  5. To get the Kafka broker hosts and the Apache Zookeeper hosts, use the following command:

    export KAFKABROKERS=`curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER \
    | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
  6. Create Kafka topic, myTest, by entering the following command:

    java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERS
    
  7. To run the producer and write data to the topic, use the following command:

    java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERS
    
  8. Once the producer has finished, use the following command to read from the topic:

    java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS
    

    The records read, along with a count of records, is displayed.

  9. Use Ctrl + C to exit the consumer.

Multiple consumers

Kafka consumers use a consumer group when reading records. Using the same group with multiple consumers results in load balanced reads from a topic. Each consumer in the group receives a portion of the records.

The consumer application accepts a parameter that is used as the group ID. For example, the following command starts a consumer using a group ID of myGroup:

java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup

Use Ctrl + C to exit the consumer.

To see this process in action, use the following command:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach

This command uses tmux to split the terminal into two columns. A consumer is started in each column, with the same group ID value. Once the consumers finish reading, notice that each read only a portion of the records. Use Ctrl + C twice to exit tmux.

Consumption by clients within the same group is handled through the partitions for the topic. In this code sample, the test topic created earlier has eight partitions. If you start eight consumers, each consumer reads records from a single partition for the topic.

Important

There cannot be more consumer instances in a consumer group than partitions. In this example, one consumer group can contain up to eight consumers since that is the number of partitions in the topic. Or you can have multiple consumer groups, each with no more than eight consumers.

Records stored in Kafka are stored in the order they are received within a partition. To achieve in-ordered delivery for records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. To achieve in-ordered delivery for records within the topic, create a consumer group with only one consumer instance.

Next steps

In this document, you learned how to use the Apache Kafka Producer and Consumer API with Kafka on HDInsight. Use the following to learn more about working with Kafka: