Apache Kafka'dan Azure Veri Gezgini'a veri alma

Apache Kafka , verileri sistemler veya uygulamalar arasında güvenilir bir şekilde taşınan gerçek zamanlı akış verisi işlem hatları oluşturmaya yönelik bir dağıtılmış akış platformudur. Kafka Connect , Apache Kafka ile diğer veri sistemleri arasında ölçeklenebilir ve güvenilir veri akışı sağlayan bir araçtır. Kusto Kafka Havuzu, Kafka'dan bağlayıcı görevi görür ve kod kullanılmasını gerektirmez. Git deposundan veya Confluent Connector Hub'dan havuz bağlayıcısı jar dosyasını indirin.

Bu makalede, Kafka kümesini ve Kafka bağlayıcı kümesi kurulumunu basitleştirmek için bağımsız bir Docker kurulumu kullanarak Kafka ile veri alma işlemi gösterilmektedir.

Daha fazla bilgi için bkz. Bağlayıcı Git deposu ve sürüm ayrıntıları.

Önkoşullar

Microsoft Entra hizmet sorumlusu oluşturma

Microsoft Entra hizmet sorumlusu, aşağıdaki örnekte olduğu gibi Azure portal aracılığıyla veya program aracılığıyla oluşturulabilir.

Bu hizmet sorumlusu, bağlayıcı tarafından Kusto'da tablonuza veri yazmak için kullanılan kimlik olacaktır. Daha sonra bu hizmet sorumlusuna Kusto kaynaklarına erişmek için izinler verirsiniz.

  1. Azure CLI aracılığıyla Azure aboneliğinizde oturum açın. Ardından tarayıcıda kimlik doğrulaması yapın.

    az login
    
  2. Sorumluyu barındıracak aboneliği seçin. Bu adım, birden çok aboneliğiniz olduğunda gereklidir.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Hizmet sorumlusunu oluşturun. Bu örnekte hizmet sorumlusu olarak adlandırılır my-service-principal.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Döndürülen JSON verilerinden, gelecekte kullanmak üzere , passwordve tenant değerini kopyalayınappId.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Microsoft Entra uygulamanızı ve hizmet sorumlunuzu oluşturdunuz.

Hedef tablo oluşturma

  1. Sorgu ortamınızdan aşağıdaki komutu kullanarak adlı Storms bir tablo oluşturun:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    
  2. Aşağıdaki komutu kullanarak alınan veriler için ilgili tablo eşlemesini Storms_CSV_Mapping oluşturun:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  3. Yapılandırılabilir kuyruğa alınmış alım gecikmesi için tabloda bir alma toplu işlemi ilkesi oluşturun.

    İpucu

    Alma toplu işleme ilkesi bir performans iyileştiricidir ve üç parametre içerir. Karşılanan ilk koşul, Azure Veri Gezgini tablosuna alımı tetikler.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  4. Veritabanıyla çalışma izni vermek için Microsoft Entra hizmet sorumlusu oluşturma bölümünden hizmet sorumlusunu kullanın.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Laboratuvarı çalıştırma

Aşağıdaki laboratuvar, veri oluşturmaya başlama, Kafka bağlayıcısını ayarlama ve bu verileri bağlayıcı ile Azure Veri Gezgini akışla aktarma deneyimi sunmak için tasarlanmıştır. Daha sonra alınan verilere bakabilirsiniz.

Git deposunu kopyalama

Laboratuvarın git deposunu kopyalayın.

  1. Makinenizde yerel bir dizin oluşturun.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Depoyu kopyalayın.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Kopyalanan deponun içeriği

Kopyalanan deponun içeriğini listelemek için aşağıdaki komutu çalıştırın:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Bu aramanın sonucu:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Kopyalanan depodaki dosyaları gözden geçirme

Aşağıdaki bölümlerde, yukarıdaki dosya ağacındaki dosyaların önemli bölümleri açıklanmaktadır.

adx-sink-config.json

Bu dosya, belirli yapılandırma ayrıntılarını güncelleştireceğiniz Kusto havuz özellikleri dosyasını içerir:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Azure Veri Gezgini kurulumunuza göre aşağıdaki özniteliklerin değerlerini değiştirin: , , , kusto.tables.topics.mapping (veritabanı adı), kusto.ingestion.urlve kusto.query.url. aad.auth.appkeyaad.auth.appidaad.auth.authority

Bağlayıcı - Dockerfile

Bu dosya, bağlayıcı örneği için docker görüntüsünü oluşturmaya yönelik komutlara sahiptir. Git deposu yayın dizininden bağlayıcı indirmesini içerir.

