Konfigurace příjmu streamování v clusteru Azure Průzkumník dat
Článek
11 min ke čtení
Děkujeme.
Ingestování streamování je užitečné pro načítání dat, když potřebujete nízkou latenci mezi přijetím a dotazem. Vezměte v úvahu použití příjmu streamování v následujících situacích:
Je vyžadována latence menší než sekunda.
K optimalizaci provozního zpracování mnoha tabulek, ve kterých je datový proud dat do každé tabulky relativně malý (několik záznamů za sekundu), ale celkový objem příjmu dat je vysoký (tisíce záznamů za sekundu).
Pokud je datový proud dat v každé tabulce vysoký (více než 4 GB za hodinu), zvažte použití příjmu dávky.
Další informace o různých metodách přijímání naleznete v tématu Přehled příjmu dat.
Zvolit vhodný typ příjmu streamování
Podporují se dva typy ingestování streamování:
Typ přijímání
Popis
Centrum událostí nebo IoT Hub
Rozbočovače jsou nakonfigurovány jako zdroje dat streamování tabulek. Informace o tom, jak tyto informace nastavovat, najdete v tématu centrum událostí nebo IoT Hub metody ingestování dat.
Následující tabulka vám může při výběru typu ingestování, který je vhodný pro vaše prostředí, použít:
Kritérium
Centrum událostí/IoT Hub
Vlastní ingestování
Zpoždění dat mezi zahájením příjmu a daty dostupnými pro dotaz
Delší zpoždění
Kratší zpoždění
Režie vývoje
Rychlé a snadné nastavení bez režie vývoje
Vysoce vyvinutá režie pro vytváření aplikací pro ingestování dat, zpracování chyb a zajištění konzistence dat
Poznámka
Pomocí Azure Portal nebo programově v jazyce C# můžete spravovat proces Povolení a zakázání příjmu datových proudů ve vašem clusteru. Pokud používáte jazyk C# pro vlastní aplikaci, může být pohodlnější pomocí přístupového přístupu.
Hlavní přispěvatelé, kteří můžou ovlivnit přijímání streamování, jsou:
Velikost virtuálního počítače a clusteru: zvýšení výkonu a kapacity příjmu streamování s využitím vyšší velikosti virtuálních počítačů a clusterů. Počet souběžných žádostí o přijetí změn je omezený na šest na jádro. Například u 16 základních SKU, jako je například D14 a L16, je maximální podporované zatížení 96 souběžných žádostí o přijetí změn. U dvou základních SKU, jako je D11, je maximální podporované zatížení 12 souběžných žádostí o přijetí změn.
Omezení velikosti dat: omezení velikosti dat pro žádost o přijetí služby streamování je 4 MB. To zahrnuje všechna data vytvořená pro zásady aktualizace během příjmu.
Aktualizace schématu: aktualizace schématu, jako je vytváření a úprava tabulek a mapování příjmu, můžou trvat až pět minut, než se služba pro přijímání streamování streamuje. Další informace najdete v tématu změny příjmu streamování a schématu.
Kapacita SSD: Povolení příjmu streamování v clusteru, i když se data ingestují prostřednictvím streamování, používá část místního disku SSD clusterových počítačů pro data ingestování streamování a snižuje úložiště dostupné pro hotkou mezipaměť.
Chcete-li vytvořit tabulku, která bude přijímat data prostřednictvím příjmu streamování, zkopírujte následující příkaz do podokna dotazu a vyberte Spustit.
Zkopírujte jeden z následujících příkazů do podokna dotazu a vyberte Spustit. Tím se definuje zásada pro přijímání streamování v tabulce, kterou jste vytvořili, nebo v databázi obsahující tabulku.
Tip
Zásada definovaná na úrovni databáze se vztahuje na všechny existující a budoucí tabulky v databázi.
Pro definování zásad pro tabulku, kterou jste vytvořili, použijte:
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Zakázat ingestování streamování v clusteru
Upozornění
Zakazování příjmu streamování může trvat několik hodin.
Než zapnete přijímání streamování v clusteru Azure Průzkumník dat, vyřaďte zásadu přijímání streamování ze všech relevantních tabulek a databází. Odstraněním zásad příjmu streamování se aktivuje změna uspořádání dat v clusteru Azure Průzkumník dat. Data ingestování streamování se přesunou z počátečního úložiště do trvalého úložiště v úložišti sloupců (rozsahy nebo horizontálních oddílů). Tento proces může trvat několik sekund až pár hodin v závislosti na množství dat v počátečním úložišti.
Vyřaďte zásadu ingestování streamování.
Zásady příjmu streamování můžete v jazyce C# Azure Portal nebo programově.
V Azure Portal přejděte do svého clusteru Azure Data Explorer a vyberte Dotaz.
Pokud chcete z tabulky vypustit zásady příjmu streamování, zkopírujte následující příkaz do podokna Dotazu a vyberte Spustit.
.delete table TestTable policy streamingingestion
V Nastavenívyberte Konfigurace.
V podokně Konfigurace vyberte Vypnuto a zakažtepříjem dat streamování.
Vyberte Uložit.
Pokud chcete z tabulky vypustit zásady příjmu streamování, spusťte následující kód:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string clusterPath = "https://<clusterName>.kusto.windows.net";
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string dbName = "<dbName>";
string tableName = "<tableName>";
// Create Kusto connection string with App Authentication
var csb =
new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand(dbName, tableName);
using (var client = KustoClientFactory.CreateCslAdminProvider(csb))
{
client.ExecuteControlCommand(tablePolicyDropCommand);
}
}
}
}
Pokud chcete v clusteru zakázat příjem streamování, spusťte následující kód:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Omezení
Kurzory databáze nejsou podporované pro databázi, pokud má samotná databáze nebo kterákoli z jejích tabulek definované a povolené zásady příjmu streamování.
Mapování dat musí být předem vytvořená pro použití při příjmu streamování. Požadavky na příjem jednotlivých streamovaných dat nepohotoví s vloženými mapováními dat.
Značky rozsahu nelze nastavit u streamovaných dat pro příjem dat.
Aktualizujte zásady. Zásady aktualizace mohou odkazovat pouze na nově ingestovaná data ve zdrojové tabulce, a ne na žádná jiná data ani tabulky v databázi.
Pokud se pro libovolnou z tabulek databáze používá příjem streamování, nelze tuto databázi použít jako vedoucí databázi pro sledované databáze ani jako poskytovatele dat pro Azure Data Share.