Azure Veri Gezgini kümenizde akış alımı yapılandırma
Makale
Okumak için 10 dakika
Teşekkür ederiz.
Alma ve sorgu arasında düşük gecikme süresi gerektiğinde veri yüklemesi için akış alma faydalıdır. Aşağıdaki senaryolarda akış alma kullanmayı göz önünde bulundurun:
Bir saniyeden az gecikme süresi gereklidir.
Her tablo için veri akışının görece küçük (saniyede birkaç kayıt) olduğu, ancak genel veri alma birimi yüksek (saniyede binlerce kayıt) olduğu çok sayıda tablonun işlemsel işlenmesini iyileştirmek için.
Her bir tabloya veri akışı yüksekse (saat başına 4 GB 'tan fazla), Batchalımı kullanmayı düşünün.
Farklı alma yöntemleri hakkında daha fazla bilgi edinmek için bkz. veri alma genel bakış.
Uygun akış alma türünü seçin
İki akış alma türü desteklenir:
Giriş türü
Description
Olay Hub 'ı veya IoT Hub
Hub 'lar tablo akışı veri kaynakları olarak yapılandırılır. Bunları ayarlama hakkında daha fazla bilgi için bkz. Olay Hub 'ı veya IoT Hub veri alma yöntemleri.
Özel Alım
Özel Alım, Azure Veri Gezgini istemci kitaplıklarındanbirini kullanan bir uygulama yazmanızı gerektirir. Özel alımı yapılandırmak için bu konudaki bilgileri kullanın. C# akış alımı örnek uygulamasını da yararlı bulabilirsiniz.
Ortamınız için uygun olan alma türünü seçmenize yardımcı olması için aşağıdaki tabloyu kullanın:
Landı
Olay Hub 'ı/IoT Hub
Özel Alım
Sorgu başlatma ve sorgu için kullanılabilen veriler arasındaki veri gecikmesi
Daha uzun gecikme
Daha kısa gecikme
Geliştirme ek yükü
Hızlı ve kolay kurulum, geliştirme yükü yok
Veri alma, hataları işleme ve veri tutarlılığı sağlama gibi bir uygulama oluşturmak için yüksek geliştirme yükü
Not
Azure portal veya C# ' de programlı olarak, kümenizde akış alımını etkinleştirmek ve devre dışı bırakmak için işlemi yönetebilirsiniz. Özel uygulamanıziçin C# kullanıyorsanız, programlama yaklaşımını kullanarak daha kullanışlı bulabilirsiniz.
Önkoşullar
Azure aboneliği. Ücretsiz bir Azure hesabıoluşturun.
Performans ve operasyonel hususlar
Akış alımını etkileyebilecek ana katkıda bulunanlar şunlardır:
VM ve küme boyutu: artan VM ve küme boyutları ile akış alma performansı ve kapasitesi ölçekleme. Eş zamanlı alma isteklerinin sayısı, çekirdek başına altı ile sınırlıdır. Örneğin, D14 ve L16 gibi 16 çekirdekli SKU 'Lar için, desteklenen en fazla 96 yükü eşzamanlı Alım isteklerdedir. D11 gibi iki çekirdek SKU 'su için, en fazla desteklenen yük, 12 eşzamanlı Alım istemindedir.
Veri boyutu sınırı: akış alma isteği için veri boyutu SıNıRı 4 MB 'tır. Bu, alma sırasında güncelleştirme ilkeleri için oluşturulan tüm verileri içerir.
Şema güncelleştirmeleri: tablo ve giriş eşlemelerini oluşturma ve değiştirme gibi şema güncelleştirmeleri, akış alma hizmeti için beş dakikaya kadar sürebilir. Daha fazla bilgi için bkz. akış alma ve şema değişiklikleri.
SSD kapasitesi: veri akışı aracılığıyla yapılmasa bile, küme üzerinde akış alımı etkinleştirme, verileri akışa alma için küme makinelerinin yerel SSD disk bir kısmını kullanır ve etkin önbellek için kullanılabilir depolama alanını azaltır.
Aşağıdaki komutlardan birini sorgu bölmesine kopyalayın ve Çalıştır' ı seçin. Bu, oluşturduğunuz tabloda veya tabloyu içeren veritabanında akış alma ilkesini tanımlar.
İpucu
Veritabanı düzeyinde tanımlanan bir ilke, veritabanındaki tüm mevcut ve gelecekteki tablolar için geçerlidir.
İlkeyi oluşturduğunuz tabloda tanımlamak için şunu kullanın:
import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
)
func ingest() {
clusterPath := "https://<clusterName>.kusto.windows.net"
appId := "<appId>"
appKey := "<appKey>"
appTenant := "<appTenant>"
dbName := "<dbName>"
tableName := "<tableName>"
mappingName := "<mappingName>" // Optional, can be nil
// Creates a Kusto Authorizer using your client identity, secret, and tenant identity.
// You may also uses other forms of authorization, see GoDoc > Authorization type.
// auth package is: "github.com/Azure/go-autorest/autorest/azure/auth"
authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(appId, appKey, appTenant),
}
// Create a client
client, err := kusto.New(clusterPath, authorizer)
if err != nil {
panic("add error handling")
}
// Create an ingestion instance
// Pass the client, the name of the database, and the name of table you wish to ingest into.
in, err := ingest.New(client, dbName, tableName)
if err != nil {
panic("add error handling")
}
// Go currently only supports streaming from a byte array with a maximum size of 4 MB.
jsonEncodedData := []byte("{\"a\": 1, \"b\": 10}\n{\"a\": 2, \"b\": 20}")
// Ingestion from a stream commits blocks of fully formed data encodes (JSON, AVRO, ...) into Kusto:
if err := in.Stream(context.Background(), jsonEncodedData, ingest.JSON, mappingName); err != nil {
panic("add error handling")
}
}
import com.microsoft.azure.kusto.data.auth.ConnectionStringBuilder;
import com.microsoft.azure.kusto.ingest.IngestClient;
import com.microsoft.azure.kusto.ingest.IngestClientFactory;
import com.microsoft.azure.kusto.ingest.IngestionProperties;
import com.microsoft.azure.kusto.ingest.result.OperationStatus;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import java.io.FileInputStream;
import java.io.InputStream;
public class FileIngestion {
public static void main(String[] args) throws Exception {
String clusterPath = "https://<clusterName>.kusto.windows.net";
String appId = "<appId>";
String appKey = "<appKey>";
String appTenant = "<appTenant>";
String dbName = "<dbName>";
String tableName = "<tableName>";
// Build connection string and initialize
ConnectionStringBuilder csb =
ConnectionStringBuilder.createWithAadApplicationCredentials(
clusterPath,
appId,
appKey,
appTenant
);
// Initialize client and its properties
IngestClient client = IngestClientFactory.createClient(csb);
IngestionProperties ingestionProperties =
new IngestionProperties(
dbName,
tableName
);
// Ingest from a compressed file
// Create Source info
InputStream zipInputStream = new FileInputStream("MyFile.gz");
StreamSourceInfo zipStreamSourceInfo = new StreamSourceInfo(zipInputStream);
// If the data is compressed
zipStreamSourceInfo.setCompressionType(CompressionType.gz);
// Ingest from stream
OperationStatus status = client.ingestFromStream(zipStreamSourceInfo, ingestionProperties).getIngestionStatusCollection().get(0).status;
}
}
Kümenizde akış alımını devre dışı bırakma
Uyarı
Akış alımını devre dışı bırakmak birkaç saat sürebilir.
Azure Veri Gezgini kümenizde akış alma 'yı devre dışı bırakmadan önce, tüm ilgili tablo ve veritabanlarından akış alma ilkesini bırakın. Akış alma ilkesinin kaldırılması, Azure Veri Gezgini kümenizin içinde veri yeniden düzenlemesini tetikler. Akış alma verileri, ilk depolamadan sütun deposundaki (kapsamlar veya parçalar) kalıcı depolamaya taşınır. Bu işlem, ilk depolama alanındaki veri miktarına bağlı olarak birkaç saniye ila birkaç saat arasında sürebilir.
Akış alma ilkesini bırakma
C# içinde program aracılığıyla veya akış alımı Azure portal kullanarak akışın alımını atabilirsiniz.
Kümede Azure portal kümenize gidin Azure Veri Gezgini'yi seçin.
Akış alımı ilkesi tablodan bırakmak için aşağıdaki komutu Sorgu bölmesine kopyalayın ve Çalıştır'ıseçin.
.delete table TestTable policy streamingingestion
Bu AyarlarYapılandırmalar'ı seçin.
Yapılandırmalar bölmesinde, Akış alımını devre dışı bırakmak için Kapalı'ya tıklayın.
Kaydet’i seçin.
Akış alımı ilkesi tablodan bırakmak için aşağıdaki kodu çalıştırın:
using System.Threading.Tasks;
using Kusto.Data; // Requires Package Microsoft.Azure.Kusto.Data
using Kusto.Data.Common;
using Kusto.Data.Net.Client;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string clusterPath = "https://<clusterName>.kusto.windows.net";
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string dbName = "<dbName>";
string tableName = "<tableName>";
// Create Kusto connection string with App Authentication
var csb =
new KustoConnectionStringBuilder(clusterPath)
.WithAadApplicationKeyAuthentication(
applicationClientId: appId,
applicationKey: appKey,
authority: appTenant
);
var tablePolicyDropCommand = CslCommandGenerator.GenerateTableStreamingIngestionPolicyDropCommand(dbName, tableName);
using (var client = KustoClientFactory.CreateCslAdminProvider(csb))
{
client.ExecuteControlCommand(tablePolicyDropCommand);
}
}
}
}
Kümenize akış alımını devre dışı bırakmak için aşağıdaki kodu çalıştırın:
using System.Threading.Tasks;
using Microsoft.Azure.Management.Kusto; // Required package Microsoft.Azure.Management.Kusto
using Microsoft.Azure.Management.Kusto.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory; // Required package Microsoft.IdentityModel.Clients.ActiveDirectory
using Microsoft.Rest;
namespace StreamingIngestion
{
class Program
{
static async Task Main(string[] args)
{
string appId = "<appId>";
string appKey = "<appKey>";
string appTenant = "<appTenant>";
string clusterName = "<clusterName>";
string resourceGroupName = "<resourceGroupName>";
string subscriptionId = "<subscriptionId>";
var authenticationContext = new AuthenticationContext($"https://login.windows.net/{appTenant}");
var credential = new ClientCredential(appId, appKey);
var result = await authenticationContext.AcquireTokenAsync(resource: "https://management.core.windows.net/", clientCredential: credential);
var credentials = new TokenCredentials(result.AccessToken, result.AccessTokenType);
var kustoManagementClient = new KustoManagementClient(credentials)
{
SubscriptionId = subscriptionId
};
var clusterUpdateParameters = new ClusterUpdate(enableStreamingIngest: false);
await kustoManagementClient.Clusters.UpdateAsync(resourceGroupName, clusterName, clusterUpdateParameters);
}
}
}
Sınırlamalar
Veritabanının kendisi veya tablolarının herhangi biri Akış alımı ilkesi tanımlandı ve etkinleştirildiyse veritabanı imleçleri bir veritabanı için desteklenmiyor.
Veri eşlemeleri,akış alımında kullanmak için önceden oluşturulmuş olması gerekir. Tek tek akış alımı istekleri satır içi veri eşlemeleri için uygun değildir.
İlkeyi güncelleştirin. Güncelleştirme ilkesi yalnızca kaynak tablodaki yeni alınan verilere başvurarak veritabanındaki diğer verilere veya tablolara başvurabilirsiniz.
Akış alımı, veritabanının tablolarının herhangi biri üzerinde kullanılıyorsa, bu veritabanı takip veritabanları için lider olarak veya veritabanı için bir veri sağlayıcısı Azure Veri Paylaşımı.