Bagikan melalui


Menggunakan Skema JSON dengan aplikasi Apache Kafka

Tutorial ini memancarkan Anda melalui skenario di mana Anda menggunakan Skema JSON untuk menserialisasikan dan mendeserialisasi peristiwa menggunakan Azure Schema Registry di Azure Event Hubs.

Dalam kasus penggunaan ini, aplikasi produsen Kafka menggunakan skema JSON yang disimpan di Azure Schema Registry untuk, menserialisasikan peristiwa dan menerbitkannya ke topik/hub peristiwa Kafka di Azure Event Hubs. Konsumen Kafka mendeserialisasi peristiwa yang dikonsumsinya dari Azure Event Hubs. Untuk itu menggunakan ID skema peristiwa dan skema JSON, yang disimpan di Azure Schema Registry. Diagram memperlihatkan serialisasi/de-serialisasi skema untuk aplikasi Kafka menggunakan skema JSON.

Prasyarat

Jika Anda baru menggunakan Azure Event Hubs, lihat Ringkasan Event Hubs sebelum Anda melakukan mulai cepat ini.

Untuk menyelesaikan mulai cepat ini, Anda memerlukan prasyarat berikut:

Membuat pusat aktivitas

Ikuti instruksi dari mulai cepat: Membuat namespace layanan Azure Event Hubs dan hub peristiwa untuk membuat namespace layanan Azure Event Hubs dan pusat aktivitas. Kemudian, ikuti instruksi dari Dapatkan string koneksi untuk mendapatkan string koneksi ke namespace Layanan Pusat Aktivitas Anda.

Catat pengaturan berikut yang Anda gunakan dalam mulai cepat saat ini:

  • String koneksi untuk namespace Azure Event Hubs
  • Nama hub peristiwa

Buat grup skema

Ikuti instruksi dari Membuat skema menggunakan Schema Registry untuk membuat grup skema dan skema.

  1. Buat grup skema bernama contoso-sg menggunakan portal Schema Registry. Gunakan Skema JSON sebagai jenis serialisasi.

  2. Dalam grup skema tersebut, buat skema JSON baru dengan nama skema: Microsoft.Azure.Data.SchemaRegistry.example.CustomerInvoice menggunakan konten skema berikut.

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    } 
    

Mendaftarkan aplikasi untuk mengakses registri skema

Anda dapat menggunakan ID Microsoft Entra untuk mengotorisasi aplikasi produsen dan konsumen Kafka Anda untuk mengakses sumber daya Azure Schema Registry. Untuk mengaktifkannya, Anda perlu mendaftarkan aplikasi klien Anda dengan penyewa Microsoft Entra dari portal Azure.

Untuk mendaftarkan aplikasi Microsoft Entra bernama example-app lihat Mendaftarkan aplikasi Anda dengan penyewa Microsoft Entra.

  • tenant.id - mengatur ID penyewa aplikasi
  • client.id - mengatur ID klien aplikasi
  • client.secret - mengatur rahasia klien untuk autentikasi

Dan jika Anda menggunakan identitas terkelola, Anda akan memerlukan:

  • use.managed.identity.credential - menunjukkan bahwa kredensial MSI harus digunakan, harus digunakan untuk VM yang mendukung MSI
  • managed.identity.clientId - jika ditentukan, ia membangun kredensial MSI dengan ID klien yang diberikan managed.identity.resourceId - jika ditentukan, ia membangun kredensial MSI dengan ID sumber daya tertentu

Menambahkan pengguna ke peran Pembaca Registri Skema

Tambahkan akun pengguna Anda ke peran Pembaca Registri Skema di tingkat namespace. Anda juga dapat menggunakan peran Kontributor Registri Skema, tetapi itu tidak diperlukan untuk mulai cepat ini.

  1. Pada halaman Namespace Layanan Pusat Aktivitas, pilih Kontrol akses (IAM) di menu sebelah kiri.
  2. Pada halaman Kontrol akses (IAM), pilih + Tambahkan ->Tambahkan penetapan peran pada menu.
  3. Pada halaman Jenis penugasan, pilih Berikutnya.
  4. Pada halaman Peran, pilih Pembaca Registri Skema, lalu pilih Berikutnya di bagian bawah halaman.
  5. Gunakan tautan + Pilih anggota untuk menambahkan example-app aplikasi yang Anda buat di langkah sebelumnya ke peran, lalu pilih Berikutnya.
  6. Pada halaman Tinjau + tetapkan , pilih Tinjau + tetapkan.

