Aracılığıyla paylaş


AKS üzerinde HDInsight üzerinde Apache Flink® üzerinde Apache Kafka® tablosu oluşturma

Önemli

Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.

Bu örneği kullanarak Apache FlinkSQL'de Kafka tablosu oluşturmayı öğrenin.

Önkoşullar

Kafka bağlayıcısı, Kafka konu başlıklarından veri okumanızı ve kafka konularına veri yazmanızı sağlar. Daha fazla bilgi için Apache Kafka SQL Bağlan veya bakın.

HDInsight Kafka'da konu ve veri hazırlama

weblog.py ile iletileri hazırlama

import random
import json
import time
from datetime import datetime

user_set = [
        'John',
        'XiaoMing',
        'Mike',
        'Tom',
        'Machael',
        'Zheng Hu',
        'Zark',
        'Tim',
        'Andrew',
        'Pick',
        'Sean',
        'Luke',
        'Chunck'
]

web_set = [
        'https://google.com',
        'https://facebook.com?id=1',
        'https://tmall.com',
        'https://baidu.com',
        'https://taobao.com',
        'https://aliyun.com',
        'https://apache.com',
        'https://flink.apache.com',
        'https://hbase.apache.com',
        'https://github.com',
        'https://gmail.com',
        'https://stackoverflow.com',
        'https://python.org'
]

def main():
        while True:
                if random.randrange(10) < 4:
                        url = random.choice(web_set[:3])
                else:
                        url = random.choice(web_set)

                log_entry = {
                        'userName': random.choice(user_set),
                        'visitURL': url,
                        'ts': datetime.now().strftime("%m/%d/%Y %H:%M:%S")
                }

                print(json.dumps(log_entry))
                time.sleep(0.05)

if __name__ == "__main__":
    main()

Kafka'ya işlem hattı konusu

sshuser@hn0-contsk:~$ python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events

Diğer komutlar:

-- create topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 2 --partitions 3 --topic click_events --bootstrap-server wn0-contsk:9092

-- delete topic
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --delete  --topic click_events --bootstrap-server wn0-contsk:9092

-- consume topic
sshuser@hn0-contsk:~$ /usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server wn0-contsk:9092 --topic click_events --from-beginning
{"userName": "Luke", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Tom", "visitURL": "https://stackoverflow.com", "ts": "06/26/2023 14:33:43"}
{"userName": "Chunck", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Chunck", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Andrew", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Pick", "visitURL": "https://google.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Mike", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Zheng Hu", "visitURL": "https://tmall.com", "ts": "06/26/2023 14:33:44"}
{"userName": "Luke", "visitURL": "https://facebook.com?id=1", "ts": "06/26/2023 14:33:44"}
{"userName": "John", "visitURL": "https://flink.apache.com", "ts": "06/26/2023 14:33:44"}

Flink SQL istemcisi için Secure Shell'in nasıl kullanılacağı hakkında ayrıntılı yönergeler sağlanır.

Kafka SQL Bağlan veya Bağımlılıklarını SSH'ye indirin

Aşağıdaki adımda Kafka 3.2.0 bağımlılıklarını kullanıyoruz. KOMUTU HDInsight kümesindeki Kafka sürümünüz temelinde güncelleştirmeniz gerekir.

wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/3.2.0/kafka-clients-3.2.0.jar
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka/1.17.0/flink-connector-kafka-1.17.0.jar

Şimdi Kafka SQL istemci jar'ları ile Flink SQL İstemcisi'ne bağlanalım.

msdata@pod-0 [ /opt/flink-webssh ]$ bin/sql-client.sh -j flink-connector-kafka-1.17.0.jar -j kafka-clients-3.2.0.jar

Şimdi Flink SQL'de Kafka tablosunu oluşturalım ve Flink SQL'de Kafka tablosunu seçelim.

Aşağıdaki kod parçacığında Kafka bootstrap sunucu IP'lerinizi güncelleştirmeniz gerekir.

CREATE TABLE KafkaTable (
`userName` STRING,
`visitURL` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'click_events',
'properties.bootstrap.servers' = '<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092,<update-kafka-bootstrapserver-ip>:9092',
'properties.group.id' = 'my_group',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

select * from KafkaTable;

Flink SQL'de Kafka tablosu oluşturma ve seçme adımlarını gösteren ekran görüntüsü.

Kafka iletileri oluşturma

Şimdi HDInsight Kafka kullanarak aynı konuya Kafka iletileri üretelim.

python weblog.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --bootstrap-server wn0-contsk:9092 --topic click_events

Tabloyu Flink SQL'de izleyebilirsiniz.

Flink SQL'de tablo tarihini izleme adımlarını gösteren ekran görüntüsü.

Flink Web kullanıcı arabirimindeki akış işleri aşağıdadır.

Flink web kullanıcı arabirimindeki işleri gösteren ekran görüntüsü.

Başvuru

  • Apache Kafka SQL Bağlan or
  • Apache, Apache Kafka, Kafka, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.