Configuración de la ingesta de streaming en el clúster de Azure Data Explorer
Artículo
Tiempo de lectura: 12 minutos
Gracias.
La ingesta de streaming es útil para cargar datos cuando necesite una latencia baja entre la ingesta y la consulta. Considere la posibilidad de usar la ingesta de streaming en los escenarios siguientes:
Se requiere una latencia de menos de un segundo.
Para optimizar el procesamiento operativo de muchas tablas donde el flujo de datos a cada tabla es relativamente pequeño (pocos registros por segundo), pero el volumen de ingesta de datos global es alto (miles de registros por segundo).
Si el flujo de datos en cada tabla es alto (más de 4 GB por hora), considere la posibilidad de usar la ingesta por lotes.
Selección del tipo de ingesta de streaming adecuado
Se admiten dos tipos de ingesta de streaming:
Tipo de ingesta
Descripción
Event Hubs o IoT Hub
Los centros de conectividad se configuran como orígenes de datos de streaming de tablas. Para más información sobre la configuración, consulte los métodos de ingesta de datos de Event Hubs o IoT Hub.
Ingesta personalizada
La ingesta personalizada requiere que escriba una aplicación que use una de las bibliotecas cliente de Azure Data Explorer. Use la información de este tema para configurar la ingesta personalizada. También puede resultar útil la aplicación de ejemplo de ingesta de streaming de C#.
Use la tabla siguiente para ayudarle a elegir el tipo de ingesta adecuado para su entorno:
Criterio
Event Hub o IoT Hub
Ingesta personalizada
Retraso de datos entre el inicio de la ingesta y los datos disponibles para la consulta
Retraso más largo
Retraso más corto
Sobrecarga de desarrollo
Configuración rápida y sencilla, sin sobrecarga de desarrollo
Alta sobrecarga de desarrollo para crear una aplicación que ingiera los datos, controle los errores y garantice la coherencia de los datos
Nota
Puede administrar el proceso para habilitar y deshabilitar la ingesta de streaming en el clúster mediante el Azure Portal o mediante programación en C#. Si usa C# para la aplicación personalizada,puede que le resulte más cómodo mediante el enfoque de programación.
Los principales colaboradores que pueden afectar a la ingesta de streaming son:
Tamaños de máquina virtual y clúster: el rendimiento y la capacidad de la ingesta de streaming se escalan cuando aumentan los tamaños de las máquinas virtuales y los clústeres. El número de solicitudes de ingesta simultáneas está limitado a seis por núcleo. Por ejemplo, en el caso de las SKU de 16 núcleos, como las D14 y L16, la carga máxima admitida es las solicitudes de 96 ingestas simultáneas. En el caso de las SKU de dos núcleos, como la D11, la carga máxima admitida es las solicitudes de 12 ingestas simultáneas.
Límite de tamaño de los datos: el límite del tamaño de los datos para una solicitud de ingesta de streaming es de 4 MB. Esto incluye los datos creados para las directivas de actualización durante la ingesta.
Actualizaciones de esquema: las actualizaciones de esquema, como la creación y modificación de tablas y asignaciones de ingesta, pueden tardar hasta cinco minutos en el servicio de ingesta de streaming. Para más información, consulte Ingesta de streaming y cambios de esquema.
Capacidad de SSD: cuando se habilita la ingesta de streaming en un clúster, incluso cuando los datos no se ingieren a través de streaming, se usa parte del disco SSD local de las máquinas del clúster para los datos de ingesta de streaming y se reduce el almacenamiento disponible para la caché activa.
Habilitación de la ingesta de streaming en el clúster
Al crear un clúster mediante los pasos descritos en Creación de un clúster Azure Data Explorer base de datos,en la pestaña Configuraciones, seleccione Ingesta de streamingen.
Para habilitar la ingesta de streaming durante la creación de un nuevo clúster de Azure Data Explorer, ejecute el siguiente código:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
Habilitación de la ingesta de streaming en un clúster existente
Si tiene un clúster existente, puede habilitar la ingesta de streaming mediante la Azure Portal o mediante programación en C#.
En Azure Portal, vaya al clúster de Azure Data Explorer.
En Configuración, seleccione Configuraciones.
En el panel Configuraciones, seleccione Activado para habilitar la ingesta de streaming.
Seleccione Guardar.
Puede habilitar la ingesta de streaming al crear un nuevo clúster de Azure Data Explorer.
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Creación de una tabla de destino y definición de la directiva
Cree una tabla para recibir los datos de ingesta de streaming y definir su directiva relacionada mediante la Azure Portal o mediante programación en C#.
Copie uno de los siguientes comandos en el panel Consulta y seleccione Ejecutar. Esto define la directiva de ingesta de streaming en la tabla que ha creado o en la base de datos que contiene la tabla.
Sugerencia
Una directiva que se define en el nivel de base de datos se aplica a todas las tablas existentes y futuras de la base de datos.
Para definir la directiva en la tabla que ha creado, use:
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Deshabilitación de la ingesta de streaming en el clúster
Advertencia
La deshabilitación de la ingesta de streaming puede tardar unas horas.
Antes de deshabilitar la ingesta de streaming en el clúster de Azure Data Explorer, quite la directiva de ingesta de streaming de todas las tablas y bases de datos pertinentes. La eliminación de la directiva de ingesta de streaming desencadena la reorganización de los datos dentro del clúster de Azure Data Explorer. Los datos de ingesta de streaming se trasladan desde el almacenamiento inicial hasta el almacenamiento permanente en el almacén de columnas (extensiones o particiones). Este proceso puede tardar entre unos segundos y algunas horas, en función de la cantidad de datos existentes en el almacenamiento inicial.
Eliminación de la directiva de ingesta de streaming
Puede quitar la directiva de ingesta de streaming mediante el Azure Portal o mediante programación en C#.
La asignación de datos debe haberse creado con anterioridad para su uso en la ingesta de streaming. Las solicitudes individuales de ingesta de streaming no acomodan asignaciones de datos insertadas.
Directiva de actualización. La directiva de actualización solo puede hacer referencia a los datos que se acaban de ingerir en la tabla de origen y no a otros datos o tablas de la base de datos.
Si se usa la ingesta de streaming en cualquiera de las tablas de la base de datos, esta base de datos no se puede usar como líder de bases de datos seguidoras ni como proveedor de datos para Azure Data Share.