Ingérer des données à l’aide du Kit de développement logiciel (SDK) Java Kusto

L’Explorateur de données Azure est un service d’exploration de données rapide et hautement évolutive pour les données des journaux et les données de télémétrie. La bibliothèque cliente Java peut être utilisée pour ingérer des données, émettre des commandes de gestion et interroger des données dans des clusters Azure Data Explorer.

Dans cet article, vous allez découvrir comment ingérer des données à l’aide de la bibliothèque Java d’Azure Data Explorer. Tout d’abord, vous allez créer une table et un mappage de données dans un cluster de test. Ensuite, vous effectuerez la mise en file d’attente d’une ingestion du stockage d’objets blob vers le cluster à l’aide du kit SDK Java, et vous validerez les résultats.

Conditions préalables requises

Vérifier le code

Cette section est facultative. Passez en revue les extraits de code suivants pour découvrir comment fonctionne le code. Pour ignorer cette section, accédez à Exécuter l’application.

Authentification

Le programme utilise Microsoft Entra informations d’identification d’authentification avec ConnectionStringBuilder'.

  1. Créez un com.microsoft.azure.kusto.data.Client pour la requête et la gestion.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Créez et utilisez un com.microsoft.azure.kusto.ingest.IngestClient pour la mise en file d’attente de l’ingestion des données dans Azure Data Explorer :

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Commandes de gestion

Les commandes de gestion, telles que .drop et .create, sont exécutées en appelant execute sur un com.microsoft.azure.kusto.data.Client objet.

Par exemple, la table StormEvents est créée comme suit :

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Ingestion de données

Placez l’ingestion en file d’attente à l’aide d’un fichier provenant d’un conteneur Stockage Blob Azure existant.

  • Utilisez BlobSourceInfo pour spécifier le chemin du Stockage Blob.
  • Utilisez IngestionProperties pour définir la table, la base de données, le nom du mappage et le type de données. Dans l’exemple suivant, le type de données est CSV.
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

Le processus d’ingestion démarre dans un thread distinct, et le thread main attend que le thread d’ingestion se termine. Ce processus utilise CountdownLatch. L’API d’ingestion (IngestClient#ingestFromBlob) n’est pas asynchrone. Une boucle while est utilisée pour interroger l’état actuel tous les cinq secondes, et attend que l’état d’ingestion passe de Pending à un état différent. L’état final peut être Succeeded, Failed ou PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Conseil

Il existe d’autres méthodes pour gérer l’ingestion asynchrone pour différentes applications. Par exemple, vous pouvez utiliser CompletableFuture pour créer un pipeline définissant l’action post-ingestion, telle que l’interrogation de la table, ou pour gérer les exceptions qui ont été signalées à IngestionStatus.

Exécution de l'application

Général

Lorsque vous exécutez l’exemple de code, les actions suivantes sont effectuées :

  1. Supprimer une table : la table StormEvents est supprimée (si elle existe).
  2. Création de la table : la table StormEvents est créée.
  3. Création du mappage : le mappage StormEvents_CSV_Mapping est créé.
  4. Ingestion de fichiers : un fichier CSV (dans le Stockage Blob Azure) est mis en file d’attente à des fins d’ingestion.

L’exemple de code suivant provient de App.java :

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Conseil

Pour tester différentes combinaisons d’opérations, commentez/décommentez les méthodes respectives dans App.java.

Exécution de l'application

  1. Clonez l’exemple de code depuis GitHub :

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Définissez les informations du principal de service avec les informations suivantes en tant que variables d’environnement utilisées par le programme :

    • Point de terminaison de cluster
    • Nom de la base de données
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Générez et exécutez :

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    La sortie doit ressembler à ceci :

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Attendez quelques minutes que le processus d’ingestion se termine. Une fois l’opération terminée, le message de journal suivant s’affiche : Ingestion completed successfully. Vous pouvez quitter le programme à ce stade et passer à l’étape suivante sans affecter le processus d’ingestion, qui a déjà été mis en file d’attente.

Valider

Attendez cinq à dix minutes que l’ingestion en file d’attente planifie le processus d’ingestion et charge les données dans Azure Data Explorer.

  1. Connectez-vous à https://dataexplorer.azure.com et à votre cluster.

  2. Exécutez la commande suivante pour obtenir le nombre d’enregistrements de la table StormEvents :

    StormEvents | count
    

Dépanner

  1. Pour voir les échecs d’ingestion des quatre dernières heures, exécutez la commande suivante sur votre base de données :

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Pour voir l’état de toutes les opérations d’ingestion des quatre dernières heures, exécutez la commande suivante :

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Nettoyer les ressources

Si vous ne prévoyez pas d’utiliser les ressources que vous avez créées, exécutez la commande suivante dans votre base de données pour supprimer la table StormEvents.

.drop table StormEvents