Apache Kafka'dan Azure Veri Gezgini'a veri alma

Azure Veri Gezgini Apache Kafka'danveri alımını destekler. Apache Kafka, sistemler veya uygulamalar arasında güvenilir bir şekilde veri taşınan gerçek zamanlı akış veri işlem hatları oluşturmaya yönelik bir dağıtılmış akış platformudur. Kafka Bağlan, Apache Kafka ve diğer veri sistemleri arasında ölçeklenebilir ve güvenilir veri akışı için bir araçtır. Azure Veri Gezgini Kafka Havuzu, Kafka'dan bağlayıcı görevi görür ve kod kullanılmasını gerektirmez. Bu Git deposundan veya Confluent Connector Hub'dan havuz bağlayıcısı jar dosyasını indirin.

Bu makalede, Kafka kümesi ve Kafka bağlayıcı kümesi kurulumunu basitleştirmek için bağımsız docker kurulumu kullanarak Kafka ile Azure Veri Gezgini'ye veri alma işlemi gösterilmektedir.

Daha fazla bilgi için bkz. Bağlayıcı Git deposu ve sürüm özellikleri.

Önkoşullar

Azure Active Directory hizmet sorumlusu oluşturma

Azure Active Directory hizmet sorumlusu, aşağıdaki örnekte olduğu gibi Azure portal veya program aracılığıyla oluşturulabilir.

Bu hizmet sorumlusu, bağlayıcının Azure Veri Gezgini tablosuna yazmak için kullandığı kimlik olacaktır. Daha sonra bu hizmet sorumlusuna Azure Veri Gezgini erişim izinleri vereceğiz.

  1. Azure CLI aracılığıyla Azure aboneliğinizde oturum açın. Ardından tarayıcıda kimlik doğrulaması yapın.

    az login
    
  2. Laboratuvarı çalıştırmak için kullanmak istediğiniz aboneliği seçin. Bu adım, birden çok aboneliğiniz olduğunda gereklidir.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Hizmet sorumlusunu oluşturun. Bu örnekte hizmet sorumlusu olarak adlandırılır kusto-kafka-spn.

    az ad sp create-for-rbac -n "kusto-kafka-spn" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Aşağıda gösterildiği gibi bir JSON yanıtı alırsınız. appIdSonraki adımlarda ihtiyacınız olacak şekilde , passwordve tenant' yi kopyalayın.

    {
      "appId": "fe7280c7-5705-4789-b17f-71a472340429",
      "displayName": "kusto-kafka-spn",
      "name": "http://kusto-kafka-spn",
      "password": "29c719dd-f2b3-46de-b71c-4004fb6116ee",
      "tenant": "42f988bf-86f1-42af-91ab-2d7cd011db42"
    }
    

Azure Veri Gezgini'nde hedef tablo oluşturma

  1. Azure portalda oturum açma

  2. Azure Veri Gezgini kümenize gidin.

  3. Aşağıdaki komutu kullanarak adlı Storms bir tablo oluşturun:

    .create table Storms (StartTime: datetime, EndTime: datetime, EventId: int, State: string, EventType: string, Source: string)
    

    Create a table in Azure Data Explorer portal .

  4. Aşağıdaki komutu kullanarak alınan veriler için ilgili tablo eşlemesini Storms_CSV_Mapping oluşturun:

    .create table Storms ingestion csv mapping 'Storms_CSV_Mapping' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EventId","datatype":"int","Ordinal":2},{"Name":"State","datatype":"string","Ordinal":3},{"Name":"EventType","datatype":"string","Ordinal":4},{"Name":"Source","datatype":"string","Ordinal":5}]'
    
  5. Yapılandırılabilir alım gecikme süresi için tabloda bir toplu alım ilkesi oluşturun.

    İpucu

    Alma toplu işlemi ilkesi bir performans iyileştiricidir ve üç parametre içerir. İlk koşul, Azure Veri Gezgini tablosuna alımı tetikler.

    .alter table Storms policy ingestionbatching @'{"MaximumBatchingTimeSpan":"00:00:15", "MaximumNumberOfItems": 100, "MaximumRawDataSizeMB": 300}'
    
  6. Veritabanıyla çalışma izni vermek için Azure Active Directory hizmet sorumlusu oluşturma bölümünden hizmet sorumlusunu kullanın.

    .add database YOUR_DATABASE_NAME admins  ('aadapp=YOUR_APP_ID;YOUR_TENANT_ID') 'AAD App'
    

Laboratuvarı çalıştırma