Memperbarui konfigurasi aplikasi klien aplikasi Kafka

Anda perlu memperbarui konfigurasi klien aplikasi produsen dan konsumen Kafka dengan detail aplikasi Microsoft Entra dan dengan informasi registri skema.

Untuk memperbarui konfigurasi Produsen Kafka, navigasikan ke azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Perbarui konfigurasi aplikasi Kafka di src/main/resources/app.properties dengan mengikuti panduan Mulai Cepat Kafka untuk Azure Event Hubs.

  2. Perbarui detail konfigurasi untuk produsen di src/main/resources/app.properties menggunakan konfigurasi terkait registri skema dan aplikasi Microsoft Entra yang Anda buat di langkah sebelumnya sebagai berikut:

    schema.group=contoso-sg
    schema.registry.url=https://<NAMESPACENAME>.servicebus.windows.net
    
     tenant.id=<>
     client.id=<>
     client.secret=<>
    
  3. Ikuti instruksi yang sama dan perbarui konfigurasi azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer .

  4. Untuk aplikasi produsen dan konsumen Kafka, skema JSON berikut digunakan:

    {
      "$id": "https://example.com/person.schema.json",
      "$schema": "https://json-schema.org/draft/2020-12/schema",
      "title": "CustomerInvoice",
      "type": "object",
      "properties": {
        "invoiceId": {
          "type": "string"
        },
        "merchantId": {
          "type": "string"
        },
        "transactionValueUsd": {
          "type": "integer"
        },
        "userId": {
          "type": "string"
        }
      }
    }
    

Menggunakan produsen Kafka dengan validasi skema JSON

Untuk menjalankan aplikasi produsen Kafka, navigasikan ke azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-producer.

  1. Anda dapat menjalankan aplikasi produsen sehingga dapat menghasilkan rekaman khusus Skema JSON atau rekaman generik. Untuk mode rekaman tertentu, Anda harus terlebih dahulu menghasilkan kelas terhadap skema produsen menggunakan perintah maven berikut:

    mvn generate-sources
    
  2. Kemudian Anda dapat menjalankan aplikasi produsen menggunakan perintah berikut.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.producer.App"
    
  3. Setelah keberhasilan eksekusi aplikasi produsen, aplikasi ini meminta Anda untuk memilih skenario produsen. Untuk mulai cepat ini, Anda dapat memilih opsi 1 - menghasilkan SpecificRecords.

    Enter case number:
    1 - produce SpecificRecords
    
  4. Setelah serialisasi dan penerbitan data berhasil, Anda akan melihat log konsol berikut di aplikasi produsen Anda:

    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 0
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 1
    INFO com.azure.schemaregistry.samples.producer.KafkaJsonSpecificRecord - Sent Order Invoice 2
    

Menggunakan konsumen Kafka dengan validasi skema JSON

Untuk menjalankan aplikasi konsumen Kafka, navigasikan ke azure-schema-registry-for-kafka/tree/master/java/json/samples/kafka-consumer.

  1. Anda dapat menjalankan aplikasi konsumen sehingga dapat menggunakan rekaman khusus Skema JSON atau rekaman generik. Untuk mode rekaman tertentu, Anda harus terlebih dahulu menghasilkan kelas terhadap skema produsen menggunakan perintah maven berikut:

    mvn generate-sources
    
  2. Kemudian Anda dapat menjalankan aplikasi konsumen menggunakan perintah berikut.

    mvn clean package
    mvn -e clean compile exec:java -Dexec.mainClass="com.azure.schemaregistry.samples.consumer.App"
    
  3. Setelah keberhasilan eksekusi aplikasi konsumen, aplikasi ini meminta Anda untuk memilih skenario produsen. Untuk mulai cepat ini, Anda dapat memilih opsi 1 - menggunakan SpecificRecords.

    Enter case number:
    1 - consume SpecificRecords
    
  4. Setelah konsumsi dan deserialisasi data berhasil, Anda akan melihat log konsol berikut di aplikasi produsen Anda:

    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 0, merchantId=Merchant Id 0, transactionValueUsd=0, userId=User Id 0}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 1, merchantId=Merchant Id 1, transactionValueUsd=1, userId=User Id 1}
    INFO com.azure.schemaregistry.samples.consumer.KafkaJsonSpecificRecord - Invoice received: {invoiceId=Invoice 2, merchantId=Merchant Id 2, transactionValueUsd=2, userId=User Id 2}
    
    

Membersihkan sumber daya

Hapus namespace layanan Azure Event Hubs atau hapus grup sumber daya yang berisi namespace layanan.