Adatok betöltése a Kusto .NET SDK használatával

A .NET-hez két ügyfélkódtár érhető el: egy betöltési és egy adattár. A .NET SDK-val kapcsolatos további információkért lásd: .NET SDK. Ezekkel a kódtárakkal adatokat tölthet be egy fürtbe, illetve adatokat kérdezhet le a kódból. Ebben a cikkben először egy táblát és egy adatleképezést hoz létre egy tesztfürtben. Ezután várólistára kell tennie egy betöltési műveletet a fürtnek, és ellenőriznie kell az eredményeket.

Előfeltételek

A betöltési kódtár telepítése

Install-Package Microsoft.Azure.Kusto.Ingest

Hitelesítés hozzáadása és kapcsolati karakterlánc létrehozása

Hitelesítés

Egy alkalmazás hitelesítéséhez az SDK a Microsoft Entra bérlőazonosítóját használja. A bérlőazonosító megkereséséhez használja a következő URL-címet úgy, hogy a YourDomain kifejezés helyére a saját tartományát írja be.

https://login.microsoftonline.com/<YourDomain>/.well-known/openid-configuration/

Ha például a tartomány a contoso.com, az URL-cím a következő: https://login.microsoftonline.com/contoso.com/.well-known/openid-configuration/. Kattintson erre az URL-címre az eredmények megtekintéséhez; az első sor a következő.

"authorization_endpoint":"https://login.microsoftonline.com/6babcaad-604b-40ac-a9d7-9fd97c0b779f/oauth2/authorize"

A bérlőazonosító ebben az esetben a következő: 6babcaad-604b-40ac-a9d7-9fd97c0b779f.

Ez a példa interaktív Microsoft Entra felhasználóhitelesítést használ a fürt eléréséhez. Az Microsoft Entra alkalmazáshitelesítést tanúsítványsal vagy titkos alkalmazáskóddal is használhatja. A kód futtatása előtt mindenképpen állítsa be a és clusterUri a megfelelő értékekettenantId.

Az SDK kényelmes módot biztosít a hitelesítési módszer beállítására a kapcsolati karakterlánc részeként. A kapcsolati sztringek teljes dokumentációját lásd: Kapcsolati sztringek.

Megjegyzés

Az SDK jelenlegi verziója nem támogatja az interaktív felhasználói hitelesítést a .NET Core-on. Ha szükséges, használjon inkább Microsoft Entra felhasználónevet/jelszót vagy alkalmazáshitelesítést.

A kapcsolati karakterlánc létrehozása

Most már létrehozhatja a kapcsolati karakterlánc. A céltáblát és a leképezést egy későbbi lépésben fogja létrehozni.

var kustoUri = "https://<ClusterName>.<Region>.kusto.windows.net/";
var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
var kustoConnectionStringBuilder = new KustoConnectionStringBuilder(kustoUri).WithAadUserPromptAuthentication(tenantId);

A forrásfájl adatainak beállítása

Állítsa be a forrásfájl elérési útját. Ez a példa egy Azure Blob Storage-ban üzemeltetett mintafájlt használ. A StormEvents mintaadatkészlet a National Centers for Environment Information időjárással kapcsolatos adatait tartalmazza.

var blobPath = "https://kustosamples.blob.core.windows.net/samplefiles/StormEvents.csv";

Tábla létrehozása a tesztfürtön

Hozzon létre egy nevű táblát StormEvents , amely megfelel a fájlban lévő adatok sémájának StormEvents.csv .

Tipp

Az alábbi kódrészletek szinte minden híváshoz létrehoznak egy ügyfélpéldányt. Ez azért történik, hogy az egyes kódrészletek egyenként futtathatók legyenek. Éles környezetben az ügyfélpéldányok újraküldésre kerülnek, és a szükséges ideig meg kell tartani őket. URI-nként egyetlen ügyfélpéldány elegendő, még akkor is, ha több adatbázissal dolgozik (az adatbázis parancsszinten adható meg).

var databaseName = "<DatabaseName>";
var tableName = "StormEvents";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("StartTime", "System.DateTime"),
            Tuple.Create("EndTime", "System.DateTime"),
            Tuple.Create("EpisodeId", "System.Int32"),
            Tuple.Create("EventId", "System.Int32"),
            Tuple.Create("State", "System.String"),
            Tuple.Create("EventType", "System.String"),
            Tuple.Create("InjuriesDirect", "System.Int32"),
            Tuple.Create("InjuriesIndirect", "System.Int32"),
            Tuple.Create("DeathsDirect", "System.Int32"),
            Tuple.Create("DeathsIndirect", "System.Int32"),
            Tuple.Create("DamageProperty", "System.Int32"),
            Tuple.Create("DamageCrops", "System.Int32"),
            Tuple.Create("Source", "System.String"),
            Tuple.Create("BeginLocation", "System.String"),
            Tuple.Create("EndLocation", "System.String"),
            Tuple.Create("BeginLat", "System.Double"),
            Tuple.Create("BeginLon", "System.Double"),
            Tuple.Create("EndLat", "System.Double"),
            Tuple.Create("EndLon", "System.Double"),
            Tuple.Create("EpisodeNarrative", "System.String"),
            Tuple.Create("EventNarrative", "System.String"),
            Tuple.Create("StormSummary", "System.Object"),
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Adatbetöltési leképezés meghatározása

Képezte le a bejövő CSV-adatokat a tábla létrehozásakor használt oszlopnevekre. Hozzon létre egy CSV-oszlopleképezési objektumot a táblán.

var tableMappingName = "StormEvents_CSV_Mapping";
using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Csv,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "StartTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "0" } } },
            new() { ColumnName = "EndTime", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "1" } } },
            new() { ColumnName = "EpisodeId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "2" } } },
            new() { ColumnName = "EventId", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "3" } } },
            new() { ColumnName = "State", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "4" } } },
            new() { ColumnName = "EventType", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "5" } } },
            new() { ColumnName = "InjuriesDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "6" } } },
            new() { ColumnName = "InjuriesIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "7" } } },
            new() { ColumnName = "DeathsDirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "8" } } },
            new() { ColumnName = "DeathsIndirect", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "9" } } },
            new() { ColumnName = "DamageProperty", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "10" } } },
            new() { ColumnName = "DamageCrops", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "11" } } },
            new() { ColumnName = "Source", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "12" } } },
            new() { ColumnName = "BeginLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "13" } } },
            new() { ColumnName = "EndLocation", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "14" } } },
            new() { ColumnName = "BeginLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "15" } } },
            new() { ColumnName = "BeginLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "16" } } },
            new() { ColumnName = "EndLat", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "17" } } },
            new() { ColumnName = "EndLon", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "18" } } },
            new() { ColumnName = "EpisodeNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "19" } } },
            new() { ColumnName = "EventNarrative", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "20" } } },
            new() { ColumnName = "StormSummary", Properties = new Dictionary<string, string> { { MappingConsts.Ordinal, "21" } } }
        }
    );
    
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
}

Kötegelési szabályzat definiálása a táblához

A bejövő adatok kötegelése optimalizálja az adat szegmensméretét, amelyet a betöltési kötegelési szabályzat szabályoz. Módosítsa a szabályzatot a betöltési kötegelési házirend kezelése paranccsal. Ezzel a szabályzattal csökkentheti a lassan érkező adatok késését.

using (var kustoClient = KustoClientFactory.CreateCslAdminProvider(kustoConnectionStringBuilder))
{
    var command = CslCommandGenerator.GenerateTableAlterIngestionBatchingPolicyCommand(
        databaseName,
        tableName,
        new IngestionBatchingPolicy(
            maximumBatchingTimeSpan: TimeSpan.FromSeconds(10),
            maximumNumberOfItems: 100,
            maximumRawDataSizeMB: 1024
        )
    );
    kustoClient.ExecuteControlCommand(command);
}

Javasoljuk, hogy határozzon meg egy Raw Data Size értéket a betöltött adatokhoz, és növelje a méretet 250 MB-ra, miközben ellenőrizze, hogy javul-e a teljesítmény.

A tulajdonság használatával kihagyhatja a Flush Immediately kötegelést, bár ez nem ajánlott a nagy léptékű betöltéshez, mivel ez gyenge teljesítményt okozhat.

Üzenet várólistába helyezése a betöltéshez

Üzenet várólistára helyezése a blobtárolóból származó adatok lekéréséhez és az adatok betöltéséhez. Létrejön egy kapcsolat a betöltési fürttel, és egy másik ügyfél jön létre a végponttal való együttműködéshez.

Tipp

Az alábbi kódrészletek szinte minden híváshoz létrehoznak egy ügyfélpéldányt. Ez azért történik, hogy az egyes kódrészletek egyenként futtathatók legyenek. Éles környezetben az ügyfélpéldányok újraküldésre kerülnek, és a szükséges ideig meg kell tartani őket. URI-nként egyetlen ügyfélpéldány elegendő, még akkor is, ha több adatbázissal dolgozik (az adatbázis parancsszinten adható meg).

var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri).WithAadUserPromptAuthentication(tenantId);
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.csv,
    IngestionMapping = new IngestionMapping
    {
        IngestionMappingReference = tableMappingName,
        IngestionMappingKind = IngestionMappingKind.Csv
    },
    IgnoreFirstRecord = true
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);

Annak ellenőrzése, hogy az adatok betöltése megtörtént-e a táblába

Várjon 5–10 percet, amíg a várólistán lévő betöltési folyamat ütemezi a betöltést, és betölti az adatokat a fürtbe. Ezután futtassa a következő kódot a StormEvents-táblában lévő rekordok számának lekérdezéséhez.

using var cslQueryProvider = KustoClientFactory.CreateCslQueryProvider(kustoConnectionStringBuilder);
var query = $"{tableName} | count";
var results = cslQueryProvider.ExecuteQuery<long>(databaseName, query);
Console.WriteLine(results.Single());

Hibaelhárítási lekérdezések futtatása

Jelentkezzen be a https://dataexplorer.azure.com oldalon, és csatlakozzon a fürthöz. Futtassa a következő parancsot az adatbázisban annak ellenőrzéséhez, hogy voltak-e betöltési hibák az elmúlt négy órában. A futtatás előtt cserélje le az adatbázis nevét.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Futtassa a következő parancsot az elmúlt négy órában végzett összes betöltési művelet állapotának megtekintéséhez. A futtatás előtt cserélje le az adatbázis nevét.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Az erőforrások eltávolítása

Ha követni szeretné a többi cikkünket, tartsa meg a létrehozott erőforrásokat. Ha nem szeretné, futtassa a következő parancsot az adatbázisban a StormEvents-tábla felesleges elemeinek eltávolításához.

.drop table StormEvents