Aşağıdaki laboratuvar, veri oluşturmaya başlama, Kafka bağlayıcısını ayarlama ve bu verileri bağlayıcıyla Azure Veri Gezgini akışla aktarma deneyimi sunmak için tasarlanmıştır. Daha sonra alınan verilere bakabilirsiniz.

Git deposunu kopyalama

Laboratuvarın git deposunu kopyalayın.

  1. Makinenizde yerel bir dizin oluşturun.

    mkdir ~/kafka-kusto-hol
    cd ~/kafka-kusto-hol
    
  2. Depoyu kopyalayın.

    cd ~/kafka-kusto-hol
    git clone https://github.com/Azure/azure-kusto-labs
    cd azure-kusto-labs/kafka-integration/dockerized-quickstart
    

Kopyalanan deponun içeriği

Kopyalanan deponun içeriğini listelemek için aşağıdaki komutu çalıştırın:

cd ~/kafka-kusto-hol/azure-kusto-labs/kafka-integration/dockerized-quickstart
tree

Bu aramanın sonucu:

├── README.md
├── adx-query.png
├── adx-sink-config.json
├── connector
│   └── Dockerfile
├── docker-compose.yaml
└── storm-events-producer
    ├── Dockerfile
    ├── StormEvents.csv
    ├── go.mod
    ├── go.sum
    ├── kafka
    │   └── kafka.go
    └── main.go

Kopyalanan depodaki dosyaları gözden geçirme

Aşağıdaki bölümlerde, yukarıdaki dosya ağacındaki dosyaların önemli bölümleri açıklanmaktadır.

adx-sink-config.json

Bu dosya, belirli yapılandırma ayrıntılarını güncelleştireceğiniz Kusto havuz özellikleri dosyasını içerir:

{
    "name": "storm",
    "config": {
        "connector.class": "com.microsoft.azure.kusto.kafka.connect.sink.KustoSinkConnector",
        "flush.size.bytes": 10000,
        "flush.interval.ms": 10000,
        "tasks.max": 1,
        "topics": "storm-events",
        "kusto.tables.topics.mapping": "[{'topic': 'storm-events','db': '<enter database name>', 'table': 'Storms','format': 'csv', 'mapping':'Storms_CSV_Mapping'}]",
        "aad.auth.authority": "<enter tenant ID>",
        "aad.auth.appid": "<enter application ID>",
        "aad.auth.appkey": "<enter client secret>",
        "kusto.ingestion.url": "https://ingest-<name of cluster>.<region>.kusto.windows.net",
        "kusto.query.url": "https://<name of cluster>.<region>.kusto.windows.net",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }
}

Azure Veri Gezgini kurulumunuza göre aşağıdaki özniteliklerin değerlerini değiştirin: , , , kusto.tables.topics.mapping (veritabanı adı), kusto.ingestion.urlve kusto.query.url. aad.auth.appkeyaad.auth.appidaad.auth.authority

Bağlayıcı - Dockerfile

Bu dosya, bağlayıcı örneği için docker görüntüsünü oluşturmaya yönelik komutlara sahiptir. Git deposu yayın dizininden bağlayıcı indirmesini içerir.

Storm-events-producer dizini

Bu dizinde yerel bir "StormEvents.csv" dosyasını okuyan ve verileri kafka konusuna yayımlayan bir Go programı vardır.

docker-compose.yaml

version: "2"
services:
  zookeeper:
    image: debezium/zookeeper:1.2
    ports:
      - 2181:2181
  kafka:
    image: debezium/kafka:1.2
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
  kusto-connect:
    build:
      context: ./connector
      args:
        KUSTO_KAFKA_SINK_VERSION: 1.0.1
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=adx
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
  events-producer:
    build:
      context: ./storm-events-producer
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - KAFKA_BOOTSTRAP_SERVER=kafka:9092
      - KAFKA_TOPIC=storm-events
      - SOURCE_FILE=StormEvents.csv

Kapsayıcıları başlatma

  1. Terminalde kapsayıcıları başlatın:

    docker-compose up
    

    Üretici uygulaması konuya olay göndermeye storm-events başlar. Aşağıdaki günlüklere benzer günlükler görmeniz gerekir:

    ....
    events-producer_1  | sent message to partition 0 offset 0
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 00:00:00.0000000,13208,NORTH CAROLINA,Thunderstorm Wind,Public
    events-producer_1  | 
    events-producer_1  | sent message to partition 0 offset 1
    events-producer_1  | event  2007-01-01 00:00:00.0000000,2007-01-01 05:00:00.0000000,23358,WISCONSIN,Winter Storm,COOP Observer
    ....
    
  2. Günlükleri denetlemek için aşağıdaki komutu ayrı bir terminalde çalıştırın:

    docker-compose logs -f | grep kusto-connect
    

