Ingesta de datos mediante la biblioteca de Node de Azure Data Explorer

El Explorador de datos de Azure es un servicio de exploración de datos altamente escalable y rápido para datos de telemetría y registro. Azure Data Explorer proporciona dos bibliotecas cliente para Node: una biblioteca de ingesta y una biblioteca de datos. Estas bibliotecas permiten ingerir (cargar) datos en un clúster y consultar datos desde el código. En este artículo, primero creará una tabla y la asignación de datos en un clúster de prueba. A continuación, pondrá en cola la ingesta en el clúster y validará los resultados.

Si no tiene una suscripción a Azure, cree una cuenta gratuita de Azure antes de empezar.

Requisitos previos

  • Una cuenta de Microsoft o una identidad de usuario Microsoft Entra. No se necesita una suscripción a Azure.
  • Un clúster y la base de datos de Azure Data Explorer. Cree un clúster y una base de datos.
  • Node.js instalado en el equipo de desarrollo

Instalación de los datos y las bibliotecas de ingesta

Instale azure-kusto-ingest y azure-kusto-data.

npm i azure-kusto-ingest@^3.3.2 azure-kusto-data@^3.3.2

Incorporación de instrucciones de importación y constantes

Importación de clases de las bibliotecas


const { Client: KustoClient, KustoConnectionStringBuilder } =  require('azure-kusto-data');
const {
    IngestClient: KustoIngestClient,
    IngestionProperties,
    IngestionDescriptors,
    DataFormat,
    IngestionMappingKind,
} =  require("azure-kusto-ingest");

Para autenticar una aplicación, Azure Data Explorer usa el identificador de inquilino de Microsoft Entra. Para buscar el identificador de inquilino, consulte Búsqueda del identificador del inquilino de Microsoft 365.

Establezca los valores de authorityId, kustoUri, kustoIngestUri y kustoDatabase antes de ejecutar este código.

const cluster = "MyCluster";
const region = "westus";
const authorityId = "microsoft.com";
const kustoUri = `https://${cluster}.${region}.kusto.windows.net`;
const kustoIngestUri = `https://ingest-${cluster}.${region}.kusto.windows.net`;
const kustoDatabase  = "Weather";

Ahora, cree la cadena de conexión. En este ejemplo se utiliza la autenticación de dispositivos para acceder al clúster. Compruebe la salida de la consola para completar la autenticación. También puede usar un certificado de aplicación de Microsoft Entra, una clave de aplicación y un usuario y una contraseña.

Creará la tabla de destino y la asignación en un paso posterior.

const kcsbIngest = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoIngestUri, authorityId);
const kcsbData = KustoConnectionStringBuilder.withAadDeviceAuthentication(kustoUri, authorityId);
const destTable = "StormEvents";
const destTableMapping = "StormEvents_CSV_Mapping";

Definición de la información del archivo de origen

Importe más clases y defina las constantes para el archivo de origen de datos. Este ejemplo utiliza un archivo de ejemplo hospedado en Azure Blob Storage. El conjunto de datos de ejemplo StormEvents contiene datos relacionados con el tiempo de los Centros Nacionales de Información Ambiental.

const container = "samplefiles";
const account = "kustosamples";
const sas = "";  // If relevant add SAS token
const filePath = "StormEvents.csv";
const blobPath = `https://${account}.blob.core.windows.net/${container}/${filePath}${sas}`;

Creación de una tabla en el clúster de prueba

Cree una tabla que coincida con el esquema de los datos del archivo StormEvents.csv. Cuando se ejecuta este código, devuelve un mensaje similar al siguiente: Para iniciar sesión, use un explorador web para abrir la página https://microsoft.com/devicelogin y escriba el código XXXXXXXXX para autenticarse. Siga los pasos para iniciar sesión y, a continuación, vuelva a ejecutar el siguiente bloque de código. Los bloques de código subsiguientes que establecen una conexión requieren que vuelva a iniciar sesión.

const kustoClient = new KustoClient(kcsbData);
const createTableCommand = `.create table ${destTable} (StartTime: datetime, EndTime: datetime, EpisodeId: int, EventId: int, State: string, EventType: string, InjuriesDirect: int, InjuriesIndirect: int, DeathsDirect: int, DeathsIndirect: int, DamageProperty: int, DamageCrops: int, Source: string, BeginLocation: string, EndLocation: string, BeginLat: real, BeginLon: real, EndLat: real, EndLon: real, EpisodeNarrative: string, EventNarrative: string, StormSummary: dynamic)`;

