Configurare l'inserimento di flussi nel cluster di Esplora dati di Azure con C #Configure streaming ingestion on your Azure Data Explorer cluster using C#

Usare l'inserimento di flussi per caricare i dati quando è necessaria una bassa latenza tra inserimento e query.Use streaming ingestion to load data when you need low latency between ingestion and query. L'operazione di inserimento del flusso viene completata in meno di 10 secondi e i dati sono immediatamente disponibili per la query dopo il completamento.The streaming ingestion operation completes in under 10 seconds, and your data is immediately available for query after completion. Questo metodo di inserimento è adatto per l'inserimento di un volume elevato di dati, ad esempio migliaia di record al secondo, distribuiti in migliaia di tabelle.This ingestion method is suitable for ingesting a high volume of data, such as thousands of records per second, spread over thousands of tables. Ogni tabella riceve un volume di dati relativamente basso, ad esempio alcuni record al secondo.Each table receives a relatively low volume of data, such as a few records per second.

Usare l'inserimento bulk anziché l'inserimento di flussi quando la quantità di dati inseriti supera i 4 GB all'ora per ogni tabella.Use bulk ingestion instead of streaming ingestion when the amount of data ingested exceeds 4 GB per hour per table.

Per ulteriori informazioni sui diversi metodi di inserimento, vedere Cenni preliminarisull'inserimento dati.To learn more about different ingestion methods, see data ingestion overview.

PrerequisitiPrerequisites

Abilitare l'inserimento di flussi nel cluster con C #Enable streaming ingestion on your cluster using C#

Abilitare l'inserimento di flussi durante la creazione di un nuovo clusterEnable streaming ingestion while creating a new cluster

È possibile abilitare l'inserimento di flussi durante la creazione di un nuovo cluster Esplora dati di Azure.You can enable streaming ingestion while creating a new Azure Data Explorer cluster.

...
var cluster = new Cluster(location, sku, enableStreamingIngest:true);
...

Abilitare l'inserimento di flussi in un cluster esistenteEnable streaming ingestion on an existing cluster

Per abilitare l'inserimento di flussi nel cluster Esplora dati di Azure, eseguire il codice seguente:To enable streaming ingestion on your Azure Data Explorer cluster, run the following code:

    var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
    var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
    var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
    var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
    var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
    var credential = new ClientCredential(clientId, clientSecret);
    var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);

    var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);

    var kustoManagementClient = new KustoManagementClient(credentials)
    {
        SubscriptionId = subscriptionId
    };

    var resourceGroupName = "testrg";
    var clusterName = "mystreamingcluster";
    var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: true);

    await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);

Avviso

Esaminare le limitazioni prima di abilitare l'inserimento del vapore.Review the limitations prior to enabling steaming ingestion.

Creare una tabella di destinazione e definire i criteri tramite C #Create a target table and define the policy using C#

Per creare una tabella e definire i criteri di inserimento dei flussi in questa tabella, eseguire il codice seguente:To create a table and define a streaming ingestion policy on this table, run the following code:

    var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
    var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
    var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
    var databaseName = "StreamingTestDb";
    var tableName = "TestTable";
    var kcsb = new KustoConnectionStringBuilder("https://mystreamingcluster.westcentralus.kusto.windows.net", databaseName);
    kcsb = kcsb.WithAadApplicationKeyAuthentication(clientId, clientSecret, tenantId);

    var tableSchema = new TableSchema(
        tableName,
        new ColumnSchema[]
        {
            new ColumnSchema("TimeStamp", "System.DateTime"),
            new ColumnSchema("Name",      "System.String"),
            new ColumnSchema("Metric",    "System.int"),
            new ColumnSchema("Source",    "System.String"),
        });

    var tableCreateCommand = CslCommandGenerator.GenerateTableCreateCommand(tableSchema);
    var tablePolicyAlterCommand = CslCommandGenerator.GenerateTableAlterStreamingIngestionPolicyCommand(tableName, isEnabled: true);
    using (var client = KustoClientFactory.CreateCslAdminProvider(kcsb))
    {
        client.ExecuteControlCommand(tableCreateCommand);

        client.ExecuteControlCommand(tablePolicyAlterCommand);
    }

Usare l'inserimento di flussi per inserire i dati nel clusterUse streaming ingestion to ingest data to your cluster

Sono supportati due tipi di inserimento di flussi:Two streaming ingestion types are supported:

  • Hub eventi o HubInternet, usato come origine dati.Event Hub or IoT Hub, which is used as a data source.
  • Per l'inserimento personalizzato è necessario scrivere un'applicazione che usi una delle librerie clientdi Azure Esplora dati.Custom ingestion requires you to write an application that uses one of the Azure Data Explorer client libraries. Per un'applicazione di esempio, vedere esempio di inserimento di flussi .See streaming ingestion sample for a sample application.

Scegliere il tipo di inserimento flusso appropriatoChoose the appropriate streaming ingestion type

CriterioCriterion Hub eventiEvent Hub Inserimento personalizzatoCustom Ingestion
Ritardo dei dati tra l'avvio dell'inserimento e i dati disponibili per la queryData delay between ingestion initiation and the data available for query Ritardo più lungoLonger delay Ritardo più breveShorter delay
Overhead di sviluppoDevelopment overhead Configurazione rapida e semplice, nessun sovraccarico di sviluppoFast and easy setup, no development overhead Elevato sovraccarico di sviluppo per l'applicazione per gestire gli errori e assicurare la coerenza dei datiHigh development overhead for application to handle errors and ensure data consistency

Disabilitare l'inserimento di flussi nel clusterDisable streaming ingestion on your cluster

Avviso

La disabilitazione dell'inserimento di flussi potrebbe richiedere alcune ore.Disabling streaming ingestion may take a few hours.

Prima di disabilitare l'inserimento di flussi nel cluster di Esplora dati di Azure, eliminare i criteri di inserimento dei flussi da tutte le tabelle e i database pertinenti.Before disabling streaming ingestion on your Azure Data Explorer cluster, drop the streaming ingestion policy from all relevant tables and databases. La rimozione dei criteri di inserimento dei flussi attiva la riorganizzazione dei dati all'interno del cluster Esplora dati di Azure.The removal of the streaming ingestion policy triggers data rearrangement inside your Azure Data Explorer cluster. I dati di inserimento dei flussi vengono spostati dalla risorsa di archiviazione iniziale all'archiviazione permanente nell'archivio colonne (extent o partizioni).The streaming ingestion data is moved from the initial storage to permanent storage in the column store (extents or shards). Questo processo può richiedere da alcuni secondi ad alcune ore, a seconda della quantità di dati nell'archiviazione iniziale.This process can take between a few seconds to a few hours, depending on the amount of data in the initial storage.

Eliminare i criteri di inserimento dei flussi utilizzando C #Drop the streaming ingestion policy using C#

Per eliminare i criteri di inserimento dei flussi dalla tabella, eseguire il codice seguente:To drop the streaming ingestion policy from the table, run the following code:

        var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
        var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
        var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
        var databaseName = "StreamingTestDb";
        var tableName = "TestTable";
        var kcsb = new KustoConnectionStringBuilder("https://mystreamingcluster.westcentralus.kusto.windows.net", databaseName);
        kcsb = kcsb.WithAadApplicationKeyAuthentication(clientId, clientSecret, tenantId);
    
        var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand(databaseName, tableName);
        using (var client = KustoClientFactory.CreateCslAdminProvider(kcsb))
        {
            client.ExecuteControlCommand(tablePolicyDropCommand);
        }

Disabilitare l'inserimento di flussi tramite C #Disable streaming ingestion using C#

Per disabilitare l'inserimento di flussi nel cluster, eseguire il codice seguente:To disable streaming ingestion on your cluster, run the following code:

        var tenantId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Directory (tenant) ID
        var clientId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";//Application ID
        var clientSecret = "xxxxxxxxxxxxxx";//Client Secret
        var subscriptionId = "xxxxxxxx-xxxxx-xxxx-xxxx-xxxxxxxxx";
        var authenticationContext = new AuthenticationContext($"https://login.windows.net/{tenantId}");
        var credential = new ClientCredential(clientId, clientSecret);
        var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
    
        var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
    
        var kustoManagementClient = new KustoManagementClient(credentials)
        {
            SubscriptionId = subscriptionId
        };
    
        var resourceGroupName = "testrg";
        var clusterName = "mystreamingcluster";
        var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
    
        await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);

LimitazioniLimitations

  • I cursori di database non sono supportati per un database se il database stesso o una delle relative tabelle include i criteri di inserimento dei flussi definiti e abilitati.Database cursors aren't supported for a database if the database itself or any of its tables have the Streaming ingestion policy defined and enabled.
  • I mapping dei dati devono essere creati in precedenza per l'uso nell'inserimento di flussi.Data mappings must be pre-created for use in streaming ingestion. Le richieste di inserimento di flussi individuali non supportano i mapping dei dati inline.Individual streaming ingestion requests don't accommodate inline data mappings.
  • Le prestazioni e la capacità di inserimento dei flussi vengono ridimensionate con dimensioni maggiori di macchine virtuali e cluster.Streaming ingestion performance and capacity scales with increased VM and cluster sizes. Il numero di richieste di inserimento simultanee è limitato a sei per core.The number of concurrent ingestion requests is limited to six per core. Per gli SKU a 16 core, ad esempio D14 e L16, il carico massimo supportato è 96 richieste di inserimento simultanee.For example, for 16 core SKUs, such as D14 and L16, the maximal supported load is 96 concurrent ingestion requests. Per due SKU principali, ad esempio D11, il carico massimo supportato è 12 richieste di inserimento simultanee.For two core SKUs, such as D11, the maximal supported load is 12 concurrent ingestion requests.
  • Il limite per le dimensioni dei dati per la richiesta di inserimento del flusso è 4 MB.The data size limit for streaming ingestion request is 4 MB.
  • Gli aggiornamenti dello schema, ad esempio la creazione e la modifica di tabelle e mapping di inserimento, possono richiedere fino a cinque minuti per il servizio di inserimento di flussi.Schema updates, such as creation and modification of tables and ingestion mappings, may take up to five minutes for the streaming ingestion service. Per altre informazioni, vedere Inserimento di flussi e modifiche dello schema.For more information see Streaming ingestion and schema changes.
  • L'abilitazione dell'inserimento di flussi in un cluster, anche quando i dati non vengono inseriti tramite lo streaming, usa una parte del disco SSD locale dei computer del cluster per il flusso dei dati di inserimento e riduce lo spazio di archiviazione disponibile per la cache a caldo.Enabling streaming ingestion on a cluster, even when data isn't ingested via streaming, uses part of the local SSD disk of the cluster machines for streaming ingestion data and reduces the storage available for hot cache.
  • I tag di extent non possono essere impostati sui dati di inserimento del flusso.Extent tags can't be set on the streaming ingestion data.
  • Se l'inserimento di flussi viene usato in una qualsiasi delle tabelle del database, questo database non può essere usato come leader per i database di seguito o come provider di dati per la condivisione di dati di Azure.If streaming ingestion is used on any of the tables of the database, this database cannot be used as leader for follower databases or as a data provider for Azure Data Share.

Passaggi successiviNext steps