Rychlé zprovoznění: Vytvoření datové továrny a kanálu pomocí sady .NET SDK
PLATÍ PRO:
Azure Data Factory
Azure Synapse Analytics
Tento rychlý start popisuje, jak pomocí sady .NET SDK vytvořit Azure Data Factory. Kanál, který vytvoříte v této datové továrně, kopíruje data z jedné složky do jiné složky v úložišti objektů blob v Azure. Kurz transformace dat pomocí Azure Data Factory najdete v tématu Kurz: Transformace dat pomocí Sparku.
Poznámka
Tento článek neposkytuje podrobný úvod do služby Data Factory. Úvod do služby Azure Data Factory najdete v tématu Úvod do Azure Data Factory.
Požadavky
Předplatné Azure
Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
Role Azure
Pro vytvoření instancí služby Data Factory musí být uživatelský účet, který použijete pro přihlášení k Azure, členem role přispěvatel nebo vlastník nebo správcem předplatného Azure. Pokud chcete zobrazit oprávnění, která máte v rámci předplatného, klikněte na Azure Portal, v pravém horním rohu vyberte své uživatelské jméno, pro další možnosti vyberte ikonu ... a pak vyberte Moje oprávnění. Pokud máte přístup k několika předplatným, vyberte odpovídající předplatné.
Při vytváření a správě podřízených prostředků pro službu Data Factory, včetně datových sad, propojených služeb, kanálů, triggerů a prostředí Integration Runtime, platí následující požadavky:
- K vytváření a správě podřízených prostředků v Azure Portal musíte patřit do role přispěvatel Data Factory na úrovni skupiny prostředků nebo výše.
- Pro vytváření a správu podřízených prostředků pomocí PowerShellu nebo sady SDK na úrovni prostředku nebo vyšší je dostatečná role Přispěvatel.
Ukázku pokynů pro přidání uživatele do role najdete v článku věnovaném přidávání rolí.
Další informace najdete v následujících článcích:
Účet služby Azure Storage
V tomto rychlém startu použijete účet pro obecné účely Azure Storage (konkrétně úložiště objektů BLOB) jako zdrojové i cílové úložiště dat. Pokud nemáte účet pro obecné účely Azure Storage, přečtěte si článek Vytvoření účtu úložiště a vytvořte si ho.
Získání názvu účtu úložiště
Pro účely tohoto rychlého startu potřebujete název účtu Azure Storage. Následující postup popisuje kroky pro získání názvu účtu úložiště:
- Ve webovém prohlížeči přejdete na Azure Portal a přihlaste se pomocí uživatelského jména a hesla Azure.
- V nabídce Azure Portal vyberte všechny služby a pak vyberte > účty úložiště úložiště. Můžete také vyhledat a vybrat účty úložiště z libovolné stránky.
- Na stránce účty úložiště vyfiltrujte váš účet úložiště (Pokud je to potřeba) a pak vyberte svůj účet úložiště.
Můžete také vyhledat a vybrat účty úložiště z libovolné stránky.
Vytvoření kontejneru objektů blob
V této části vytvoříte v úložišti objektů blob v Azure kontejner objektů blob s názvem adftutorial.
Na stránce účet úložiště vyberte přehledové > kontejnery.
Na <Account name> - panelu nástrojů stránky kontejnerů vyberte kontejner.
V dialogovém okně Nový kontejner jako název zadejte adftutorial a pak vyberte OK. <Account name> - Stránka kontejnery je aktualizována tak, aby zahrnovala adftutorial v seznamu kontejnerů.
Přidání vstupní složky a souboru pro kontejner objektů BLOB
V této části vytvoříte ve vytvořeném kontejneru složku s názvem input a nahrajete do ní ukázkový soubor. Než začnete, otevřete textový editor, jako je například Poznámkový blok, a vytvořte soubor s názvem emp.txt s následujícím obsahem:
John, Doe
Jane, Doe
Uložte soubor do složky C:\ADFv2QuickStartPSH . (Pokud složka ještě neexistuje, vytvořte ji.) Pak se vraťte do Azure Portal a proveďte následující kroky:
Na <Account name> - stránce kontejnery , kde jste skončili, vyberte adftutorial z aktualizovaného seznamu kontejnerů.
- Pokud jste okno zavřeli nebo jste přešli na jinou stránku, přihlaste se k Azure Portal znovu.
- V nabídce Azure Portal vyberte všechny služby a pak vyberte > účty úložiště úložiště. Můžete také vyhledat a vybrat účty úložiště z libovolné stránky.
- Vyberte svůj účet úložiště a pak vyberte kontejnery > adftutorial.
Na panelu nástrojů na stránce kontejneru adftutorial vyberte nahrát.
Na stránce nahrát objekt BLOB vyberte pole soubory a pak vyhledejte a vyberte soubor emp.txt .
Rozbalte nadpis Upřesnit . Stránka se teď zobrazí, jak je znázorněno na následujícím obrázku:
Do pole Odeslat do složky zadejte Input ( vstup).
Vyberte tlačítko Nahrát. Měli byste vidět soubor emp.txt a stav nahrávání v seznamu.
Vyberte ikonu Zavřít ( X) pro zavření stránky nahrát objekt BLOB .
Nechejte stránku kontejneru adftutorial otevřenou. Použijete ji k ověření výstupu na konci tohoto rychlého startu.
Visual Studio
Návod v tomto článku používá Visual Studio 2019. Postupy pro Visual Studio 2013, 2015 nebo 2017 se mírně liší.
Vytvoření aplikace v Azure Active Directory
V částech Postupy: Vytvoření aplikace Azure AD a instančního objektu s přístupem k prostředkům pomocí portálu postupujte podle pokynů k těmto úlohám:
- V části Azure Active Directory aplikacevytvořte aplikaci, která představuje aplikaci .NET, kterou vytváříte v tomto kurzu. Jako přihlašovací adresu URL můžete poskytnout fiktivní URL, jak ukazuje článek (
https://contoso.org/exampleapp). - V části Získat hodnoty pro přihlášenízískejte ID aplikace a ID tenanta a poznamenejte si tyto hodnoty, které použijete později v tomto kurzu.
- V části Certifikáty a tajnéklíče získejte ověřovací klíč a poznamenejte si tuto hodnotu, kterou použijete později v tomto kurzu.
- V části Přiřadit aplikaci k rolipřiřaďte aplikaci roli Přispěvatel na úrovni předplatného, aby aplikace v předplatném vytvořila datové továrny.
Vytvoření projektu ve Visual Studiu
Dále vytvořte konzolovou aplikaci C# .NET v Visual Studio:
- Spusťte Visual Studio.
- V okně Start vyberte Vytvořit nový projekt > Konzolová aplikace (.NET Framework). Vyžaduje se .NET verze 4.5.2 nebo novější.
- Do Project zadejte ADFv2QuickStart.
- Vyberte Vytvořit a vytvořte projekt.
Instalace balíčků NuGet
Vyberte Nástroje > Správce balíčků NuGet > Konzola správce balíčků.
V podokně Správce balíčků konzoly nainstalujte balíčky spuštěním následujících příkazů. Další informace najdete v balíčku NuGet Microsoft.Azure.Management.DataFactory.
Install-Package Microsoft.Azure.Management.DataFactory Install-Package Microsoft.Azure.Management.ResourceManager -IncludePrerelease Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
Vytvoření klienta datové továrny
Otevřete soubor Program.cs a vložte do něj následující příkazy. Přidáte tak odkazy na obory názvů.
using System; using System.Collections.Generic; using System.Linq; using Microsoft.Rest; using Microsoft.Rest.Serialization; using Microsoft.Azure.Management.ResourceManager; using Microsoft.Azure.Management.DataFactory; using Microsoft.Azure.Management.DataFactory.Models; using Microsoft.IdentityModel.Clients.ActiveDirectory;Do metody Main přidejte následující kód, který nastaví proměnné. Zástupné symboly nahraďte vlastními hodnotami. Pokud chcete zobrazit seznam oblastí Azure, ve kterých je služba Data Factory aktuálně dostupná, na následující stránce vyberte oblasti, které vás zajímají, pak rozbalte Analýza a vyhledejte Data Factory:Dostupné produkty v jednotlivých oblastech. Úložiště dat (Azure Storage, Azure SQL Database a další) a výpočetní prostředí (HDInsight a další) používaná datovou továrnou mohou být v jiných oblastech.
// Set variables string tenantID = "<your tenant ID>"; string applicationId = "<your application ID>"; string authenticationKey = "<your authentication key for the application>"; string subscriptionId = "<your subscription ID where the data factory resides>"; string resourceGroup = "<your resource group where the data factory resides>"; string region = "<the location of your resource group>"; string dataFactoryName = "<specify the name of data factory to create. It must be globally unique.>"; string storageAccount = "<your storage account name to copy data>"; string storageKey = "<your storage account key>"; // specify the container and input folder from which all files // need to be copied to the output folder. string inputBlobPath = "<path to existing blob(s) to copy data from, e.g. containername/inputdir>"; //specify the contains and output folder where the files are copied string outputBlobPath = "<the blob path to copy data to, e.g. containername/outputdir>"; // name of the Azure Storage linked service, blob dataset, and the pipeline string storageLinkedServiceName = "AzureStorageLinkedService"; string blobDatasetName = "BlobDataset"; string pipelineName = "Adfv2QuickStartPipeline";
Poznámka
Pro suverénní cloudy musíte použít odpovídající koncové body specifické pro cloud pro ActiveDirectoryAuthority a ResourceManagerUrl (BaseUri). Například v oblasti Azure Gov v USA byste místo a místo použijte autoritu a pak vytvořili klienta pro správu https://login.microsoftonline.us https://login.microsoftonline.com datové https://management.usgovcloudapi.net https://management.azure.com/ továrny. Pomocí PowerShellu můžete snadno získat adresy URL koncových bodů pro různé cloudy spuštěním rutiny Get-AzEnvironment | Format-List, který vrátí seznam koncových bodů pro každé cloudové prostředí.
Do metody Main přidejte následující kód, který vytvoří instanci třídy DataFactoryManagementClient. Tento objekt použijete k vytvoření datové továrny, propojené služby, datových sad a kanálu. Použijete ho také k monitorování podrobných informací o spuštění kanálu.
// Authenticate and create a data factory management client var context = new AuthenticationContext("https://login.microsoftonline.com/" + tenantID); ClientCredential cc = new ClientCredential(applicationId, authenticationKey); AuthenticationResult result = context.AcquireTokenAsync( "https://management.azure.com/", cc).Result; ServiceClientCredentials cred = new TokenCredentials(result.AccessToken); var client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };
Vytvoření datové továrny
Do metody Main přidejte následující kód, který vytvoří datovou továrnu.
// Create a data factory
Console.WriteLine("Creating data factory " + dataFactoryName + "...");
Factory dataFactory = new Factory
{
Location = region,
Identity = new FactoryIdentity()
};
client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
Console.WriteLine(
SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));
while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState ==
"PendingCreation")
{
System.Threading.Thread.Sleep(1000);
}
Vytvoření propojené služby
Do metody Main přidejte následující kód, který vytvoří propojenou službu Azure Storage.
V datové továrně vytvoříte propojené služby, abyste svá úložiště dat a výpočetní služby spojili s datovou továrnou. V tomto rychlém startu potřebujete vytvořit pouze jednu propojenou službu Azure Storage pro zdroj kopírování Azure Storage úložiště jímky. V ukázce má název AzureStorageLinkedService.
// Create an Azure Storage linked service
Console.WriteLine("Creating linked service " + storageLinkedServiceName + "...");
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
new AzureStorageLinkedService
{
ConnectionString = new SecureString(
"DefaultEndpointsProtocol=https;AccountName=" + storageAccount +
";AccountKey=" + storageKey)
}
);
client.LinkedServices.CreateOrUpdate(
resourceGroup, dataFactoryName, storageLinkedServiceName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(
storageLinkedService, client.SerializationSettings));
Vytvoření datové sady
Do metody Main přidejte následující kód, který vytvoří datovou sadu objektů blob Azure.
Definujete datovou sadu, která představuje data pro kopírování ze zdroje do jímky. Tato datová sada objektů blob v tomto příkladu odkazuje na propojenou službu Azure Storage, kterou jste vytvořili v předchozím kroku: Datová sada přebírá parametr, jehož hodnota je nastavená v aktivitě, která tuto datovou sadu využívá. Parametr se používá k vytvoření cesty ke složce folderPath odkazující na místo, kde se data nacházejí nebo kde jsou uložená.
// Create an Azure Blob dataset
Console.WriteLine("Creating dataset " + blobDatasetName + "...");
DatasetResource blobDataset = new DatasetResource(
new AzureBlobDataset
{
LinkedServiceName = new LinkedServiceReference
{
ReferenceName = storageLinkedServiceName
},
FolderPath = new Expression { Value = "@{dataset().path}" },
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "path", new ParameterSpecification { Type = ParameterType.String } }
}
}
);
client.Datasets.CreateOrUpdate(
resourceGroup, dataFactoryName, blobDatasetName, blobDataset);
Console.WriteLine(
SafeJsonConvert.SerializeObject(blobDataset, client.SerializationSettings));
Vytvoření kanálu
Do metody Main přidejte následující kód, který vytvoří kanál s aktivitou kopírování.
V tomto příkladu tento kanál obsahuje jednu aktivitu a přebírá dva parametry: vstupní cestu k objektu blob a výstupní cestu k objektu blob. Hodnoty pro tyto parametry se nastaví při aktivaci nebo spuštění kanálu. Aktivita kopírování odkazuje na stejnou datovou sadu objektů blob, kterou jste vytvořili v předchozím kroku jako vstup a výstup. Když se tato datová sada použije jako vstupní, zadá se vstupní cesta. A když se tato datová sada použije jako výstupní, zadá se výstupní cesta.
// Create a pipeline with a copy activity
Console.WriteLine("Creating pipeline " + pipelineName + "...");
PipelineResource pipeline = new PipelineResource
{
Parameters = new Dictionary<string, ParameterSpecification>
{
{ "inputPath", new ParameterSpecification { Type = ParameterType.String } },
{ "outputPath", new ParameterSpecification { Type = ParameterType.String } }
},
Activities = new List<Activity>
{
new CopyActivity
{
Name = "CopyFromBlobToBlob",
Inputs = new List<DatasetReference>
{
new DatasetReference()
{
ReferenceName = blobDatasetName,
Parameters = new Dictionary<string, object>
{
{ "path", "@pipeline().parameters.inputPath" }
}
}
},
Outputs = new List<DatasetReference>
{
new DatasetReference
{
ReferenceName = blobDatasetName,
Parameters = new Dictionary<string, object>
{
{ "path", "@pipeline().parameters.outputPath" }
}
}
},
Source = new BlobSource { },
Sink = new BlobSink { }
}
}
};
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));
Vytvoření spuštění kanálu
Do metody Main přidejte následující kód, který aktivuje spuštění kanálu.
Tento kód také nastaví hodnoty parametrů inputPath a outputPath zadaných v kanálu se skutečnými hodnotami cest k objektům blob zdroje a jímky.
// Create a pipeline run
Console.WriteLine("Creating pipeline run...");
Dictionary<string, object> parameters = new Dictionary<string, object>
{
{ "inputPath", inputBlobPath },
{ "outputPath", outputBlobPath }
};
CreateRunResponse runResponse = client.Pipelines.CreateRunWithHttpMessagesAsync(
resourceGroup, dataFactoryName, pipelineName, parameters: parameters
).Result.Body;
Console.WriteLine("Pipeline run ID: " + runResponse.RunId);
Monitorování spuštění kanálu
Do metody Main přidejte následující kód, který nepřetržitě kontroluje stav, dokud se kopírování dat nedokončí.
// Monitor the pipeline run Console.WriteLine("Checking pipeline run status..."); PipelineRun pipelineRun; while (true) { pipelineRun = client.PipelineRuns.Get( resourceGroup, dataFactoryName, runResponse.RunId); Console.WriteLine("Status: " + pipelineRun.Status); if (pipelineRun.Status == "InProgress" || pipelineRun.Status == "Queued") System.Threading.Thread.Sleep(15000); else break; }Do metody Main přidejte následující kód, který načte podrobnosti o spuštění aktivity kopírování, jako je například velikost přečtena nebo zapsána data.
// Check the copy activity run details Console.WriteLine("Checking copy activity run details..."); RunFilterParameters filterParams = new RunFilterParameters( DateTime.UtcNow.AddMinutes(-10), DateTime.UtcNow.AddMinutes(10)); ActivityRunsQueryResponse queryResponse = client.ActivityRuns.QueryByPipelineRun( resourceGroup, dataFactoryName, runResponse.RunId, filterParams); if (pipelineRun.Status == "Succeeded") Console.WriteLine(queryResponse.Value.First().Output); else Console.WriteLine(queryResponse.Value.First().Error); Console.WriteLine("\nPress any key to exit..."); Console.ReadKey();
Spuštění kódu
Sestavte a spusťte aplikaci a potom ověřte spuštění kanálu.
Konzola vytiskne průběh vytváření datové továrny, propojených služeb, datových sad, kanálu a spuštění kanálu. Potom zkontroluje stav spuštění kanálu. Počkejte, dokud neuvidíte podrobnosti o spuštění aktivity kopírování s velikostí dat pro čtení a zápis. Pak pomocí nástrojů, jako Průzkumník služby Azure Storage, zkontrolujte, jestli se objekty blob zkopírované do outputBlobPath z inputBlobPath, jak jste zadali v proměnných.
Ukázkový výstup
Creating data factory SPv2Factory0907...
{
"identity": {
"type": "SystemAssigned"
},
"location": "East US"
}
Creating linked service AzureStorageLinkedService...
{
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": {
"value": "DefaultEndpointsProtocol=https;AccountName=<storageAccountName>;AccountKey=<storageAccountKey>",
"type": "SecureString"
}
}
}
}
Creating dataset BlobDataset...
{
"properties": {
"type": "AzureBlob",
"typeProperties": {
"folderPath": {
"value": "@{dataset().path}",
"type": "Expression"
}
},
"linkedServiceName": {
"referenceName": "AzureStorageLinkedService",
"type": "LinkedServiceReference"
},
"parameters": {
"path": {
"type": "String"
}
}
}
}
Creating pipeline Adfv2QuickStartPipeline...
{
"properties": {
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
}
},
"inputs": [
{
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.inputPath"
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.outputPath"
},
"type": "DatasetReference"
}
],
"name": "CopyFromBlobToBlob"
}
],
"parameters": {
"inputPath": {
"type": "String"
},
"outputPath": {
"type": "String"
}
}
}
}
Creating pipeline run...
Pipeline run ID: 308d222d-3858-48b1-9e66-acd921feaa09
Checking pipeline run status...
Status: InProgress
Status: InProgress
Checking copy activity run details...
{
"dataRead": 331452208,
"dataWritten": 331452208,
"copyDuration": 23,
"throughput": 14073.209,
"errors": [],
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (West US)",
"usedDataIntegrationUnits": 2,
"billedDuration": 23
}
Press any key to exit...
Ověření výstupu
Kanál automaticky vytvoří výstupní složku v kontejneru objektů blob adftutorial. Potom zkopíruje soubor emp.txt ze vstupní složky do výstupní složky.
- Na stránce Azure Portal kontejneru adftutorial, na které jste se zastavili v části Přidání vstupní složky a souboru pro kontejner objektů blob výše vyberte Aktualizovat, abyste viděli výstupní složku.
- V seznamu složek vyberte output.
- Potvrďte, že je do výstupní složky zkopírovaný soubor emp.txt.
Vyčištění prostředků
Pokud chcete datovou továrnu odstranit programově, přidejte do programu následující řádky kódu:
Console.WriteLine("Deleting the data factory");
client.Factories.Delete(resourceGroup, dataFactoryName);
Další kroky
Kanál v této ukázce kopíruje data z jednoho umístění do jiného umístění v úložišti objektů blob Azure. Projděte si kurzy, kde se dozvíte o použití služby Data Factory ve více scénářích.