Tutorial: Use the Apache Kafka Producer and Consumer APIs

Learn how to use the Kafka Producer and Consumer APIs with Kafka on HDInsight.

The Kafka Producer API allows applications to send streams of data to the Kafka cluster. The Kafka Consumer API allows applications to read streams of data from the cluster.

In this tutorial, you learn how to:

  • Set up your development environment
  • Set up your deployment environment
  • Understand the code
  • Build and deploy the application
  • Run the application on the cluster

For more information on the APIs, see Apache documentation on the Producer API and Consumer API.

Set up your development environment

You must have the following components installed in your development environment:

  • Java Developer Kit (JDK) version 8, or an equivalent such as OpenJDK.
  • Apache Maven.
  • An SSH client that provides the ssh and scp commands.

The following environment variables may already be set when you install the JDK on your development workstation. However, you should check that they exist and that they contain the correct values for your system; a quick way to check them from code is shown after the list.

  • JAVA_HOME - should point to the directory where the JDK is installed.
  • PATH - should contain the following paths:

    • JAVA_HOME (or the equivalent path).
    • JAVA_HOME\bin (or the equivalent path).
    • The directory where Maven is installed.
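
If you want to confirm from code what the build tools will see, here is a minimal sketch; the class name EnvCheck is just an illustration, not part of the sample:

// Minimal sketch: print the environment variables the build relies on.
public class EnvCheck {
    public static void main(String[] args) {
        // System.getenv returns null when a variable is not set
        System.out.println("JAVA_HOME = " + System.getenv("JAVA_HOME"));
        System.out.println("PATH      = " + System.getenv("PATH"));
    }
}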

Set up your deployment environment

This tutorial requires Kafka on HDInsight 3.6. To learn how to create a Kafka on HDInsight cluster, see the Start with Kafka on HDInsight document.

Understand the code

The example application is located at https://github.com/Azure-Samples/hdinsight-kafka-java-get-started, in the Producer-Consumer subdirectory. The application consists primarily of four files:

  • pom.xml: This file defines the project dependencies, Java version, and packaging methods.
  • Producer.java: This file sends 1 million (1,000,000) random sentences to Kafka using the producer API.
  • Consumer.java: This file uses the consumer API to read data from Kafka and emit it to STDOUT.
  • Run.java: The command-line interface used to run the producer and consumer code.

pom.xml

The important things to understand in the pom.xml file are:

  • Dependencies: This project relies on the Kafka producer and consumer APIs, which are provided by the kafka-clients package. The following XML code defines this dependency:

    <!-- Kafka client for producer/consumer operations -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    

    Note

    The ${kafka.version} entry is declared in the <properties>...</properties> section of pom.xml, and is set to the Kafka version of the HDInsight cluster.

  • Plugins: Maven plugins provide various capabilities. In this project, the following plugins are used:

    • maven-compiler-plugin: Sets the project's Java version to 8, which is the version used by HDInsight 3.6.
    • maven-shade-plugin: Generates an uber jar that contains this application and all of its dependencies. It also sets the entry point of the application, so you can run the jar file directly without specifying the main class.

Producer.java

The producer communicates with the Kafka broker hosts (worker nodes) to store data in a Kafka topic. The following code snippet is from the Producer.java file:

package com.microsoft.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
import java.io.IOException;

public class Producer
{
    public static void produce(String brokers) throws IOException
    {

        // Set properties used to configure the producer
        Properties properties = new Properties();
        // Set the brokers (bootstrap servers)
        properties.setProperty("bootstrap.servers", brokers);
        // Set how to serialize key/value pairs
        properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // So we can generate random sentences
        Random random = new Random();
        String[] sentences = new String[] {
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away",
                "four score and seven years ago",
                "snow white and the seven dwarfs",
                "i am at two with nature"
        };

        String progressAnimation = "|/-\\";
        // Produce a bunch of records
        for(int i = 0; i < 1000000; i++) {
            // Pick a sentence at random
            String sentence = sentences[random.nextInt(sentences.length)];
            // Send the sentence to the test topic
            producer.send(new ProducerRecord<String, String>("test", sentence));
            String progressBar = "\r" + progressAnimation.charAt(i % progressAnimation.length()) + " " + i;
            System.out.write(progressBar.getBytes());
            System.out.flush();
        }

        // send() is asynchronous; close() flushes any buffered records and
        // releases resources so no data is lost when the process exits.
        producer.close();
    }
}

This code connects to the Kafka broker hosts (worker nodes), and then sends 1,000,000 sentences to Kafka using the Producer API.
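
Note that send() is asynchronous: it buffers the record locally and returns a Future<RecordMetadata> immediately. If you need per-record delivery confirmation, you can block on that Future. The following is a minimal sketch, not part of the sample; the class name ConfirmedSend is hypothetical, and it assumes a producer configured as shown above:

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ConfirmedSend {
    // Send one record to the 'test' topic, block until the broker
    // acknowledges it, then report where the record was written.
    public static void sendAndConfirm(KafkaProducer<String, String> producer)
            throws Exception {
        Future<RecordMetadata> future =
                producer.send(new ProducerRecord<String, String>("test", "hello kafka"));
        RecordMetadata metadata = future.get(); // blocks; throws if the send failed
        System.out.println("Wrote to partition " + metadata.partition()
                + " at offset " + metadata.offset());
    }
}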

Consumer.java

The consumer communicates with the Kafka broker hosts (worker nodes), and reads records in a loop. The following code snippet is from the Consumer.java file:

package com.microsoft.example;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Properties;
import java.util.Arrays;

public class Consumer {
    public static void consume(String brokers, String groupId) {
        // Create a consumer
        KafkaConsumer<String, String> consumer;
        // Configure the consumer
        Properties properties = new Properties();
        // Point it to the brokers
        properties.setProperty("bootstrap.servers", brokers);
        // Set the consumer group (all consumers must belong to a group).
        properties.setProperty("group.id", groupId);
        // Set how to deserialize key/value pairs
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        // When a group is first created, it has no offset stored to start reading from. This tells it to start
        // with the earliest record in the stream.
        properties.setProperty("auto.offset.reset","earliest");
        consumer = new KafkaConsumer<>(properties);

        // Subscribe to the 'test' topic
        consumer.subscribe(Arrays.asList("test"));

        // Loop until ctrl + c
        int count = 0;
        while(true) {
            // Poll for records
            ConsumerRecords<String, String> records = consumer.poll(200);
            // Did we get any?
            if (records.count() == 0) {
                // timeout/nothing to read
            } else {
                // Yes, loop over records
                for(ConsumerRecord<String, String> record: records) {
                    // Display record and count
                    count += 1;
                    System.out.println( count + ": " + record.value());
                }
            }
        }
    }
}

In this code, the consumer is configured to read from the start of the topic (auto.offset.reset is set to earliest).
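
The sample's loop runs until you press Ctrl + C and never closes the consumer. In your own code, closing the consumer lets the group rebalance promptly instead of waiting for a session timeout. A common pattern is to call wakeup() from a JVM shutdown hook; the sketch below follows that pattern and is not part of the sample (the class name GracefulConsumer is hypothetical):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumer {
    // Poll until the JVM begins shutting down, then close the consumer cleanly.
    public static void consumeUntilShutdown(KafkaConsumer<String, String> consumer) {
        // wakeup() is the one KafkaConsumer method that is safe to call from
        // another thread; it makes a blocked poll() throw WakeupException.
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join(); // wait for close() to finish before the JVM exits
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        } catch (WakeupException e) {
            // Expected during shutdown; fall through to close()
        } finally {
            consumer.close(); // commits offsets (auto-commit) and leaves the group
        }
    }
}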

Run.java

The Run.java file provides a command-line interface that runs either the producer or consumer code. You must provide the Kafka broker host information as a parameter. You can optionally include a group ID value, which is used by the consumer process. If you create multiple consumer instances using the same group ID, they load-balance reading from the topic.

package com.microsoft.example;

import java.io.IOException;
import java.util.UUID;

// Handle starting producer or consumer
public class Run {
    public static void main(String[] args) throws IOException {
        if(args.length < 2) {
            usage();
        }

        // Get the brokers
        String brokers = args[1];
        switch(args[0].toLowerCase()) {
            case "producer":
                Producer.produce(brokers);
                break;
            case "consumer":
                // Either a groupId was passed in, or we need a random one
                String groupId;
                if(args.length == 3) {
                    groupId = args[2];
                } else {
                    groupId = UUID.randomUUID().toString();
                }
                Consumer.consume(brokers, groupId);
                break;
            default:
                usage();
        }
        System.exit(0);
    }
    // Display usage
    public static void usage() {
        System.out.println("Usage:");
        System.out.println("kafka-example.jar <producer|consumer> brokerhosts [groupid]");
        System.exit(1);
    }
}

Build and deploy the example

  1. Download the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started.

  2. Change to the Producer-Consumer directory, and then use the following command:

    mvn clean package
    

    This command creates a directory named target that contains a file named kafka-producer-consumer-1.0-SNAPSHOT.jar.

  3. Use the following command to copy the kafka-producer-consumer-1.0-SNAPSHOT.jar file to your HDInsight cluster:

    scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. When prompted, enter the password for the SSH user.

Run the example

  1. To open an SSH connection to the cluster, use the following command:

    ssh SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net
    

    Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. If prompted, enter the password for the SSH user account. For more information on using SSH with HDInsight, see Use SSH with HDInsight.

  2. To create the Kafka topics that are used by this example, use the following steps:

    1. To install jq (a command-line JSON processor) and save the cluster name to a variable, use the following commands. When prompted, enter the Kafka cluster name:

      sudo apt -y install jq
      read -p 'Enter your Kafka cluster name:' CLUSTERNAME
      
    2. To get the Kafka broker hosts and the Zookeeper hosts, use the following commands. When prompted, enter the password for the cluster login (admin) account.

      export KAFKAZKHOSTS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
      export KAFKABROKERS=`curl -sS -u admin -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
      
    3. To create the test topic, use the following command:

      /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS
      
  3. To run the producer and write data to the topic, use the following command:

    java -jar kafka-producer-consumer.jar producer $KAFKABROKERS
    
  4. Once the producer has finished, use the following command to read from the topic:

    java -jar kafka-producer-consumer.jar consumer $KAFKABROKERS
    

    The records that were read, along with a count of records, are displayed.

  5. Use Ctrl + C to exit the consumer.

Multiple consumers

Kafka consumers use a consumer group when reading records. Using the same group with multiple consumers results in load balanced reads from a topic. Each consumer in the group receives a portion of the records.

The consumer application accepts a parameter that is used as the group ID. For example, the following command starts a consumer using a group ID of mygroup:

java -jar kafka-producer-consumer.jar consumer $KAFKABROKERS mygroup

To see this process in action, use the following command:

tmux new-session 'java -jar kafka-producer-consumer.jar consumer $KAFKABROKERS mygroup' \; split-window -h 'java -jar kafka-producer-consumer.jar consumer $KAFKABROKERS mygroup' \; attach

This command uses tmux to split the terminal into two columns. A consumer is started in each column, with the same group ID value. Once the consumers finish reading, notice that each read only a portion of the records. Use Ctrl + C twice to exit tmux.

Consumption by clients within the same group is handled through the partitions of the topic. The test topic created earlier has eight partitions. If you start eight consumers, each consumer reads records from a single partition of the topic.
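
You can observe the assignment from code: subscribe() has an overload that takes a ConsumerRebalanceListener, which reports the partitions the group coordinator hands to each instance. A minimal sketch, assuming a consumer configured as in Consumer.java (the class name AssignmentLogger is hypothetical):

import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignmentLogger {
    // Subscribe to the 'test' topic and log this instance's partitions.
    public static void subscribeWithLogging(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(Arrays.asList("test"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // With eight partitions and eight consumers, expect one entry here
                System.out.println("Assigned: " + partitions);
            }

            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("Revoked: " + partitions);
            }
        });
    }
}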

Important

There cannot be more active consumer instances in a consumer group than there are partitions; any extra instances sit idle and receive no records. In this example, one consumer group can contain up to eight consumers since that is the number of partitions in the topic. Or you can have multiple consumer groups, each with no more than eight consumers.

Within a partition, records are stored in the order they are received. To achieve in-order delivery of records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. To achieve in-order delivery of records within the topic, create a consumer group with only one consumer instance.
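
A related point for producers: the default partitioner picks a partition by hashing the record key, so records that share a key always land in the same partition and stay in order relative to each other. A minimal sketch, assuming the producer from Producer.java (the key device-42 is just an illustration):

// Keyed records: both records hash to the same partition under the default
// partitioner, so "reading 1" is always stored before "reading 2".
producer.send(new ProducerRecord<String, String>("test", "device-42", "reading 1"));
producer.send(new ProducerRecord<String, String>("test", "device-42", "reading 2"));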

Next steps

In this document, you learned how to use the Kafka Producer and Consumer APIs with Kafka on HDInsight. Use the following to learn more about working with Kafka: