Share via


Adatok betöltése a Kusto Java SDK használatával

Az Azure Adatkezelő egy gyors és hatékonyan skálázható adatáttekintési szolgáltatás napló- és telemetriaadatokhoz. A Java-ügyfélkódtár használható adatok betöltésére, problémakezelési parancsokra és adatok lekérdezésére az Azure Data Explorer-fürtökben.

Ebből a cikkből megtudhatja, hogyan betölthet adatokat az Azure Data Explorer Java-kódtár használatával. Először egy táblát és egy adatleképezést fog létrehozni egy tesztfürtben. Ezután a Java SDK használatával várólistára állítja a blobtárolóból a fürtbe történő betöltést, és ellenőrzi az eredményeket.

Előfeltételek

A kód áttekintése

Ez a szakasz nem kötelező. A kód működésének megismeréséhez tekintse át az alábbi kódrészleteket. A szakasz kihagyásához nyissa meg az alkalmazást.

Hitelesítés

A program Microsoft Entra hitelesítési hitelesítő adatokat használ a ConnectionStringBuilder használatával."

  1. Hozzon létre egy lekérdezést com.microsoft.azure.kusto.data.Client és felügyeletet.

    static Client getClient() throws Exception {
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(endpoint, clientID, clientSecret, tenantID);
        return ClientFactory.createClient(csb);
    }
    
  2. Hozzon létre és használjon egy com.microsoft.azure.kusto.ingest.IngestClient elemet az adatbetöltés várólistára helyezéséhez az Azure Data Explorer:

    static IngestClient getIngestionClient() throws Exception {
        String ingestionEndpoint = "https://ingest-" + URI.create(endpoint).getHost();
        ConnectionStringBuilder csb = ConnectionStringBuilder.createWithAadApplicationCredentials(ingestionEndpoint, clientID, clientSecret);
        return IngestClientFactory.createClient(csb);
    }
    

Felügyeleti parancsok

A felügyeleti parancsok ( például .drop és .create) egy objektum meghívásával executecom.microsoft.azure.kusto.data.Client lesznek végrehajtva.

A tábla például a StormEvents következőképpen jön létre:

static final String createTableCommand = ".create table StormEvents (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)";

static void createTable(String database) {
    try {
        getClient().execute(database, createTableCommand);
        System.out.println("Table created");
    } catch (Exception e) {
        System.out.println("Failed to create table: " + e.getMessage());
        return;
    }

}

Adatfeldolgozás