Bağlayıcıyı başlatma

Bağlayıcıyı başlatmak için Kafka Bağlan REST çağrısı kullanın.

  1. Ayrı bir terminalde havuz görevini aşağıdaki komutla başlatın:

    curl -X POST -H "Content-Type: application/json" --data @adx-sink-config.json http://localhost:8083/connectors
    
  2. Durumu denetlemek için aşağıdaki komutu ayrı bir terminalde çalıştırın:

    curl http://localhost:8083/connectors/storm/status
    

Bağlayıcı, azure Veri Gezgini alma işlemlerini kuyruğa almaya başlar.

Not

Günlük bağlayıcısı sorunlarınız varsa bir sorun oluşturun.

Verileri sorgulama ve gözden geçirme

Veri alımını onaylama

  1. Verilerin tabloya ulaşmasını Storms bekleyin. Veri aktarımını onaylamak için satır sayısını denetleyin:

    Storms | count
    
  2. Alma işleminde hata olmadığını onaylayın:

    .show ingestion failures
    

    Verileri gördüğünüzde birkaç sorgu deneyin.

Verileri sorgulama

  1. Tüm kayıtları görmek için aşağıdaki sorguyu çalıştırın:

    Storms
    
  2. Belirli verileri filtrelemek için ve project kullanınwhere:

    Storms
    | where EventType == 'Drought' and State == 'TEXAS'
    | project StartTime, EndTime, Source, EventId
    
  3. işlecini summarize kullanın:

    Storms
    | summarize event_count=count() by State
    | where event_count > 10
    | project State, event_count
    | render columnchart
    

    Kafka query column chart results in Azure Data Explorer.

Daha fazla sorgu örneği ve kılavuz için bkz. Azure Veri Gezgini için sorgu yazma ve sorgu dili belgeleri Kusto.

Sıfırla

Sıfırlamak için aşağıdaki adımları uygulayın:

  1. Kapsayıcıları durdurma (docker-compose down -v)
  2. Sil (drop table Storms)
  3. Tabloyu yeniden oluşturma Storms
  4. Tablo eşlemesini yeniden oluşturma
  5. Kapsayıcıları yeniden başlatma (docker-compose up)

Kaynakları temizleme

Azure Veri Gezgini kaynaklarını silmek için az cluster delete veya az Kusto database delete kullanın:

az kusto cluster delete -n <cluster name> -g <resource group name>
az kusto database delete -n <database name> --cluster-name <cluster name> -g <resource group name>

Kafka Havuzu bağlayıcısını ayarlama

Kafka Havuzu bağlayıcısını alma toplu işlemi ilkesiyle çalışacak şekilde ayarlayın:

  • Kafka Havuzu flush.size.bytes boyut sınırını 1 MB'tan başlayarak 10 MB veya 100 MB'lık artışlarla ayarlayın.
  • Kafka Havuzu kullanılırken veriler iki kez toplanır. Bağlayıcı tarafındaki veriler temizleme ayarlarına ve Azure Veri Gezgini hizmeti tarafında toplu iş ilkesine göre toplanır. Toplu işlem süresi çok kısaysa ve hem bağlayıcı hem de hizmet tarafından veri alınamıyorsa, toplu işlem süresi artırılmalıdır. Toplu işlem boyutunu 1 GB olarak ayarlayın ve gerektiğinde 100 MB artış artırın veya azaltın. Örneğin, boşaltma boyutu 1 MB ve toplu işlem ilkesi boyutu 100 MB ise, Kafka Havuzu bağlayıcısı tarafından 100 MB toplu iş toplandıktan sonra, Azure Veri Gezgini hizmeti tarafından 100 MB toplu iş alınacaktır. Toplu işlem ilkesi süresi 20 saniyeyse ve Kafka Havuzu bağlayıcısı 20 saniyelik bir süre içinde 50 MB temizleniyorsa, hizmet 50 MB'lık bir toplu iş alır.
  • Örnekleri ve Kafka bölümlerini ekleyerek ölçeklendirme yapabilirsiniz. Bölüm sayısına kadar artırın tasks.max . Ayarın boyutunda flush.size.bytes bir blob oluşturmak için yeterli veriniz varsa bir bölüm oluşturun. Blob daha küçükse, toplu iş zaman sınırına ulaştığında işlenir, bu nedenle bölüm yeterli aktarım hızı almaz. Çok sayıda bölüm daha fazla işlem yükü anlamına gelir.

Sonraki Adımlar