Share via


Erfassen von Daten mit Apache Flink in Azure Data Explorer

Apache Flink ist ein Framework und eine verteilte Verarbeitungs-Engine für zustandsbehaftete Berechnungen über ungebundene und begrenzte Datenströme.

Der Flink-Connector ist ein Open Source-Projekt, das auf jedem Flink-Cluster ausgeführt werden kann. Es implementiert Datensenke zum Verschieben von Daten aus einem Flink-Cluster. Mit dem Connector für Apache Flink können Sie schnelle und skalierbare Anwendungen für datengesteuerte Szenarien erstellen, z. B. Machine Learning (ML), Extract-Transform-Load (ETL) und Log Analytics.

In diesem Artikel erfahren Sie, wie Sie den Flink-Connector verwenden, um Daten von Flink an Ihre Tabelle zu senden. Sie erstellen eine Tabellen- und Datenzuordnung, weisen Flink an, Daten in die Tabelle zu senden, und überprüfen dann die Ergebnisse.

Voraussetzungen

Integrieren Sie für Flink-Projekte, die Maven zum Verwalten von Abhängigkeiten verwenden, die Flink Connector Core Sink For Azure Data Explorer, indem Sie sie als Abhängigkeit hinzufügen:

<dependency>
    <groupId>com.microsoft.azure.kusto</groupId>
    <artifactId>flink-connector-kusto</artifactId>
    <version>1.0.0</version>
</dependency>

Für Projekte, die Maven nicht zum Verwalten von Abhängigkeiten verwenden, klonen Sie das Repository für den Azure Data Explorer Connector für Apache Flink, und erstellen Sie es lokal. Mit diesem Ansatz können Sie den Connector mithilfe des Befehls mvn clean install -DskipTestsmanuell zu Ihrem lokalen Maven-Repository hinzufügen.

Sie können sich über Flink mit einer Microsoft Entra ID-Anwendung oder einer verwalteten Identität authentifizieren.

Dieser Dienstprinzipal ist die Identität, die vom Connector zum Schreiben von Daten in Ihrer Tabelle in Kusto verwendet wird. Später erteilen Sie diesem Dienstprinzipal Berechtigungen für den Zugriff auf Kusto-Ressourcen.

  1. Melden Sie sich über die Azure CLI bei Ihrem Azure-Abonnement an. Führen Sie anschließend im Browser die Authentifizierung durch.

    az login
    
  2. Wählen Sie das Abonnement aus, das den Prinzipal hosten soll. Dieser Schritt ist erforderlich, wenn Sie über mehrere Abonnements verfügen.

    az account set --subscription YOUR_SUBSCRIPTION_GUID
    
  3. Erstellen Sie den Dienstprinzipal. In diesem Beispiel wird der Dienstprinzipal als my-service-principal bezeichnet.

    az ad sp create-for-rbac -n "my-service-principal" --role Contributor --scopes /subscriptions/{SubID}
    
  4. Kopieren Sie aus den zurückgegebenen JSON-Daten die appId, passwordund tenant zur zukünftigen Verwendung.

    {
      "appId": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "displayName": "my-service-principal",
      "name": "my-service-principal",
      "password": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn",
      "tenant": "1234abcd-e5f6-g7h8-i9j0-1234kl5678mn"
    }
    

Sie haben Ihre Microsoft Entra-Anwendung und den Dienstprinzipal erstellt.

  1. Erteilen Sie der Anwendung Benutzerberechtigungen für die Datenbank:

    // Grant database user permissions
    .add database <MyDatabase> users ('aadapp=<Application ID>;<Tenant ID>')
    
  2. Erteilen Sie der Anwendung entweder Ingestor- oder Administratorberechtigungen für die Tabelle. Die erforderlichen Berechtigungen hängen von der ausgewählten Datenschreibmethode ab. Ingestor-Berechtigungen sind für SinkV2 ausreichend, während WriteAndSink Administratorberechtigungen erfordert.

    // Grant table ingestor permissions (SinkV2)
    .add table <MyTable> ingestors ('aadapp=<Application ID>;<Tenant ID>')
    
    // Grant table admin permissions (WriteAheadSink)
    .add table <MyTable> admins ('aadapp=<Application ID>;<Tenant ID>')
    

Weitere Informationen zur Autorisierung finden Sie unter Rollenbasierte Zugriffssteuerung in Kusto.

So schreiben Sie Daten aus Flink:

  1. Importieren Sie die erforderlichen Optionen:

    import com.microsoft.azure.flink.config.KustoConnectionOptions;
    import com.microsoft.azure.flink.config.KustoWriteOptions;
    
  2. Verwenden Sie Ihre Anwendung oder verwaltete Identität, um sich zu authentifizieren.

    Für die Anwendungsauthentifizierung:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setAppId("<Application ID>")
    .setAppKey("<Application key>")
    .setTenantId("<Tenant ID>")
    .setClusterUrl("<Cluster URI>").build();
    

    Für die Authentifizierung mit verwalteter Identität:

    KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
    .setManagedIdentityAppId("<Object ID>")
    .setClusterUrl("<Cluster URI>").build();
    
  1. Konfigurieren Sie die Senkeparameter wie Datenbank und Tabelle:

    KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
        .withDatabase("<Database name>").withTable("<Table name>").build();
    

    Sie können weitere Optionen hinzufügen, wie in der folgenden Tabelle beschrieben:

    Option BESCHREIBUNG Standardwert
    ErfassungMappingRef Verweist auf eine vorhandene Erfassungszuordnung.
    FlushImmediately Löscht Daten sofort und kann Leistungsprobleme verursachen. Diese Methode wird nicht empfohlen.
    BatchIntervalMs Steuert, wie oft Daten geleert werden. 30 Sekunden
    BatchSize Legt die Batchgröße für das Puffern von Datensätzen vor dem Leeren fest. 1.000 Datensätze
    ClientBatchSizeLimit Gibt die Größe aggregierter Daten vor der Erfassung in MB an. 300MB
    PollForIngestionStatus Wenn true, fragt der Connector nach der Erfassung ab, status nach der Datenlöschung. false
    DeliveryGuarantee Bestimmt die Semantik der Übermittlungsgarantie. Verwenden Sie WriteAheadSink, um genau einmal Semantik zu erzielen. AT_LEAST_ONCE
  2. Schreiben sie Streamingdaten mit einer der folgenden Methoden:

    • SinkV2: Dies ist eine zustandslose Option, die Daten auf prüfpunktlöscht, um mindestens einmal Konsistenz zu gewährleisten. Wir empfehlen diese Option für die Datenerfassung mit hohem Volumen.
    • WriteAheadSink: Diese Methode gibt Daten an einen KustoSink aus. Es ist in das Prüfpunktsystem von Flink integriert und bietet genau einmal Garantien. Daten werden in einem AbstractStateBackend gespeichert und erst nach Abschluss eines Prüfpunkts committet.

    Im folgenden Beispiel wird SinkV2 verwendet. Um WriteAheadSink zu verwenden, verwenden Sie die buildWriteAheadSink -Methode anstelle von build:

    KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
        .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
        , 2 /*Parallelism to use*/);
    

Der vollständige Code sollte etwa wie folgt aussehen:

import com.microsoft.azure.flink.config.KustoConnectionOptions;
import com.microsoft.azure.flink.config.KustoWriteOptions;

KustoConnectionOptions kustoConnectionOptions = KustoConnectionOptions.builder()
.setAppId("<Application ID>")
.setAppKey("<Application key>")
.setTenantId("<Tenant ID>")
.setClusterUrl("<Cluster URI>").build();

KustoWriteOptions kustoWriteOptions = KustoWriteOptions.builder()
    .withDatabase("<Database name>").withTable("<Table name>").build();

KustoWriteSink.builder().setWriteOptions(kustoWriteOptions)
    .setConnectionOptions(kustoConnectionOptions).build("<Flink source datastream>" /*Flink source data stream, example messages de-queued from Kafka*/
    , 2 /*Parallelism to use*/);

Überprüfen, ob Daten erfasst werden

Sobald die Verbindung konfiguriert ist, werden Daten an Ihre Tabelle gesendet. Sie können überprüfen, ob die Daten erfasst werden, indem Sie eine KQL-Abfrage ausführen.

  1. Führen Sie die folgende Abfrage aus, um zu überprüfen, ob Daten in der Tabelle erfasst werden:

    <MyTable>
    | count
    
  2. Führen Sie die folgende Abfrage aus, um die Daten anzuzeigen:

    <MyTable>
    | take 100