Öğretici: Apache Kafka Üretici ve Tüketici API’lerini kullanma
HDInsight’ta Apache Kafka Üretici ve Tüketici API’lerini kullanmayı öğrenin.
Kafka Üretici API’si, uygulamaların Kafka kümesine veri akışları göndermesine olanak tanır. Kafka Tüketici API’si, uygulamaların kümeden veri akışları okumasına olanak tanır.
Bu öğreticide şunların nasıl yapıldığını öğreneceksiniz:
- Ön koşullar
- Kodu anlama
- Uygulama derleme ve dağıtma
- Uygulamayı küme üzerinde çalıştırma
API’ler hakkında daha fazla bilgi için Üretici API’si ve Tüketici API’si hakkında Apache belgelerine bakın.
Önkoşullar
- Apache Kafka HDInsight kümesinde. Kümeyi oluşturma hakkında bilgi edinmek için bkz. HDInsight'ta Apache Kafka ile başlama.
- Java Developer Kit (JDK) sürüm 8 veya OpenJDK gibi bir eşdeğeri.
- Apache Maven, Apache'ye göre düzgün bir şekilde yüklenir. Maven, Java projeleri için bir proje derleme sistemidir.
- Putty gibi bir SSH istemcisi. Daha fazla bilgi için bkz. SSH kullanarak HDInsight'a (Apache Hadoop) bağlanma.
Kodu anlama
Örnek uygulama, alt https://github.com/Azure-Samples/hdinsight-kafka-java-get-started dizininde Producer-Consumer bulunur. Güvenlik Paketi (ESP) Enterprise Kafka kümesi kullanıyorsanız, alt dizinde bulunan uygulama DomainJoined-Producer-Consumer sürümünü kullanabilirsiniz.
Uygulama öncelikli olarak dört dosyadan oluşur:
pom.xml: Bu dosya, proje bağımlılıklarını, Java sürümünü ve paketleme yöntemlerini tanımlar.Producer.java: Bu dosya, üretici API’sini kullanarak Kafka’ya rastgele tümceler gönderir.Consumer.java: Bu dosya, tüketici API’sini kullanarak Kafka’dan verileri okur ve STDOUT’a yayar.AdminClientWrapper.java: Bu dosya, Kafka konularını oluşturmak, açıklamak ve silmek için yönetici API'sini kullanır.Run.java: Üretici ve tüketici kodunu çalıştırmak için kullanılan komut satırı arabirimi.
Pom.xml
pom.xml dosyasında aşağıdaki önemli şeyler anlaşılır:
Bağımlılıklar: Bu proje,
kafka-clientspaketi tarafından sağlanan Kafka üretici ve tüketici API’lerini kullanır. Aşağıdaki XML kodu, bu bağımlılığı tanımlar:<!-- Kafka client for producer/consumer operations --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>${kafka.version}girişi,pom.xmldosyasının<properties>..</properties>bölümünde bildirilir ve HDInsight kümesinin Kafka sürümüne yapılandırılır.Eklentiler: Maven eklentileri çeşitli özellikler sağlar. Bu projede aşağıdaki eklentiler kullanılır:
maven-compiler-plugin: Proje tarafından kullanılan Java sürümünü 8 olarak ayarlamak için kullanılır. HDInsight 3.6 tarafından kullanılan Java sürümüdür.maven-shade-plugin: Bu uygulamayı ve tüm bağımlılıkları içeren bir uber jar oluşturmak için kullanılır. Ana sınıfı belirtmek zorunda olmadan doğrudan Jar dosyasını çalıştırabilmeniz için uygulamanın giriş noktasını ayarlamak için de kullanılır.
Producer.java
Üretici, Kafka aracı konakları (çalışan düğümleri) ile iletişim kurar ve verileri bir Kafka konusuna gönderir. Aşağıdaki kod parçacığı, GitHub deposundaki Producer.java dosyasındandır ve üretici özelliklerini ayarlamayı gösterir. Güvenlik Enterprise kümeleri için ek bir özellik "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");" ek bir özellik eklenmeli
Properties properties = new Properties();
// Set the brokers (bootstrap servers)
properties.setProperty("bootstrap.servers", brokers);
// Set how to serialize key/value pairs
properties.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
Consumer.java
Tüketici, Kafka aracı konakları (çalışan düğümleri) ile iletişim kurar ve bir döngüdeki kayıtları okur. Consumer.java dosyasından gelen aşağıdaki kod parçacığı tüketici özelliklerini ayarlar. Güvenlik Enterprise kümeleri için ek bir özellik "properties.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");" ek bir özellik eklenmeli
KafkaConsumer<String, String> consumer;
// Configure the consumer
Properties properties = new Properties();
// Point it to the brokers
properties.setProperty("bootstrap.servers", brokers);
// Set the consumer group (all consumers must belong to a group).
properties.setProperty("group.id", groupId);
// Set how to serialize key/value pairs
properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
// When a group is first created, it has no offset stored to start reading from. This tells it to start
// with the earliest record in the stream.
properties.setProperty("auto.offset.reset","earliest");
consumer = new KafkaConsumer<>(properties);
Bu kodda tüketici, konu başlangıcından okumak üzere yapılandırılmıştır (auto.offset.reset değeri earliest olarak ayarlanır.)
Run.java
Run.java dosyası, üretici veya tüketici kodunu çalıştıran bir komut satırı arabirimi sağlar. Kafka aracı konağı bilgilerini bir parametre olarak sağlamanız gerekir. İsteğe bağlı olarak, tüketici işlemi tarafından kullanılan bir grup kimliği değeri dahil olabilir. Aynı grup kimliğini kullanarak birden çok tüketici örneği oluşturmanız, konu başlığından okunan yük dengelemesi sağlar.
Örnek derleme ve dağıtma
Önceden yapılmış JAR dosyalarını kullanma
Jar'ları Kafka Başlarken Azure örneğinden indirin. Kümeniz Güvenlik Enterprise (ESP) etkinse kafka-producer-consumer-esp.jar kullanın. Jar dosyaları kümenize kopyalamak için aşağıdaki komutu kullanın.
scp kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Koddan JAR dosyalarını derleme
Bu adımı atlamak için önceden oluşturulmuş jar dosyaları alt dizininden Prebuilt-Jars indirilebilir. kafka-producer-consumer.jar'ı indirin. Kümeniz Güvenlik Enterprise (ESP) etkinse kafka-producer-consumer-esp.jar kullanın. Jar'ı HDInsight kümenize kopyalamak için 3. adımı yürütün.
örneklerini indirin ve ayıklayın. https://github.com/Azure-Samples/hdinsight-kafka-java-get-started
Geçerli dizininizi dizinin konumu olarak
hdinsight-kafka-java-get-started\Producer-Consumerayarlayın. Güvenlik Paketi (ESP) Enterprise Kafka kümesi kullanıyorsanız, konumu alt dizinDomainJoined-Producer-Consumerolarak ayarlayabilirsiniz. Uygulamayı derlemek için aşağıdaki komutu kullanın:mvn clean packageBu komut,
kafka-producer-consumer-1.0-SNAPSHOT.jaradlı dosyayı içerentargetadlı bir dizin oluşturur. ESP kümeleri için dosya şu şekilde olur:kafka-producer-consumer-esp-1.0-SNAPSHOT.jarsshuserdeğerini, kümenizin SSH kullanıcısı ile,CLUSTERNAMEdeğerini kümenizin adıyla değiştirin. Dosyayıkafka-producer-consumer-1.0-SNAPSHOT.jarHDInsight kümenize kopyalamak için aşağıdaki komutu girin. İstendiğinde, SSH kullanıcısının parolasını girin.scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jar
Örneği çalıştırma
sshuserdeğerini, kümenizin SSH kullanıcısı ile,CLUSTERNAMEdeğerini kümenizin adıyla değiştirin. Aşağıdaki komutu girerek kümeye bir SSH bağlantısı açın. İstendiğinde, SSH kullanıcı hesabının parolasını girin.ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.netKafka aracı konaklarını almak için aşağıdaki komutta ve
<clustername><password>değerlerinin yerine yazın ve yürütün. aşağıdaki gibi için<clustername>büyük/büyük/Azure portal. yerine<password>küme oturum açma parolasını yazın ve yürütün:sudo apt -y install jq export clusterName='<clustername>' export password='<password>' export KAFKABROKERS=$(curl -sS -u admin:$password -G https://$clusterName.azurehdinsight.net/api/v1/clusters/$clusterName/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);Not
Bu komut Ambari erişimi gerektirir. Kümeniz bir NSG'nin arkasında ise, Ambari'ye erişen bir makineden bu komutu çalıştırın.
Aşağıdaki komutu girerek Kafka
myTestkonu başlığı oluşturun:java -jar kafka-producer-consumer.jar create myTest $KAFKABROKERSÜreticiyi çalıştırmak ve konuya veri yazmak için aşağıdaki komutu kullanın:
java -jar kafka-producer-consumer.jar producer myTest $KAFKABROKERSÜretici tamamlandıktan sonra, konu başlığından okumak için aşağıdaki komutu kullanın:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS scp ./target/kafka-producer-consumer*.jar sshuser@CLUSTERNAME-ssh.azurehdinsight.net:kafka-producer-consumer.jarOkunan kayıtlar, kayıt sayısıyla birlikte gösterilir.
Tüketiciden çıkış yapmak için Ctrl + C tuşlarını kullanın.
Birden çok tüketici
Kafka tüketicileri kayıtları okurken bir tüketici grubu kullanır. Birden çok tüketiciyle aynı grubun kullanılması, konu başlığından yük dengeli okuma yapılmasına neden olur. Gruptaki her bir tüketici, kayıtların bir kısmını alır.
Tüketici uygulaması, grup kimliği olarak kullanılan bir parametre kabul eder Örneğin, aşağıdaki komut bir myGroup grup kimliği kullanarak tüketiciyi başlatır:
java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup
Tüketiciden çıkış yapmak için Ctrl + C tuşlarını kullanın.
Bu işlemi uygulamada görmek için aşağıdaki komutu kullanın:
tmux new-session 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; split-window -h 'java -jar kafka-producer-consumer.jar consumer myTest $KAFKABROKERS myGroup' \
\; attach
Bu komut, terminali iki sütuna bölmek için tmux kullanır. Her sütunda aynı grup kimliği değerine sahip bir tüketici başlatılır. Tüketici okumayı tamamladıktan sonra her birinin yalnızca kayıtların bir bölümünü okuduğuna dikkat edin. çıkmak için Ctrl + C tuşlarını iki kez tmux kullanın.
Aynı gruptaki istemcilerin tüketimi, konu başlığının bölümleri aracılığıyla işlenir. Bu kod örneğinde, daha önce oluşturulan test konusunda sekiz bölüm vardır. Sekiz tüketici başlatırsanız, her tüketici konunun tek bir bölümünden kayıtları okur.
Önemli
Bir tüketici grubunda bölümden daha fazla tüketici örneği olamaz. Bu örnekte, konu başlığındaki bölüm sayısı sekiz olduğu için bir tüketici grubu en fazla bu sayıda tüketici içerebilir. Ya da her biri en fazla sekiz tüketici içeren birden fazla tüketici grubunuz olabilir.
Kafka içinde depolanan kayıtlar bir bölüm içinde alındıkları sırada depolanır. Bir bölüm içindeki kayıtlar için sıralı teslim sağlamak üzere, tüketici örneklerinin bölüm sayısıyla eşleştiği bir tüketici grubu oluşturun. Konu başlığı içindeki kayıtların sıralı teslim edilmesini sağlayabilmek için, yalnızca bir tüketici örneği içeren bir tüketici grubu oluşturun.
Karşılaştığı yaygın sorunlar
Konu oluşturma başarısız kümeniz Enterprise güvenlik paketi etkinse, üretici ve tüketici için önceden oluşturulmuş JAR dosyalarınıkullanın. ESP jar,
DomainJoined-Producer-Consumeralt dizindekikoddan oluşturulabilir. Üretici ve tüketici özellikleri,CommonClientConfigs.SECURITY_PROTOCOL_CONFIGESP etkin kümeler için ek bir özelliğe sahiptir.ESP etkin kümelerde hata: oluşturma ve kullanma işlemleri başarısız olursa ve bir ESP etkin küme kullanıyorsanız, kullanıcının
kafkaTüm Ranger ilkelerinde mevcut olup olmadığını denetleyin. Mevcut değilse, tüm Ranger ilkelerine ekleyin.
Kaynakları temizleme
Bu öğretici ile oluşturulan kaynakları temizlemek için kaynak grubunu silebilirsiniz. Kaynak grubunun silinmesi, ilişkili HDInsight kümesini ve kaynak grubuyla ilişkili diğer tüm kaynakları da siler.
Azure portalını kullanarak kaynak grubunu kaldırmak için:
- Azure portalında sol taraftaki menüyü genişleterek hizmet menüsünü açın ve sonra Kaynak Grupları'nı seçerek kaynak gruplarınızın listesini görüntüleyin.
- Silinecek kaynak grubunu bulun ve sonra listenin sağ tarafındaki Daha fazla düğmesine (...) sağ tıklayın.
- Kaynak grubunu sil'i seçip onaylayın.
Sonraki adımlar
Bu belgede, HDInsight üzerinde Kafka ile Apache Kafka üreticisi ve tüketici API 'sini kullanmayı öğrendiniz. Kafka ile çalışma hakkında daha fazla bilgi için aşağıdakileri kullanın: