Erfassen von Daten mit dem Kusto Java SDK

Azure-Daten-Explorer ist ein schneller und hochgradig skalierbarer Dienst zur Untersuchung von Daten (Protokoll- und Telemetriedaten). Die Java-Clientbibliothek kann verwendet werden, um Daten zu erfassen, Befehle zur Problemverwaltung zu verwenden und Daten in Azure Data Explorer-Clustern abzufragen.

In diesem Artikel erfahren Sie, wie Sie Daten mithilfe der Azure Data Explorer-Java-Bibliothek erfassen. Sie erstellen zunächst eine Tabelle und eine Datenzuordnung in einem Testcluster. Anschließend reihen Sie eine Erfassung aus Blobspeicher für den Cluster mithilfe des Java SDK in die Warteschlange ein und überprüfen die Ergebnisse.

Voraussetzungen

Überprüfen des Codes

Dieser Abschnitt ist optional. Anhand der folgenden Codeausschnitte können Sie sich mit der Funktionsweise des Codes vertraut machen. Wenn Sie Abschnitt überspringen möchten, gehen Sie zu Ausführen der Anwendung.

Authentifizierung

Das Programm verwendet Microsoft Entra Anmeldeinformationen für die Authentifizierung mit ConnectionStringBuilder'.

  1. Erstellen Sie com.microsoft.azure.kusto.data.Client für Abfrage und Verwaltung.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Erstellen und verwenden Sie ein Element vom Typ com.microsoft.azure.kusto.ingest.IngestClient, um die Datenerfassung in Azure Data Explorer in eine Warteschlange einzureihen:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Befehle für Verwaltung

Verwaltungsbefehle wie .drop und .createwerden ausgeführt, indem für ein com.microsoft.azure.kusto.data.Client -Objekt aufgerufen execute wird.

Die Tabelle StormEvents wird beispielsweise wie folgt erstellt:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Datenerfassung

Reihen Sie eine Erfassung mithilfe einer Datei aus einem vorhandenen Azure Blob Storage-Container in eine Warteschlange ein.

  • Verwenden Sie BlobSourceInfo zum Angeben des Blob Storage-Pfads.
  • Verwenden Sie IngestionProperties zum Definieren der Tabelle, der Datenbank, des Zuordnungsnamens und des Datentyps. Im folgenden Beispiel lautet der Datentyp CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Der Erfassungsprozess startet in einem separaten Thread, und der main-Thread wartet auf den Abschluss des Erfassungsthreads. Bei diesem Prozess wird CountdownLatch verwendet. Die Erfassungs-API (IngestClient#ingestFromBlob) ist nicht asynchron. Mit einer while-Schleife wird alle fünf Sekunden der aktuelle Status abgefragt, und es wird gewartet, bis der Erfassungsstatus Pending in einen anderen Status geändert wird. Der endgültige Status kann Succeeded, Failed oder PartiallySucceeded lauten.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tipp

Es gibt noch weitere Methoden, um die Erfassung für verschiedene Anwendungen asynchron zu verarbeiten. Beispielsweise können Sie CompletableFuture verwenden, um eine Pipeline zu erstellen, die die Aktion nach der Erfassung definiert, etwa das Abfragen der Tabelle oder das Behandeln von Ausnahmen, die an IngestionStatus gemeldet wurden.

Ausführen der Anwendung

Allgemein

Wenn Sie den Beispielcode ausführen, werden folgende Aktionen ausgeführt:

  1. Tabellenlöschung: Die Tabelle StormEvents wird gelöscht (sofern vorhanden).
  2. Tabellenerstellung: Die Tabelle StormEvents wird erstellt.
  3. Zuordnungserstellung: Die Zuordnung StormEvents_CSV_Mapping wird erstellt.
  4. Dateierfassung: Eine CSV-Datei (in Azure Blob Storage) wird zur Erfassung in eine Warteschlange eingereiht.

Der folgende Beispielcode stammt aus App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Tipp

Sie können verschiedene Vorgangskombinationen ausprobieren, indem Sie die entsprechenden Methoden in App.java auskommentieren bzw. kommentieren.

Ausführen der Anwendung

  1. Klonen Sie den Beispielcode von GitHub:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Legen Sie die Dienstprinzipalinformationen mit den folgenden Informationen als Umgebungsvariablen fest, die vom Programm verwendet werden:

    • Clusterendpunkt
    • Datenbankname
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Erstellen und führen Sie Folgendes aus:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    Die Ausgabe ähnelt der folgenden:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Warten Sie einige Minuten, bis die Erfassung abgeschlossen ist. Bei erfolgreicher Ausführung wird die folgende Protokollmeldung angezeigt: Ingestion completed successfully. Sie können das Programm an dieser Stelle beenden und mit dem nächsten Schritt fortfahren, ohne den Erfassungsprozess zu beeinträchtigen, der bereits in die Warteschlange eingereiht wurde.

Überprüfen

Warten Sie fünf bis zehn Minuten, bis die in der Warteschlange befindliche Erfassung geplant wurde und die Daten in Azure Data Explorer geladen wurden.

  1. Melden Sie sich bei https://dataexplorer.azure.com an, und stellen Sie eine Verbindung mit Ihrem Cluster her.

  2. Führen Sie den folgenden Befehl aus, um die Anzahl von Datensätzen in der Tabelle StormEvents zu erhalten:

    StormEvents | count
    

Problembehandlung

  1. Führen Sie den folgenden Befehl in Ihrer Datenbank aus, um festzustellen, ob in den letzten vier Stunden Erfassungsfehler aufgetreten sind:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Führen Sie den folgenden Befehl aus, um den Status aller Erfassungsvorgänge in den letzten vier Stunden anzuzeigen:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Bereinigen von Ressourcen

Wenn Sie die erstellten Ressourcen nicht verwenden möchten, führen Sie in Ihrer Datenbank den folgenden Befehl aus, um die Tabelle StormEvents zu löschen:

.drop table StormEvents