Management .NET SDK: impostare ed eseguire processi di analisi tramite l'API di Analisi di flusso di Azure per .NET

Informazioni su come impostare ed eseguire processi di analisi tramite l'API di Analisi di flusso per .NET usando Management .NET SDK. Impostare un progetto, creare origini di input e output, trasformazioni e avviare e arrestare i processi. Per i processi di analisi, è possibile trasmettere i dati di flusso dall'archiviazione BLOB o da un hub eventi.

Vedere la documentazione di riferimento sulla gestione per l'API di Analisi di flusso per .NET.

Analisi dei flussi di Azure è un servizio completamente gestito che consente l'elaborazione di eventi complessi con bassa latenza, elevata disponibilità e scalabilità per lo streaming di dati nel cloud. Analisi di flusso consente ai clienti di configurare processi di flusso per analizzare i flussi di dati e di condurre operazioni di analisi pressoché in tempo reale.

Nota

Il codice di esempio in questo articolo è stato aggiornato con la versione v2.x .NET SDK per la gestione di Analisi di flusso di Azure. Per il codice di esempio che usa la versione legacy (1.x) SDK, vedere Usare Management .NET SDK v1.x per Analisi di flusso.

Prerequisiti

Per eseguire le procedure descritte nell'articolo è necessario:

  • Installare Visual Studio 2017 o 2015.
  • Scaricare e installare Azure .NET SDK.
  • Creare un gruppo di risorse di Azure nella sottoscrizione. Di seguito è riportato un esempio di script di Azure PowerShell: Per informazioni su Azure PowerShell , vedere Installare e configurare Azure PowerShell;

      # Log in to your Azure account
      Add-AzureAccount
    
      # Select the Azure subscription you want to use to create the resource group
      Select-AzureSubscription -SubscriptionName <subscription name>
    
          # If Stream Analytics has not been registered to the subscription, remove the remark symbol (#) to run the Register-AzureRMProvider cmdlet to register the provider namespace
          #Register-AzureRMProvider -Force -ProviderNamespace 'Microsoft.StreamAnalytics'
    
      # Create an Azure resource group
      New-AzureResourceGroup -Name <YOUR RESOURCE GROUP NAME> -Location <LOCATION>
    
  • Configurare un'origine di input e una destinazione di output da usare. Per altre istruzioni, vedere Aggiungere input per impostare un esempio di input e Aggiungere output per impostare un esempio di output.

Configurare un progetto

Per creare un processo di analisi usando l'API di Analisi di flusso per .NET, configurare prima il progetto.

  1. Creare un'applicazione console .NET di Visual Studio C#.
  2. Nella Console di Gestione pacchetti, eseguire i comandi seguenti per installare i pacchetti NuGet. Il primo è l'SDK per .NET di Analisi di flusso di Azure. Il secondo è per l'autenticazione del client di Azure.

     Install-Package Microsoft.Azure.Management.StreamAnalytics -Version 2.0.0
     Install-Package Microsoft.Rest.ClientRuntime.Azure.Authentication -Version 2.3.1
    
  3. Aggiungere la sezione appSettings seguente al file App.config:

     <appSettings>
       <add key="ClientId" value="1950a258-227b-4e31-a9cf-717495945fc2" />
       <add key="RedirectUri" value="urn:ietf:wg:oauth:2.0:oob" />
       <add key="SubscriptionId" value="YOUR SUBSCRIPTION ID" />
       <add key="ActiveDirectoryTenantId" value="YOUR TENANT ID" />
     </appSettings>
    

    Sostituire i valori per SubscriptionId e ActiveDirectoryTenantId con gli ID della sottoscrizione di Azure e del tenant. È possibile ottenere questi valori eseguendo il cmdlet Azure PowerShell seguente:

     Get-AzureAccount
    
  4. Aggiungere il riferimento seguente nel file con estensione csproj:

     <Reference Include="System.Configuration" />
    
  5. Aggiungere le istruzioni using seguenti al file di origine (Program.cs) nel progetto.

     using System;
     using System.Collections.Generic;
     using System.Configuration;
     using System.Threading;
     using System.Threading.Tasks;
    
     using Microsoft.Azure.Management.StreamAnalytics;
     using Microsoft.Azure.Management.StreamAnalytics.Models;
     using Microsoft.Rest.Azure.Authentication;
     using Microsoft.Rest;
    
  6. Aggiungere un metodo helper di autenticazione:

    private static async Task<ServiceClientCredentials> GetCredentials()
    {
        var activeDirectoryClientSettings = ActiveDirectoryClientSettings.UsePromptOnly(ConfigurationManager.AppSettings["ClientId"], new Uri("urn:ietf:wg:oauth:2.0:oob"));
        ServiceClientCredentials credentials = await UserTokenProvider.LoginWithPromptAsync(ConfigurationManager.AppSettings["ActiveDirectoryTenantId"], activeDirectoryClientSettings);
    
        return credentials;
     }
    

Creare un client di gestione di Analisi di flusso.

Un oggetto StreamAnalyticsManagementClient consente di gestire il processo e i componenti di processo, ad esempio l'input, l'output e la trasformazione.

Aggiungere il codice seguente all'inizio del metodo Main .

 string resourceGroupName = "<YOUR AZURE RESOURCE GROUP NAME>";
 string streamingJobName = "<YOUR STREAMING JOB NAME>";
 string inputName = "<YOUR JOB INPUT NAME>";
 string transformationName = "<YOUR JOB TRANSFORMATION NAME>";
 string outputName = "<YOUR JOB OUTPUT NAME>";

 SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());

 // Get credentials
 ServiceClientCredentials credentials = GetCredentials().Result;

 // Create Stream Analytics management client
 StreamAnalyticsManagementClient streamAnalyticsManagementClient = new StreamAnalyticsManagementClient(credentials)
 {
     SubscriptionId = ConfigurationManager.AppSettings["SubscriptionId"]
 };

Il valore della variabile resourceGroupName deve essere lo stesso del nome del gruppo di risorse creato o selezionato nei passaggi preliminari.

Per automatizzare l'aspetto di presentazione delle credenziali della creazione del processo, fare riferimento a Autenticazione di un'entità servizio con Gestione risorse di Azure.

Nelle sezioni rimanenti di questo articolo si presuppone che il codice sia all'inizio del metodo Main .

Creare un processo di Analisi di flusso.

Il codice seguente crea un processo di Analisi di flusso nel gruppo di risorse che è stato definito. Verranno aggiunti un input, un output e la trasformazione al processo in un secondo momento.

// Create a streaming job
StreamingJob streamingJob = new StreamingJob()
{
    Tags = new Dictionary<string, string>()
    {
        { "Origin", ".NET SDK" },
        { "ReasonCreated", "Getting started tutorial" }
    },
    Location = "West US",
    EventsOutOfOrderPolicy = EventsOutOfOrderPolicy.Drop,
    EventsOutOfOrderMaxDelayInSeconds = 5,
    EventsLateArrivalMaxDelayInSeconds = 16,
    OutputErrorPolicy = OutputErrorPolicy.Drop,
    DataLocale = "en-US",
    CompatibilityLevel = CompatibilityLevel.OneFullStopZero,
    Sku = new Sku()
    {
        Name = SkuName.Standard
    }
};
StreamingJob createStreamingJobResult = streamAnalyticsManagementClient.StreamingJobs.CreateOrReplace(streamingJob, resourceGroupName, streamingJobName);

Creare un'origine di input di Analisi di flusso.

Il codice seguente crea un'origine di input di Analisi di flusso con il tipo di origine di input BLOB e la serializzazione CSV. Per creare un'origine di input di hub eventi, usare EventHubStreamInputDataSource anziché BlobStreamInputDataSource. Analogamente, è possibile personalizzare il tipo di serializzazione dell'origine di input.

// Create an input
StorageAccount storageAccount = new StorageAccount()
{
    AccountName = "<YOUR STORAGE ACCOUNT NAME>",
    AccountKey = "<YOUR STORAGE ACCOUNT KEY>"
};
Input input = new Input()
{
    Properties = new StreamInputProperties()
    {
        Serialization = new CsvSerialization()
        {
            FieldDelimiter = ",",
            Encoding = Encoding.UTF8
        },
        Datasource = new BlobStreamInputDataSource()
        {
            StorageAccounts = new[] { storageAccount },
            Container = "<YOUR STORAGE BLOB CONTAINER>",
            PathPattern = "{date}/{time}",
            DateFormat = "yyyy/MM/dd",
            TimeFormat = "HH",
            SourcePartitionCount = 16
        }
    }
};
Input createInputResult = streamAnalyticsManagementClient.Inputs.CreateOrReplace(input, resourceGroupName, streamingJobName, inputName);

Le origini di input, dall'archiviazione BLOB o un hub eventi, sono legate a un processo specifico. Per usare la stessa origine di input per processi diversi, è necessario chiamare nuovamente il metodo e specificare un nome diverso.

Testare un'origine di input di Analisi di flusso

Il metodo TestConnection verifica se il processo di Analisi di flusso è in grado di connettersi all'origine di input, nonché altri aspetti specifici del tipo di origine di input. Ad esempio, nell'origine di input BLOB creata in precedenza, il metodo verifica che il nome dell'account di archiviazione e la coppia di chiavi possano essere usati per connettersi all'account di archiviazione, nonché verificare che il contenitore specificato esista.

// Test the connection to the input
ResourceTestStatus testInputResult = streamAnalyticsManagementClient.Inputs.Test(resourceGroupName, streamingJobName, inputName);

Creare una destinazione di output di Analisi di flusso

La creazione di una destinazione di output è molto simile alla creazione di un'origine di input di Analisi di flusso. Analogamente alle origini di input, le destinazioni di output sono legate a un processo specifico. Per usare la stessa destinazione di output per processi diversi, è necessario chiamare nuovamente il metodo e specificare un nome diverso.

Il codice seguente crea una destinazione di output (database SQL di Azure). È possibile personalizzare il tipo di dati della destinazione di output e/o il tipo di serializzazione.

// Create an output
Output output = new Output()
{
    Datasource = new AzureSqlDatabaseOutputDataSource()
    {
        Server = "<YOUR DATABASE SERVER NAME>",
        Database = "<YOUR DATABASE NAME>",
        User = "<YOUR DATABASE LOGIN>",
        Password = "<YOUR DATABASE LOGIN PASSWORD>",
        Table = "<YOUR DATABASE TABLE NAME>"
    }
};
Output createOutputResult = streamAnalyticsManagementClient.Outputs.CreateOrReplace(output, resourceGroupName, streamingJobName, outputName);

Testare una destinazione di output di Analisi di flusso

Una destinazione di output di Analisi di flusso dispone inoltre del metodo TestConnection per il test delle connessioni.

// Test the connection to the output
ResourceTestStatus testOutputResult = streamAnalyticsManagementClient.Outputs.Test(resourceGroupName, streamingJobName, outputName);

Creare una trasformazione di Analisi di flusso

Il codice seguente crea una trasformazione di Analisi di flusso con la query "select * from Input" e specifica l'assegnazione di un'unità di streaming per il processo di Analisi di flusso. Per altre informazioni sulla regolazione di unità di streaming, vedere Scalabilità dei processi di Analisi di flusso di Azure.

// Create a transformation
Transformation transformation = new Transformation()
{
    Query = "Select Id, Name from <your input name>", // '<your input name>' should be replaced with the value you put for the 'inputName' variable above or in a previous step
    StreamingUnits = 1
};
Transformation createTransformationResult = streamAnalyticsManagementClient.Transformations.CreateOrReplace(transformation, resourceGroupName, streamingJobName, transformationName);

Analogamente all'input e all'output, una trasformazione è inoltre collegata a un processo di Analisi di flusso specifico in cui è stato creata.

Avvio di un processo di Analisi di flusso

Dopo aver creato un processo di Analisi di flusso nonché gli input, gli output e la trasformazione, è possibile avviare il processo chiamando il metodo Start .

Il codice di esempio seguente avvia un processo di Analisi di flusso con un'ora di inizio dell'output personalizzato impostata sulle 12:12:12 UTC del 12 dicembre 2012:

// Start a streaming job
StartStreamingJobParameters startStreamingJobParameters = new StartStreamingJobParameters()
{
    OutputStartMode = OutputStartMode.CustomTime,
    OutputStartTime = new DateTime(2012, 12, 12, 12, 12, 12, DateTimeKind.Utc)
};
streamAnalyticsManagementClient.StreamingJobs.Start(resourceGroupName, streamingJobName, startStreamingJobParameters);

Arresto di un processo di Analisi di flusso

È possibile arrestare un processo di Analisi di flusso in esecuzione chiamando il metodo Stop .

// Stop a streaming job
streamAnalyticsManagementClient.StreamingJobs.Stop(resourceGroupName, streamingJobName);

Eliminazione di un processo di Analisi di flusso

Il metodo Delete consente di eliminare il processo, nonché le risorse secondarie sottostanti, inclusi gli input, gli output e la trasformazione del processo.

// Delete a streaming job
streamAnalyticsManagementClient.StreamingJobs.Delete(resourceGroupName, streamingJobName);

Supporto

Per ulteriore assistenza, provare il Forum di Analisi dei flussi di Azure.

Passaggi successivi

Sono state fornite le nozioni di base dell'utilizzo di .NET SDK per creare ed eseguire i processi di analisi. Per altre informazioni, vedere gli argomenti seguenti: