Streaming-opname configureren op uw Azure Data Explorer cluster
Artikel
11 minuten om te lezen
Bedankt.
Streaming-opname is handig voor het laden van gegevens wanneer u lage latentie nodig hebt tussen opname en query. Overweeg het gebruik van streaming-opname in de volgende scenario's:
Latentie van minder dan een seconde is vereist.
Om de operationele verwerking van veel tabellen te optimaliseren, waarbij de gegevensstroom in elke tabel relatief klein is (een paar records per seconde), maar het totale gegevens opnamevolume hoog is (duizenden records per seconde).
Als de gegevensstroom in elke tabel hoog is (meer dan 4 GB per uur), kunt u batchgegevens opnemen.
Er worden twee typen streaming-opname ondersteund:
Opnametype
Beschrijving
Event Hub of IoT Hub
Hubs worden geconfigureerd als gegevensbronnen voor tabelstreaming. Zie Event Hub of IoT Hub voor meer informatie over het instellen van deze instellingen.
Gebruik de volgende tabel om het opnametype te kiezen dat geschikt is voor uw omgeving:
Criterium
Event Hub/IoT Hub
Aangepaste opname
Gegevensvertraging tussen opname-initiëren en de gegevens die beschikbaar zijn voor query
Langere vertraging
Kortere vertraging
Ontwikkelingsoverhead
Snelle en eenvoudige installatie, zonder overhead voor ontwikkeling
Hoge ontwikkelingsoverhead voor het maken van een toepassing, het opnemen van de gegevens, het afhandelen van fouten en het garanderen van gegevensconsistentie
Notitie
U kunt het proces voor het in- en uitschakelen van streaming-opname in uw cluster beheren met behulp van de Azure Portal programmatisch in C#. Als u C# gebruikt voor uw aangepaste toepassing,is het wellicht handiger om de programmatische benadering te gebruiken.
De belangrijkste inzenders die van invloed kunnen zijn op streaming-opname zijn:
VM en clustergrootte:de prestaties en capaciteit van streaming-opname worden geschaald met grotere VM- en clustergrootten. Het aantal gelijktijdige opnameaanvragen is beperkt tot zes per kern. Voor SKU's met 16 kernen, zoals D14 en L16, is de maximaal ondersteunde belasting bijvoorbeeld 96 gelijktijdige opnameaanvragen. Voor twee kern-SKU's, zoals D11, is de maximaal ondersteunde belasting 12 gelijktijdige opnameaanvragen.
Limiet voor gegevensgrootte:de limiet voor de gegevensgrootte voor een aanvraag voor streaming-opname is 4 MB. Dit omvat alle gegevens die zijn gemaakt voor updatebeleid tijdens de opname.
SSD-capaciteit:het inschakelen van streaming-opname op een cluster, zelfs wanneer gegevens niet worden opgenomen via streaming, maakt gebruik van een deel van de lokale SSD-schijf van de clustermachines voor streaminggegevens en vermindert de beschikbare opslag voor hot-cache.
Terwijl u een cluster maakt met behulp van de stappen in Een cluster Azure Data Explorer databasemaken, selecteert u op het tabblad Configuraties de optie Streaming-opnameop.
Als u streaming-opname wilt inschakelen tijdens het maken van een Azure Data Explorer cluster, moet u de volgende code uitvoeren:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
string location = "<location>";
string skuName = "<skuName>";
string tier = "<tier>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var cluster = new Cluster(location, new AzureSku(skuName, tier), enableStreamingIngest:true);
await kustoManagementClient.Clusters.CreateOrUpdateAsync(resourceGroupName, clusterName, cluster);
}
}
}
Streaming-opname in een bestaand cluster inschakelen
Als u een bestaand cluster hebt, kunt u streaming-opname inschakelen met behulp van de Azure Portal programmatisch in C#.
Als u de tabel wilt maken die de gegevens ontvangt via streaming-opname, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
Kopieer een van de volgende opdrachten naar het deelvenster Query en selecteer Uitvoeren. Hiermee definieert u het streaming-opnamebeleid voor de tabel die u hebt gemaakt of op de database die de tabel bevat.
Tip
Een beleid dat is gedefinieerd op databaseniveau is van toepassing op alle bestaande en toekomstige tabellen in de database.
Gebruik het volgende om het beleid te definiëren voor de tabel die u hebt gemaakt:
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Streaming-opname in uw cluster uitschakelen
Waarschuwing
Het uitschakelen van streaming-opname kan enkele uren duren.
Voordat u streaming-opname op uw cluster Azure Data Explorer, moet u het opnamebeleid voor streaming verwijderen uit alle relevante tabellen en databases. Het verwijderen van het beleid voor streaming-opname activeert het opnieuw rangschikken van gegevens in Azure Data Explorer cluster. De streaming-opnamegegevens worden verplaatst van de initiële opslag naar de permanente opslag in de kolomopslag (omvang of shards). Dit proces kan enkele seconden tot enkele uren duren, afhankelijk van de hoeveelheid gegevens in de initiële opslag.
Het beleid voor streaming-opname neerzetten
U kunt het beleid voor streaming-opname neerzetten met behulp van Azure Portal programmatisch in C#.
Ga in Azure Portal naar uw Azure Data Explorer cluster en selecteer Query uitvoeren.
Als u het beleid voor streaming-opname uit de tabel wilt verwijderen, kopieert u de volgende opdracht naar het deelvenster Query en selecteert u Uitvoeren.
Selecteer in het deelvenster Configuraties de optie Uit omStreaming-opname uit te schakelen.
Selecteer Opslaan.
Als u het beleid voor streaming-opname uit de tabel wilt neerzetten, moet u de volgende code uitvoeren:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string clusterPath = "https://<clusterName>.kusto.windows.net";
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string dbName = "<dbName>";
string tableName = "<tableName>";
// Create Kusto connection string with App Authentication
var csb =
new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand(dbName, tableName);
using (var client = KustoClientFactory.CreateCslAdminProvider(csb))
{
client.ExecuteControlCommand(tablePolicyDropCommand);
}
}
}
}
Als u streaming-opname in uw cluster wilt uitschakelen, moet u de volgende code uitvoeren:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Beperkingen
Databasecursors worden niet ondersteund voor een database als voor de database zelf of een van de tabellen het streaming-opnamebeleid is gedefinieerd en ingeschakeld.
Extent-tags kunnen niet worden ingesteld voor de streaming-opnamegegevens.
Werk beleid bij. Het updatebeleid kan alleen verwijzen naar de zojuist opgenomen gegevens in de brontabel en niet naar andere gegevens of tabellen in de database.
Als streaming-opname wordt gebruikt in een van de tabellen van de database, kan deze database niet worden gebruikt als leider voor volgdatabases of als gegevensprovider voor Azure Data Share.