Apache Kafka'dan Azure Veri Gezgini'a veri alma
Apache Kafka , verileri sistemler veya uygulamalar arasında güvenilir bir şekilde taşınan gerçek zamanlı akış verisi işlem hatları oluşturmaya yönelik bir dağıtılmış akış platformudur. Kafka Connect , Apache Kafka ile diğer veri sistemleri arasında ölçeklenebilir ve güvenilir veri akışı sağlayan bir araçtır. Kusto Kafka Havuzu, Kafka'dan bağlayıcı görevi görür ve kod kullanılmasını gerektirmez. Git deposundan veya Confluent Connector Hub'dan havuz bağlayıcısı jar dosyasını indirin.
Bu makalede, Kafka kümesini ve Kafka bağlayıcı kümesi kurulumunu basitleştirmek için bağımsız bir Docker kurulumu kullanarak Kafka ile veri alma işlemi gösterilmektedir.
Daha fazla bilgi için bkz. Bağlayıcı Git deposu ve sürüm ayrıntıları.
Önkoşullar
- Azure aboneliği. Ücretsiz bir Azure hesabı oluşturun.
- Microsoft Fabric'te varsayılan önbellek ve bekletme ilkelerine veyaKQL veritabanına sahip bir Azure Veri Gezgini kümesi ve veritabanı.
- Azure CLI.
- Docker ve Docker Compose.
Microsoft Entra hizmet sorumlusu oluşturma
Microsoft Entra hizmet sorumlusu, aşağıdaki örnekte olduğu gibi Azure portal aracılığıyla veya program aracılığıyla oluşturulabilir.
Bu hizmet sorumlusu, bağlayıcı tarafından Kusto'da tablonuza veri yazmak için kullanılan kimlik olacaktır. Daha sonra bu hizmet sorumlusuna Kusto kaynaklarına erişmek için izinler verirsiniz.
Azure CLI aracılığıyla Azure aboneliğinizde oturum açın. Ardından tarayıcıda kimlik doğrulaması yapın.
az login
Sorumluyu barındıracak aboneliği seçin. Bu adım, birden çok aboneliğiniz olduğunda gereklidir.
az account set --subscription YOUR_SUBSCRIPTION_GUID
Hizmet sorumlusunu oluşturun. Bu örnekte hizmet sorumlusu olarak adlandırılır
my-service-principal
.az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
Döndürülen JSON verilerinden, gelecekte kullanmak üzere ,
password
vetenant
değerini kopyalayınappId
.{ "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "displayName": "my-service-principal", "name": "my-service-principal", "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn", "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn" }
Microsoft Entra uygulamanızı ve hizmet sorumlunuzu oluşturdunuz.
Hedef tablo oluşturma
Sorgu ortamınızdan aşağıdaki komutu kullanarak adlı
Storms
bir tablo oluşturun:.create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
Aşağıdaki komutu kullanarak alınan veriler için ilgili tablo eşlemesini
Storms_CSV_Mapping
oluşturun:.create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
Yapılandırılabilir kuyruğa alınmış alım gecikmesi için tabloda bir alma toplu işlemi ilkesi oluşturun.
İpucu
Alma toplu işleme ilkesi bir performans iyileştiricidir ve üç parametre içerir. Karşılanan ilk koşul, Azure Veri Gezgini tablosuna alımı tetikler.
.alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
Veritabanıyla çalışma izni vermek için Microsoft Entra hizmet sorumlusu oluşturma bölümünden hizmet sorumlusunu kullanın.
.add database YOUR_DATABASE_NAME admins ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
Laboratuvarı çalıştırma
Aşağıdaki laboratuvar, veri oluşturmaya başlama, Kafka bağlayıcısını ayarlama ve bu verileri bağlayıcı ile Azure Veri Gezgini akışla aktarma deneyimi sunmak için tasarlanmıştır. Daha sonra alınan verilere bakabilirsiniz.
Git deposunu kopyalama
Laboratuvarın git deposunu kopyalayın.
Makinenizde yerel bir dizin oluşturun.
mkdir ~/kafka-kusto-hol cd ~/kafka-kusto-hol
Depoyu kopyalayın.
cd ~/kafka-kusto-hol git clone https://github.com/Azure/azure-kusto-labs cd azure-kusto-labs/kafka-integration/dockerized-quickstart
Kopyalanan deponun içeriği
Kopyalanan deponun içeriğini listelemek için aşağıdaki komutu çalıştırın:
cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree
Bu aramanın sonucu:
├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│ └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
├── Dockerfile
├── StormEvents.csv
├── go.mod
├── go.sum
├── kafka
│ └── kafka.go
└── main.go
Kopyalanan depodaki dosyaları gözden geçirme
Aşağıdaki bölümlerde, yukarıdaki dosya ağacındaki dosyaların önemli bölümleri açıklanmaktadır.
adx-sink-config.json
Bu dosya, belirli yapılandırma ayrıntılarını güncelleştireceğiniz Kusto havuz özellikleri dosyasını içerir:
{
"name": "storm",
"config": {
"connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
"flush.size.bytes": 10000,
"flush.interval.ms": 10000,
"tasks.max": 1,
"topics": "storm-events",
"kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
"aad.auth.authority": "<enter tenant ID>",
"aad.auth.appid": "<enter application ID>",
"aad.auth.appkey": "<enter client secret>",
"kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
"kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}
Azure Veri Gezgini kurulumunuza göre aşağıdaki özniteliklerin değerlerini değiştirin: , , , kusto.tables.topics.mapping
(veritabanı adı), kusto.ingestion.url
ve kusto.query.url
. aad.auth.appkey
aad.auth.appid
aad.auth.authority
Bağlayıcı - Dockerfile
Bu dosya, bağlayıcı örneği için docker görüntüsünü oluşturmaya yönelik komutlara sahiptir. Git deposu yayın dizininden bağlayıcı indirmesini içerir.
Storm-events-producer dizini
Bu dizinde yerel bir "StormEvents.csv" dosyasını okuyan ve verileri kafka konusuna yayımlayan bir Go programı vardır.
docker-compose.yaml
version: "2"
services:
zookeeper:
image: debezium/zookeeper:1.2
ports:
- 2181:2181
kafka:
image: debezium/kafka:1.2
ports:
- 9092:9092
links:
- zookeeper
depends_on:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
kusto-connect:
build:
context: ./connector
args:
KUSTO_KAFKA_SINK_VERSION: 1.0.1
ports:
- 8083:8083
links:
- kafka
depends_on:
- kafka
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=adx
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
events-producer:
build:
context: ./storm-events-producer
links:
- kafka
depends_on:
- kafka
environment:
- KAFKA_BOOTSTRAP_SERVER=kafka:9092
- KAFKA_TOPIC=storm-events
- SOURCE_FILE=StormEvents.csv
Kapsayıcıları başlatma
Terminalde kapsayıcıları başlatın:
docker-compose up
Üretici uygulaması konuya olay göndermeye
storm-events
başlar. Aşağıdaki günlüklere benzer günlükler görmeniz gerekir:.... events-producer_1 | sent message to partition 0 offset 0 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public events-producer_1 | events-producer_1 | sent message to partition 0 offset 1 events-producer_1 | event 2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer ....
Günlükleri denetlemek için aşağıdaki komutu ayrı bir terminalde çalıştırın:
docker-compose logs -f | grep kusto-connect
Bağlayıcıyı başlatma
Bağlayıcıyı başlatmak için Kafka Connect REST çağrısı kullanın.
Ayrı bir terminalde aşağıdaki komutla havuz görevini başlatın:
curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
Durumu denetlemek için aşağıdaki komutu ayrı bir terminalde çalıştırın:
curl http://localhost:8083/connectors/storm/status
Bağlayıcı, azure Veri Gezgini alma işlemlerini kuyruğa almaya başlar.
Not
Günlük bağlayıcısı sorunlarınız varsa bir sorun oluşturun.
Verileri sorgulama ve gözden geçirme
Veri alımını onaylama
Verilerin tabloya ulaşmasını
Storms
bekleyin. Veri aktarımını onaylamak için satır sayısını denetleyin:Storms | count
Alma işleminde hata olmadığını onaylayın:
.show ingestion failures
Verileri gördüğünüzde birkaç sorgu deneyin.
Verileri sorgulama
Tüm kayıtları görmek için aşağıdaki sorguyu çalıştırın:
Storms
Belirli verileri filtrelemek için ve
project
kullanınwhere
:Storms | where EventType == 'Drought' and State == 'TEXAS' | project StartTime, EndTime, Source, EventId
summarize
şu işleci kullanın:Storms | summarize event_count=count() by State | where event_count > 10 | project State, event_count | render columnchart
Daha fazla sorgu örneği ve kılavuz için bkz. KQL'de sorgu yazma ve Kusto Sorgu Dili belgeleri.
Sıfırla
Sıfırlamak için aşağıdaki adımları uygulayın:
- Kapsayıcıları durdurma (
docker-compose down -v
) - Sil (
drop table Storms
) - Tabloyu yeniden oluşturma
Storms
- Tablo eşlemesini yeniden oluşturma
- Kapsayıcıları yeniden başlatma (
docker-compose up
)
Kaynakları temizleme
Azure Veri Gezgini kaynaklarını silmek için az cluster delete veya az Kusto database delete kullanın:
az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>
Kafka Havuz bağlayıcısını ayarlama
Kafka Havuz bağlayıcısını alma toplu işlemi ilkesiyle çalışacak şekilde ayarlayın:
- Kafka Havuzu
flush.size.bytes
boyut sınırını 1 MB'tan başlayarak 10 MB veya 100 MB'lık artışlarla ayarlayın. - Kafka Havuzu kullanılırken veriler iki kez toplanır. Bağlayıcı tarafındaki veriler temizleme ayarlarına göre ve Azure Veri Gezgini hizmet tarafında toplu işlem ilkesine göre toplanır. Toplu işlem süresi çok kısaysa ve hem bağlayıcı hem de hizmet tarafından veri alınamıyorsa, toplu işlem süresi artırılmalıdır. Toplu işlem boyutunu 1 GB olarak ayarlayın ve gerektiğinde 100 MB artış artırın veya azaltın. Örneğin, boşaltma boyutu 1 MB ve toplu işlem ilkesi boyutu 100 MB ise, Kafka Havuzu bağlayıcısı tarafından 100 MB toplu iş toplandıktan sonra, Azure Veri Gezgini hizmeti tarafından 100 MB toplu iş alınır. Toplu işlem ilkesi süresi 20 saniyeyse ve Kafka Havuzu bağlayıcısı 20 saniyelik bir süre içinde 50 MB'ı temizlerse, hizmet 50 MB'lık bir toplu işlemi alır.
- Örnekleri ve Kafka bölümlerini ekleyerek ölçeklendirme yapabilirsiniz. Bölüm sayısına kadar artırın
tasks.max
. Ayarın boyutundaflush.size.bytes
bir blob oluşturmak için yeterli veriniz varsa bir bölüm oluşturun. Blob daha küçükse, toplu iş zaman sınırına ulaştığında işlenir, bu nedenle bölüm yeterli aktarım hızı almaz. Çok sayıda bölüm daha fazla işlem yükü anlamına gelir.
İlgili içerik
- Büyük veri mimarisi hakkında daha fazla bilgi edinin.
- JSON biçimli örnek verileri Azure Veri Gezgini'a nasıl alacağınızı öğrenin.
- Ek Kafka laboratuvarları için:
Geri Bildirim
https://aka.ms/ContentUserFeedback.
Çok yakında: 2024 boyunca, içerik için geri bildirim mekanizması olarak GitHub Sorunları’nı kullanımdan kaldıracak ve yeni bir geri bildirim sistemiyle değiştireceğiz. Daha fazla bilgi için bkz.Gönderin ve geri bildirimi görüntüleyin