Storm-events-producer dizini

Bu dizinde yerel bir "StormEvents.csv" dosyasını okuyan ve verileri kafka konusuna yayımlayan bir Go programı vardır.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Kapsayıcıları başlatma

  1. Terminalde kapsayıcıları başlatın:

    docker-compose up
    

    Üretici uygulaması konuya olay göndermeye storm-events başlar. Aşağıdaki günlüklere benzer günlükler görmeniz gerekir:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  |
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Günlükleri denetlemek için aşağıdaki komutu ayrı bir terminalde çalıştırın:

    docker-compose logs -f | grep kusto-connect
    

Bağlayıcıyı başlatma

Bağlayıcıyı başlatmak için Kafka Connect REST çağrısı kullanın.

  1. Ayrı bir terminalde aşağıdaki komutla havuz görevini başlatın:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Durumu denetlemek için aşağıdaki komutu ayrı bir terminalde çalıştırın:

    curl http://localhost:8083/connectors/storm/status
    

Bağlayıcı, azure Veri Gezgini alma işlemlerini kuyruğa almaya başlar.

Not

Günlük bağlayıcısı sorunlarınız varsa bir sorun oluşturun.

Verileri sorgulama ve gözden geçirme

Veri alımını onaylama

  1. Verilerin tabloya ulaşmasını Storms bekleyin. Veri aktarımını onaylamak için satır sayısını denetleyin:

    Storms | count
    
  2. Alma işleminde hata olmadığını onaylayın:

    .show ingestion failures
    

    Verileri gördüğünüzde birkaç sorgu deneyin.

Verileri sorgulama

  1. Tüm kayıtları görmek için aşağıdaki sorguyu çalıştırın:

    Storms
    
  2. Belirli verileri filtrelemek için ve project kullanınwhere:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. summarize şu işleci kullanın:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Azure Veri Gezgini'daki Kafka sorgu sütun grafiği sonuçlarının ekran görüntüsü.

Daha fazla sorgu örneği ve kılavuz için bkz. KQL'de sorgu yazma ve Kusto Sorgu Dili belgeleri.

Sıfırla

Sıfırlamak için aşağıdaki adımları uygulayın:

  1. Kapsayıcıları durdurma (docker-compose down -v)
  2. Sil (drop table Storms)
  3. Tabloyu yeniden oluşturma Storms
  4. Tablo eşlemesini yeniden oluşturma
  5. Kapsayıcıları yeniden başlatma (docker-compose up)

Kaynakları temizleme

Azure Veri Gezgini kaynaklarını silmek için az cluster delete veya az Kusto database delete kullanın:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Kafka Havuz bağlayıcısını ayarlama

Kafka Havuz bağlayıcısını alma toplu işlemi ilkesiyle çalışacak şekilde ayarlayın:

  • Kafka Havuzu flush.size.bytes boyut sınırını 1 MB'tan başlayarak 10 MB veya 100 MB'lık artışlarla ayarlayın.
  • Kafka Havuzu kullanılırken veriler iki kez toplanır. Bağlayıcı tarafındaki veriler temizleme ayarlarına göre ve Azure Veri Gezgini hizmet tarafında toplu işlem ilkesine göre toplanır. Toplu işlem süresi çok kısaysa ve hem bağlayıcı hem de hizmet tarafından veri alınamıyorsa, toplu işlem süresi artırılmalıdır. Toplu işlem boyutunu 1 GB olarak ayarlayın ve gerektiğinde 100 MB artış artırın veya azaltın. Örneğin, boşaltma boyutu 1 MB ve toplu işlem ilkesi boyutu 100 MB ise, Kafka Havuzu bağlayıcısı tarafından 100 MB toplu iş toplandıktan sonra, Azure Veri Gezgini hizmeti tarafından 100 MB toplu iş alınır. Toplu işlem ilkesi süresi 20 saniyeyse ve Kafka Havuzu bağlayıcısı 20 saniyelik bir süre içinde 50 MB'ı temizlerse, hizmet 50 MB'lık bir toplu işlemi alır.
  • Örnekleri ve Kafka bölümlerini ekleyerek ölçeklendirme yapabilirsiniz. Bölüm sayısına kadar artırın tasks.max . Ayarın boyutunda flush.size.bytes bir blob oluşturmak için yeterli veriniz varsa bir bölüm oluşturun. Blob daha küçükse, toplu iş zaman sınırına ulaştığında işlenir, bu nedenle bölüm yeterli aktarım hızı almaz. Çok sayıda bölüm daha fazla işlem yükü anlamına gelir.