Várólista-betöltés meglévő Azure Blob Storage tárolóból származó fájl használatával.

  • A Blob Storage elérési útjának megadására használható BlobSourceInfo .
  • A tábla, az adatbázis, a leképezés neve és az adattípus definiálására használható IngestionProperties . Az alábbi példában az adattípus a következő CSV: .
    ...
    static final String blobPathFormat = "https://%s.blob.core.windows.net/%s/%s%s";
    static final String blobStorageAccountName = "kustosamples";
    static final String blobStorageContainer = "samplefiles";
    static final String fileName = "StormEvents.csv";
    static final String blobStorageToken = ""; //If relevant add SAS token
    ....

    static void ingestFile(String database) throws InterruptedException {
        String blobPath = String.format(blobPathFormat, blobStorageAccountName, blobStorageContainer,
                fileName, blobStorageToken);
        BlobSourceInfo blobSourceInfo = new BlobSourceInfo(blobPath);

        IngestionProperties ingestionProperties = new IngestionProperties(database, tableName);
        ingestionProperties.setDataFormat(DATA_FORMAT.csv);
        ingestionProperties.setIngestionMapping(ingestionMappingRefName, IngestionMappingKind.Csv);
        ingestionProperties.setReportLevel(IngestionReportLevel.FailuresAndSuccesses);
        ingestionProperties.setReportMethod(IngestionReportMethod.QueueAndTable);
    ....

A betöltési folyamat egy külön szálban kezdődik, és a main szál megvárja, amíg a betöltési szál befejeződik. Ez a folyamat a CountdownLatch függvényt használja. A betöltési API (IngestClient#ingestFromBlob) nem aszinkron. A while rendszer 5 másodpercenként lekérdezi az aktuális állapotot, és megvárja, amíg a betöltési állapot egy másik állapotra változik Pending . A végső állapot lehet Succeeded, Failedvagy PartiallySucceeded.

        ....
        CountDownLatch ingestionLatch = new CountDownLatch(1);
        new Thread(new Runnable() {
            @Override
            public void run() {
                IngestionResult result = null;
                try {
                    result = getIngestionClient().ingestFromBlob(blobSourceInfo, ingestionProperties);
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
                try {
                    IngestionStatus status = result.getIngestionStatusCollection().get(0);
                    while (status.status == OperationStatus.Pending) {
                        Thread.sleep(5000);
                        status = result.getIngestionStatusCollection().get(0);
                    }
                    ingestionLatch.countDown();
                } catch (Exception e) {
                    ingestionLatch.countDown();
                }
            }
        }).start();
        ingestionLatch.await();
    }

Tipp

A különböző alkalmazások aszinkron módon történő betöltésének kezelésére más módszerek is léteznek. Például létrehozhat egy folyamatot, CompletableFuture amely meghatározza a betöltés utáni műveletet, például lekérdezheti a táblát, vagy kezelheti a IngestionStatusjelentésben szereplő kivételeket.

Az alkalmazás futtatása

Általános kérdések

A mintakód futtatásakor a következő műveletek lesznek végrehajtva:

  1. Tábla elvetése: StormEvents a tábla el lesz vetve (ha létezik).
  2. Tábla létrehozása: StormEvents a tábla létrejön.
  3. Leképezés létrehozása: StormEvents_CSV_Mapping a leképezés létrejön.
  4. Fájlbetöltés: Egy CSV-fájl (Azure Blob Storage) várólistára kerül a betöltéshez.

A következő mintakód a következőből származik App.java:

public static void main(final String[] args) throws Exception {
    dropTable(database);
    createTable(database);
    createMapping(database);
    ingestFile(database);
}

Tipp

A műveletek különböző kombinációinak kipróbálásához bontsa ki a megfelelő metódusokat a következőben App.java: .

Az alkalmazás futtatása

  1. Klónozza a mintakódot a GitHubról:

    git clone https://github.com/Azure-Samples/azure-data-explorer-java-sdk-ingest.git
    cd azure-data-explorer-java-sdk-ingest
    
  2. Állítsa be a szolgáltatásnév adatait a következő információkkal a program által használt környezeti változókként:

    • Fürtvégpont
    • Adatbázis neve
    export AZURE_SP_CLIENT_ID="<replace with appID>"
    export AZURE_SP_CLIENT_SECRET="<replace with password>"
    export KUSTO_ENDPOINT="https://<cluster name>.<azure region>.kusto.windows.net"
    export KUSTO_DB="name of the database"
    
  3. Buildelés és futtatás:

    mvn clean package
    java -jar target/adx-java-ingest-jar-with-dependencies.jar
    

    A kimenet a következőhöz hasonló lesz:

    Table dropped
    Table created
    Mapping created
    Waiting for ingestion to complete...
    

Várjon néhány percet, amíg a betöltési folyamat befejeződik. A sikeres befejezés után a következő naplóüzenet jelenik meg: Ingestion completed successfully. Ezen a ponton kiléphet a programból, és továbbléphet a következő lépésre anélkül, hogy ez hatással lenne a már várólistára helyezett betöltési folyamatra.

Érvényesítés

Várjon 5–10 percet, amíg az üzenetsorba helyezett betöltés ütemezi a betöltési folyamatot, és betölti az adatokat az Azure Data Explorer.

  1. Jelentkezzen be a https://dataexplorer.azure.com oldalon, és csatlakozzon a fürthöz.

  2. Futtassa a következő parancsot a táblában lévő StormEvents rekordok számának lekéréséhez:

    StormEvents | count
    

Hibaelhárítás

  1. Az elmúlt négy órában előforduló betöltési hibák megtekintéséhez futtassa a következő parancsot az adatbázisban:

    .show ingestion failures
    | where FailedOn > ago(4h) and Database == "<DatabaseName>"
    
  2. Az elmúlt négy órában végrehajtott összes betöltési művelet állapotának megtekintéséhez futtassa a következő parancsot:

    .show operations
    | where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
    | summarize arg_max(LastUpdatedOn, *) by OperationId
    

Az erőforrások eltávolítása

Ha nem tervezi használni a létrehozott erőforrásokat, futtassa a következő parancsot az adatbázisban a StormEvents tábla elvetéséhez.

.drop table StormEvents