Start with Apache Kafka on HDInsight

Learn how to create and use an Apache Kafka cluster on Azure HDInsight. Kafka is an open-source, distributed streaming platform that is available with HDInsight. It is often used as a message broker, as it provides functionality similar to a publish-subscribe message queue.

Note

There are currently two versions of Kafka available with HDInsight: 0.9.0 (HDInsight 3.4) and 0.10.0 (HDInsight 3.5 and 3.6). The steps in this document assume that you are using Kafka on HDInsight 3.6.

Warning

Billing for HDInsight clusters is prorated per minute, whether you are using them or not. Be sure to delete your cluster after you have finished using it. For more information, see How to delete an HDInsight cluster.

Create a Kafka cluster

Use the following steps to create a Kafka on HDInsight cluster:

  1. From the Azure portal, select + NEW, Intelligence + Analytics, and then select HDInsight.

    Create an HDInsight cluster

  2. From Basics, enter the following information:

    • Cluster Name: The name of the HDInsight cluster.
    • Subscription: Select the subscription to use.
    • Cluster login username and Cluster login password: The login used when accessing the cluster over HTTPS. You use these credentials to access services such as the Ambari Web UI or REST API.
    • Secure Shell (SSH) username: The login used when accessing the cluster over SSH. By default the password is the same as the cluster login password.
    • Resource Group: The resource group to create the cluster in.
    • Location: The Azure region to create the cluster in.

      Important

      For high availability of data, we recommend selecting a location (region) that contains three fault domains. For more information, see the Data high availability section.

    Select subscription

  3. Select Cluster type, and then set the following values from Cluster configuration:

    • Cluster Type: Kafka

    • Version: Kafka 0.10.0 (HDI 3.6)

    • Cluster Tier: Standard

    Finally, use the Select button to save settings.

    Select cluster type

  4. After selecting the cluster type, use the Select button to apply the cluster type settings. Next, use the Next button to finish basic configuration.

  5. From Storage, select or create a Storage account. For the steps in this document, leave the other fields at the default values. Use the Next button to save storage configuration.

    Set the storage account settings for HDInsight

  6. From Applications (optional), select Next to continue. No applications are required for this example.

  7. From Cluster size, select Next to continue.

    Warning

    To guarantee availability of Kafka on HDInsight, your cluster must contain at least three worker nodes. For more information, see the Data high availability section.

    Set the Kafka cluster size

    Important

      The disks per worker node entry controls the scalability of Kafka on HDInsight. Kafka on HDInsight uses the local disk of the virtual machines in the cluster. Kafka is I/O heavy, so Azure Managed Disks are used to provide high throughput and more storage per node. The type of managed disk can be either Standard (HDD) or Premium (SSD). Premium disks are used with DS and GS series VMs. All other VM types use standard.

  8. From Advanced settings, select Next to continue.

  9. From the Summary, review the configuration for the cluster. Use the Edit links to change any settings that are incorrect. Finally, use the Create button to create the cluster.

    Cluster configuration summary

    Note

    It can take up to 20 minutes to create the cluster.

Connect to the cluster

Important

When performing the following steps, you must use an SSH client. For more information, see the Use SSH with HDInsight document.

From your client, use SSH to connect to the cluster:

ssh SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net

Replace SSHUSER with the SSH username you provided during cluster creation. Replace CLUSTERNAME with the name of the cluster.

When prompted, enter the password you used for the SSH account.

For information, see Use SSH with HDInsight.

Get the Zookeeper and Broker host information

When working with Kafka, you must know two host values: the Zookeeper hosts and the Broker hosts. These hosts are used with the Kafka API and many of the utilities that ship with Kafka.

Use the following steps to create environment variables that contain the host information. These environment variables are used in the steps in this document.

  1. From an SSH connection to the cluster, use the following command to install the jq utility. This utility is used to parse JSON documents, and is useful in retrieving the broker host information:

    sudo apt -y install jq
    
  2. To set the environment variables with information retrieved from Ambari, use the following commands:

    CLUSTERNAME='your cluster name'
    PASSWORD='your cluster password'
    export KAFKAZKHOSTS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2`
    
    export KAFKABROKERS=`curl -sS -u admin:$PASSWORD -G https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2`
    
    echo '$KAFKAZKHOSTS='$KAFKAZKHOSTS
    echo '$KAFKABROKERS='$KAFKABROKERS
    

    Important

    Set CLUSTERNAME= to the name of the Kafka cluster. Set PASSWORD= to the login (admin) password you used when creating the cluster.

    The following text is an example of the contents of $KAFKAZKHOSTS:

    zk0-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181,zk2-kafka.eahjefxxp1netdbyklgqj5y1ud.ex.internal.cloudapp.net:2181

    The following text is an example of the contents of $KAFKABROKERS:

    wn1-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,wn0-kafka.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092

    Note

    The cut command is used to trim the list of hosts to two host entries. You do not need to provide the full list of hosts when creating a Kafka consumer or producer.

    Warning

    Do not rely on the information returned from this session to always be accurate. If you scale the cluster, brokers are added or removed. If a failure occurs and a node is replaced, the host name for the node may change.

    You should retrieve the Zookeeper and broker hosts information shortly before you use it to ensure you have valid information.

Create a topic

Kafka stores streams of data in categories called topics. From an SSH connection to a cluster head node, use a script provided with Kafka to create a topic:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic test --zookeeper $KAFKAZKHOSTS

This command connects to Zookeeper using the host information stored in $KAFKAZKHOSTS, and then creates a Kafka topic named test. You can verify that the topic was created by using the following script to list topics:

/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --list --zookeeper $KAFKAZKHOSTS

The output of this command lists the Kafka topics on the cluster, which include the test topic.

Produce and consume records

Kafka stores records in topics. Records are produced by producers and consumed by consumers. Consumers retrieve records from Kafka brokers. Each worker node in your HDInsight cluster is a Kafka broker.

Use the following steps to store records into the test topic you created earlier, and then read them using a consumer:

  1. From the SSH session, use a script provided with Kafka to write records to the topic:

    /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test
    

    You are not returned to the prompt after this command. Instead, type a few text messages and then use Ctrl + C to stop sending to the topic. Each line is sent as a separate record.

  2. Use a script provided with Kafka to read records from the topic:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning
    

    This command retrieves the records from the topic and displays them. Using --from-beginning tells the consumer to start from the beginning of the stream, so all records are retrieved.

  3. Use Ctrl + C to stop the consumer.

Producer and consumer API

You can also programmatically produce and consume records using the Kafka APIs. To build a Java producer and consumer, use the following steps from your development environment. A minimal sketch of the producer code appears after these steps.

Important

You must have the following components installed in your development environment:

  • A Java JDK, such as OpenJDK

  • Apache Maven

  • An SSH client that provides the ssh and scp commands

These are the tools used by the mvn, scp, ssh, and java commands in the following steps.

  1. Download the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started. For the producer/consumer example, use the project in the Producer-Consumer directory. This example contains the following classes:

    • Run - starts either the consumer or producer.

    • Producer - sends 1,000,000 records to the topic.

    • Consumer - reads records from the topic.

  2. To create a jar package, change directories to the location of the Producer-Consumer directory and use the following command:

    mvn clean package
    

    This command creates a directory named target, which contains a file named kafka-producer-consumer-1.0-SNAPSHOT.jar.

  3. Use the following commands to copy the kafka-producer-consumer-1.0-SNAPSHOT.jar file to your HDInsight cluster:

    scp ./target/kafka-producer-consumer-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
    

    Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. When prompted, enter the password for the SSH user.

  4. Once the scp command finishes copying the file, connect to the cluster using SSH. Use the following command to write records to the test topic:

    java -jar kafka-producer-consumer.jar producer $KAFKABROKERS
    
  5. Once the process has finished, use the following command to read from the topic:

    java -jar kafka-producer-consumer.jar consumer $KAFKABROKERS
    

    The records read, along with a count of records, are displayed. You may see a few more than 1,000,000 records logged, as you sent several records to the topic using a script in an earlier step.

  6. Use Ctrl + C to exit the consumer.
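
For reference, the core of a Kafka producer looks roughly like the following sketch. This is a minimal illustration rather than the sample project's exact code: the class name SketchProducer is hypothetical, and the broker list (for example, the value of $KAFKABROKERS) is assumed to be passed as the first command-line argument.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SketchProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker list, for example the value of $KAFKABROKERS.
            props.put("bootstrap.servers", args[0]);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // The producer sends records to the brokers. Here, ten records go to the 'test' topic;
            // with a null key, records are spread across the topic's partitions.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<String, String>("test", "message " + i));
                }
            }
        }
    }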

Multiple consumers

Kafka consumers use a consumer group when reading records. Using the same group with multiple consumers results in load-balanced reads from a topic. Each consumer in the group receives a portion of the records. To see this process in action, use the following steps:

  1. Open a new SSH session to the cluster, so that you have two SSH sessions. In each session, use the following command to start a consumer with the same consumer group ID:

    java -jar kafka-producer-consumer.jar consumer $KAFKABROKERS mygroup
    

    This command starts a consumer using the group ID mygroup.

    Note

    Use the commands in the Get the Zookeeper and Broker host information section to set $KAFKABROKERS for this SSH session.

  2. Watch as each session counts the records it receives from the topic. The total of both sessions should be the same as you received previously from one consumer.

Consumption by clients within the same group is handled through the partitions for the topic. The test topic created earlier has eight partitions. If you open eight SSH sessions and launch a consumer in each session, each consumer reads records from a single partition for the topic.

Important

A consumer group cannot make use of more consumer instances than there are partitions; any extra consumers sit idle. In this example, one consumer group can contain up to eight consumers, since that is the number of partitions in the topic. Or you can use multiple consumer groups, each with no more than eight consumers.

Within a partition, records are stored in the order they are received. To achieve in-order delivery of records within a partition, create a consumer group where the number of consumer instances matches the number of partitions. To achieve in-order delivery of records within the topic, create a consumer group with only one consumer instance.
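
To see where the consumer group fits in code, the following minimal sketch shows a consumer that joins the mygroup group and subscribes to the test topic. It is an illustration rather than the sample project's code: the class name SketchConsumer is hypothetical, and the broker list is assumed to come from the first command-line argument. Running several copies of such a consumer with the same group.id splits the topic's partitions across them.

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SketchConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", args[0]);   // for example, the value of $KAFKABROKERS
            props.put("group.id", "mygroup");          // consumers sharing this ID split the partitions
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("test"));
                while (true) {
                    // Each consumer in the group is assigned a subset of the topic's partitions.
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition %d, offset %d: %s%n",
                                record.partition(), record.offset(), record.value());
                    }
                }
            }
        }
    }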

Streaming API

The streaming API was added to Kafka in version 0.10.0; earlier versions rely on Apache Spark or Storm for stream processing.
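
The streaming example described in the following steps reads text from the test topic, counts words, and writes the counts to a wordcounts topic. As a rough illustration of such a topology, here is a minimal Kafka Streams word-count sketch. It is written against a newer Kafka Streams API (version 1.0 and later) than the 0.10.0 release used by the sample project, so the builder classes differ from the sample's code; the class name WordCountSketch is hypothetical, and the broker list is assumed to come from the first command-line argument.

    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KTable;
    import org.apache.kafka.streams.kstream.Produced;

    public class WordCountSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);  // for example, $KAFKABROKERS
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            StreamsBuilder builder = new StreamsBuilder();
            KStream<String, String> lines = builder.stream("test");

            // Split each record value into words, group by word, and count occurrences.
            KTable<String, Long> counts = lines
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count();

            // Emit each word and its running count to the wordcounts topic.
            counts.toStream().to("wordcounts", Produced.with(Serdes.String(), Serdes.Long()));

            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.start();
        }
    }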

  1. If you haven't already done so, download the examples from https://github.com/Azure-Samples/hdinsight-kafka-java-get-started to your development environment. For the streaming example, use the project in the streaming directory.

    This project contains only one class, Stream, which reads records from the test topic created previously. It counts the words read, and emits each word and count to a topic named wordcounts. The wordcounts topic is created in a later step in this section.

  2. From the command line in your development environment, change directories to the location of the Streaming directory, and then use the following command to create a jar package:

    mvn clean package
    

    This command creates a directory named target, which contains a file named kafka-streaming-1.0-SNAPSHOT.jar.

  3. Use the following commands to copy the kafka-streaming-1.0-SNAPSHOT.jar file to your HDInsight cluster:

    scp ./target/kafka-streaming-1.0-SNAPSHOT.jar SSHUSER@CLUSTERNAME-ssh.azurehdinsight.net:kafka-streaming.jar
    

    Replace SSHUSER with the SSH user for your cluster, and replace CLUSTERNAME with the name of your cluster. When prompted, enter the password for the SSH user.

  4. Once the scp command finishes copying the file, connect to the cluster using SSH, and then use the following command to create the wordcounts topic:

    /usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic wordcounts --zookeeper $KAFKAZKHOSTS
    
  5. Next, start the streaming process by using the following command:

    java -jar kafka-streaming.jar $KAFKABROKERS $KAFKAZKHOSTS 2>/dev/null &
    

    This command starts the streaming process in the background.

  6. Use the following command to send messages to the test topic. These messages are processed by the streaming example:

    java -jar kafka-producer-consumer.jar producer $KAFKABROKERS &>/dev/null &
    
  7. Use the following command to view the output that is written to the wordcounts topic by the streaming process:

    /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic wordcounts --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
    

    Note

    To view the data, you must tell the consumer to print the key and specify the deserializers to use for the key and value. The key is the word, and the value contains the count.

    The output is similar to the following text:

     dwarfs  13635
     ago     13664
     snow    13636
     dwarfs  13636
     ago     13665
     a       13803
     ago     13666
     a       13804
     ago     13667
     ago     13668
     jumped  13640
     jumped  13641
     a       13805
     snow    13637
    

    Note

    The count increments each time a word is encountered.

  8. Use Ctrl + C to exit the consumer, then use the fg command to bring the streaming background task back to the foreground. Use Ctrl + C to exit it also.

Data high availability

Each Azure region (location) provides fault domains. A fault domain is a logical grouping of underlying hardware in an Azure data center. Each fault domain shares a common power source and network switch. The virtual machines and managed disks that implement the nodes within an HDInsight cluster are distributed across these fault domains. This architecture limits the potential impact of physical hardware failures.

For information on the number of fault domains in a region, see the Availability of Linux virtual machines document.

Important

We recommend using an Azure region that contains three fault domains, and using a replication factor of 3.

If you must use a region that contains only two fault domains, use a replication factor of 4 to spread the replicas evenly across the two fault domains.

Kafka and fault domains

Kafka is not aware of fault domains. When creating partition replicas for topics, it may not distribute replicas properly for high availability. To ensure high availability, use the Kafka partition rebalance tool. This tool must be run from an SSH session to the head node of your Kafka cluster.

To ensure the highest availability of your Kafka data, you should rebalance the partition replicas for your topic at the following times:

  • When a new topic or partition is created

  • When you scale up a cluster

Delete the cluster

Warning

Billing for HDInsight clusters is prorated per minute, whether you are using them or not. Be sure to delete your cluster after you have finished using it. For more information, see How to delete an HDInsight cluster.

Troubleshoot

If you run into issues with creating HDInsight clusters, see access control requirements.

Next steps

In this document, you have learned the basics of working with Apache Kafka on HDInsight. Use the following to learn more about working with Kafka: