Konfigurera strömningsinmatning på ditt Azure Data Explorer kluster
Artikel
11 minuter för att läsa
Tack!
Strömningsinmatning är användbart för att läsa in data när du behöver korta svarstider mellan inmatning och fråga. Överväg att använda strömningsinmatning i följande scenarier:
Svarstid på mindre än en sekund krävs.
För att optimera den operativa bearbetningen av många tabeller där dataströmmen till varje tabell är relativt liten (några poster per sekund), men den övergripande datainmatningsvolymen är hög (tusentals poster per sekund).
Om dataströmmen till varje tabell är hög (över 4 GB per timme) bör du överväga att använda batchinmatning.
Hubbar konfigureras som strömmande tabelldatakällor. Information om hur du inställningar finns i Event Hub ellerIoT Hub datainmatningsmetoder.
Anpassad inmatning
Anpassad inmatning kräver att du skriver ett program som använder ett av Azure Data Explorer klientbiblioteken. Använd informationen i det här avsnittet för att konfigurera anpassad inmatning. Exempelprogrammet för C#-strömningsinmatning kan också vara användbart.
Använd följande tabell när du ska välja den inmatningstyp som passar din miljö:
Kriterium
Händelsehubb/IoT Hub
Anpassad inmatning
Datafördröjning mellan initiering av datainmatning och tillgängliga data för fråga
Längre fördröjning
Kortare fördröjning
Omkostnader för utveckling
Snabb och enkel installation, inga omkostnader för utveckling
Höga utvecklingskostnader för att skapa ett program som matar in data, hanterar fel och säkerställer datakonsekvens
Anteckning
Du kan hantera processen för att aktivera och inaktivera strömningsinmatning i klustret med hjälp Azure Portal eller programmässigt i C#. Om du använder C# för ditt anpassade program kandet vara enklare att använda den programmatiska metoden.
De viktigaste bidragsgivarna som kan påverka strömningsinmatningen är:
VM- och klusterstorlek:Strömningsinmatningsprestanda och kapacitetsskalor med ökade VM- och klusterstorlekar. Antalet samtidiga inmatningsbegäranden är begränsat till sex per kärna. För SKU:er med 16 kärnor, till exempel D14 och L16, är den maximala belastningen som stöds 96 samtidiga inmatningsbegäranden. För två kärn-SKU:er, till exempel D11, är den maximala belastningen som stöds 12 samtidiga inmatningsbegäranden.
Storleksgräns fördata: Datastorleksgränsen för en begäran om strömningsinmatning är 4 MB. Detta inkluderar alla data som skapas för uppdateringsprinciper under inmatningen.
Schemauppdateringar:Schemauppdateringar, till exempel skapande och ändring av tabeller och inmatningsmappningar, kan ta upp till fem minuter för inmatningstjänsten för direktuppspelning. Mer information finns i Strömningsinmatning och schemaändringar.
SSD-kapacitet:Om du aktiverar strömningsinmatning i ett kluster, även om data inte matas in via strömning, används en del av den lokala SSD-disken på klusterdatorerna för strömmande inmatningsdata och minskar lagringsutrymmet som är tillgängligt för cachelagring med hög belastning.
Aktivera strömningsinmatning i klustret
Innan du kan använda strömningsinmatning måste du aktivera funktionen i klustret och definiera en princip för strömningsinmatning. Du kan aktivera funktionen när du skapar klustret ellerlägga till den i ett befintligt kluster.
Varning
Granska begränsningarna innan du aktiverar strömningsinmatning.
Aktivera strömningsinmatning när du skapar ett nytt kluster
Du kan aktivera strömningsinmatning när du skapar ett nytt kluster med hjälp Azure Portal eller programmässigt i C#.
När du skapar ett kluster med hjälp av stegen i Skapa ett Azure Data Explorer-klusteroch databas går du till fliken Konfigurationer och väljer Strömningsinmatningpå.
Om du vill aktivera strömningsinmatning när du skapar Azure Data Explorer ett nytt kluster kör du följande kod:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
Aktivera strömningsinmatning i ett befintligt kluster
Om du har ett befintligt kluster kan du aktivera strömningsinmatning med hjälp av Azure Portal eller programmässigt i C#.
Kopiera ett av följande kommandon till fönstret Fråga och välj Kör. Detta definierar strömningsinmatningsprincipen för den tabell som du skapade eller på databasen som innehåller tabellen.
Tips
En princip som definieras på databasnivå gäller för alla befintliga och framtida tabeller i databasen.
Om du vill definiera principen för den tabell som du skapade använder du:
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Inaktivera strömningsinmatning i klustret
Varning
Det kan ta några timmar att inaktivera strömningsinmatning.
Innan du inaktiverar strömningsinmatning på ditt Azure Data Explorer ska du ta bort principen för strömningsinmatning från alla relevanta tabeller och databaser. Borttagningen av strömningsinmatningsprincipen utlöser omflyttning av data i ditt Azure Data Explorer kluster. Strömningsinmatningsdata flyttas från den inledande lagringen till den permanenta lagringen i kolumnlagret (utrymme eller shards). Den här processen kan ta mellan några sekunder och några timmar, beroende på mängden data i den inledande lagringen.
Ta bort strömningsinmatningsprincipen
Du kan ta bort strömningsinmatningsprincipen med hjälp av Azure Portal eller programmässigt i C#.
I den Azure Portal går du till ditt Azure Data Explorer kluster och väljer Fråga.
Om du vill ta bort strömningsinmatningsprincipen från tabellen kopierar du följande kommando till frågefönstret och väljer Kör.
.delete table TestTable policy streamingingestion
I Inställningarväljer du Konfigurationer.
I fönstret Konfigurationer väljer du Av för att inaktivera strömningsinmatning.
Välj Spara.
Om du vill ta bort strömningsinmatningsprincipen från tabellen kör du följande kod:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string clusterPath = "https://<clusterName>.kusto.windows.net";
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string dbName = "<dbName>";
string tableName = "<tableName>";
// Create Kusto connection string with App Authentication
var csb =
new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand(dbName, tableName);
using (var client = KustoClientFactory.CreateCslAdminProvider(csb))
{
client.ExecuteControlCommand(tablePolicyDropCommand);
}
}
}
}
Om du vill inaktivera strömningsinmatning i klustret kör du följande kod:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Begränsningar
Databasmarkörer stöds inte för en databas om själva databasen eller någon av dess tabeller har principen för strömningsinmatning definierad och aktiverad.
Datamappningar måste skapas i förväg för användning vid strömningsinmatning. Enskilda begäranden om strömningsinmatning tar inte hänsyn till infogade datamappningar.
Det går inte att ange platstaggar för strömmande inmatningsdata.
Uppdatera principen. Uppdateringsprincipen kan endast referera till nyligen indata i källtabellen och inte andra data eller tabeller i databasen.
Om strömningsinmatning används på någon av databastabellerna kan den här databasen inte användas som ledare för följardatabaser eller som dataleverantör för Azure Data Share.