const createTableResults = await kustoClient.executeMgmt(kustoDatabase, createTableCommand);
console.log(createTableResults.primaryResults[0].toJSON().data);

Definición de la asignación de ingesta

Asigna los datos de CSV entrantes a los nombres de columna y tipos de datos utilizados al crear la tabla.

const createMappingCommand = `.create table ${destTable} ingestion csv mapping '${destTableMapping}' '[{"Name":"StartTime","datatype":"datetime","Ordinal":0}, {"Name":"EndTime","datatype":"datetime","Ordinal":1},{"Name":"EpisodeId","datatype":"int","Ordinal":2},{"Name":"EventId","datatype":"int","Ordinal":3},{"Name":"State","datatype":"string","Ordinal":4},{"Name":"EventType","datatype":"string","Ordinal":5},{"Name":"InjuriesDirect","datatype":"int","Ordinal":6},{"Name":"InjuriesIndirect","datatype":"int","Ordinal":7},{"Name":"DeathsDirect","datatype":"int","Ordinal":8},{"Name":"DeathsIndirect","datatype":"int","Ordinal":9},{"Name":"DamageProperty","datatype":"int","Ordinal":10},{"Name":"DamageCrops","datatype":"int","Ordinal":11},{"Name":"Source","datatype":"string","Ordinal":12},{"Name":"BeginLocation","datatype":"string","Ordinal":13},{"Name":"EndLocation","datatype":"string","Ordinal":14},{"Name":"BeginLat","datatype":"real","Ordinal":16},{"Name":"BeginLon","datatype":"real","Ordinal":17},{"Name":"EndLat","datatype":"real","Ordinal":18},{"Name":"EndLon","datatype":"real","Ordinal":19},{"Name":"EpisodeNarrative","datatype":"string","Ordinal":20},{"Name":"EventNarrative","datatype":"string","Ordinal":21},{"Name":"StormSummary","datatype":"dynamic","Ordinal":22}]'`;

const mappingCommandResults = await kustoClient.executeMgmt(kustoDatabase, createMappingCommand);
console.log(mappingCommandResults.primaryResults[0].toJSON().data);

Colocación de un mensaje en cola para la ingesta

Coloca en cola un mensaje para extraer datos desde Blob Storage e introduce esos datos en el Explorador de datos de Azure.

const defaultProps  = new IngestionProperties({
    database: kustoDatabase,
    table: destTable,
    format: DataFormat.CSV,
    ingestionMappingReference: destTableMapping,
    ingestionMappingKind: IngestionMappingKind.CSV,
    additionalProperties: {ignoreFirstRecord: true},
});

const ingestClient = new KustoIngestClient(kcsbIngest, defaultProps);
// All ingestion properties are documented here: https://learn.microsoft.com/azure/kusto/management/data-ingest#ingestion-properties

const blobDesc = new BlobDescriptor(blobPath, 10);
try {
	const ingestionResult = await ingestClient.ingestFromBlob(blobDesc, null);
} catch (err) {
	// Handle errors
}

Validación de que la tabla contiene datos

Valide la ingesta de los datos en la tabla. Espere entre cinco y diez minutos para que la ingesta en cola programe la ingesta y cargue los datos en el Explorador de datos de Azure. A continuación, ejecute el siguiente código para obtener el recuento de registros de la tabla StormEvents.

const query = `${destTable} | count`;

var tableResults = await kustoClient.execute(kustoDatabase, query);
console.log(tableResults.primaryResults[0].toJSON().data);

Ejecución de consultas de solución de problemas

Inicie sesión en https://dataexplorer.azure.com y conéctese al clúster Ejecute el siguiente comando en la base de datos para ver si se ha producido algún error de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.

.show ingestion failures
| where FailedOn > ago(4h) and Database == "<DatabaseName>"

Ejecute el siguiente comando para ver el estado de todas las operaciones de ingesta en las últimas cuatro horas. Reemplace el nombre de la base de datos antes de ejecutarlo.

.show operations
| where StartedOn > ago(4h) and Database == "<DatabaseName>" and Operation == "DataIngestPull"
| summarize arg_max(LastUpdatedOn, *) by OperationId

Limpieza de recursos

Si tiene previsto seguir nuestros otros artículos, conserve los recursos que creó. De lo contrario, ejecute el siguiente comando en la base de datos para limpiar la tabla StormEvents.

.drop table StormEvents