Elaborare il feed di modifiche in Archiviazione BLOB di Azure

Il feed di modifiche fornisce i log delle transazioni di tutte le modifiche apportate ai BLOB e ai metadati BLOB nell'account di archiviazione. Questo articolo illustra come leggere i record dei feed di modifiche usando la libreria del processore del feed di modifiche BLOB.

Per altre informazioni sul feed di modifiche, vedere Feed di modifiche in Archiviazione BLOB di Azure.

Ottenere la libreria del processore del feed di modifiche BLOB

  1. Aprire una finestra di comando (ad esempio: Windows PowerShell).
  2. Nella directory del progetto installare il pacchetto NuGet azure.Storage.BLOBs.Changefeed.
dotnet add package Azure.Storage.Blobs --version 12.5.1
dotnet add package Azure.Storage.Blobs.ChangeFeed --version 12.0.0-preview.4

Leggere i record

Nota

Il feed di modifiche è un'entità non modificabile e di sola lettura nell'account di archiviazione. Qualsiasi numero di applicazioni può leggere ed elaborare il feed di modifiche simultaneamente e indipendentemente in base alla propria comodità. I record non vengono rimossi dal feed di modifiche quando un'applicazione le legge. Lo stato di lettura o iterazione di ogni lettore di utilizzo è indipendente e gestito solo dall'applicazione.

Questo esempio esegue l'iterazione di tutti i record nel feed di modifiche, li aggiunge a un elenco e quindi restituisce tale elenco al chiamante.

public async Task<List<BlobChangeFeedEvent>> ChangeFeedAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Get all the events in the change feed. 
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

In questo esempio vengono stampati nella console alcuni valori di ogni record nell'elenco.

public void showEventData(List<BlobChangeFeedEvent> changeFeedEvents)
{
    foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
    {
        string subject = changeFeedEvent.Subject;
        string eventType = changeFeedEvent.EventType.ToString();
        string api = changeFeedEvent.EventData.Api;

        Console.WriteLine("Subject: " + subject + "\n" +
        "Event Type: " + eventType + "\n" +
        "Api: " + api);
    }
}

Riprendere la lettura dei record da una posizione salvata

È possibile scegliere di salvare la posizione di lettura nel feed di modifiche e quindi riprendere l'iterazione dei record in un momento futuro. È possibile salvare la posizione di lettura ottenendo il cursore del feed di modifiche. Il cursore è una stringa e l'applicazione può salvare tale stringa in qualsiasi modo che abbia senso per la progettazione dell'applicazione ( ad esempio: in un file o in un database).

Questo esempio esegue l'iterazione di tutti i record nel feed di modifiche, li aggiunge a un elenco e salva il cursore. L'elenco e il cursore vengono restituiti al chiamante.

public async Task<(string, List<BlobChangeFeedEvent>)> ChangeFeedResumeWithCursorAsync
    (string connectionString,  string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor)
        .AsPages(pageSizeHint: 10)
        .GetAsyncEnumerator();

    await enumerator.MoveNextAsync();

    foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
    {

        changeFeedEvents.Add(changeFeedEvent);             
    }

    // Update the change feed cursor.  The cursor is not required to get each page of events,
    // it is intended to be saved and used to resume iterating at a later date.
    cursor = enumerator.Current.ContinuationToken;
    return (cursor, changeFeedEvents);
}

Elaborazione di flusso di record

È possibile scegliere di elaborare i record del feed di modifiche durante il commit nel feed di modifiche. Vedere Specifiche. Gli eventi di modifica vengono pubblicati nel feed di modifiche in un periodo di 60 secondi in media. È consigliabile eseguire il polling di nuove modifiche con questo periodo in mente quando si specifica l'intervallo di polling.

Questo esempio esegue periodicamente il polling delle modifiche. Se esistono record di modifica, questo codice elabora tali record e salva il cursore del feed di modifiche. In questo modo, se il processo viene arrestato e quindi avviato di nuovo, l'applicazione può usare il cursore per riprendere l'elaborazione dei record in cui è stata interrotta l'ultima operazione. In questo esempio il cursore viene salvato in un file di configurazione dell'applicazione locale, ma l'applicazione può salvarla in qualsiasi forma che abbia il senso più appropriato per lo scenario.

public async Task ChangeFeedStreamAsync
    (string connectionString, int waitTimeMs, string cursor)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (true)
    {
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> enumerator = changeFeedClient
        .GetChangesAsync(continuation: cursor).AsPages().GetAsyncEnumerator();

        while (true) 
        {
            var result = await enumerator.MoveNextAsync();

            if (result)
            {
                foreach (BlobChangeFeedEvent changeFeedEvent in enumerator.Current.Values)
                {
                    string subject = changeFeedEvent.Subject;
                    string eventType = changeFeedEvent.EventType.ToString();
                    string api = changeFeedEvent.EventData.Api;

                    Console.WriteLine("Subject: " + subject + "\n" +
                        "Event Type: " + eventType + "\n" +
                        "Api: " + api);
                }

                // helper method to save cursor. 
                SaveCursor(enumerator.Current.ContinuationToken);
            }
            else
            {
                break;
            }

        }
        await Task.Delay(waitTimeMs);
    }

}

public void SaveCursor(string cursor)
{
    System.Configuration.Configuration config = 
        ConfigurationManager.OpenExeConfiguration
        (ConfigurationUserLevel.None);

    config.AppSettings.Settings.Clear();
    config.AppSettings.Settings.Add("Cursor", cursor);
    config.Save(ConfigurationSaveMode.Modified);
}

Lettura dei record all'interno di un intervallo di tempo

È possibile leggere i record che rientrano in un intervallo di tempo specifico. Questo esempio esegue l'iterazione di tutti i record nel feed di modifiche che rientrano tra le 3:00 del 2 marzo 2020 e le 2:00 del 7 agosto 2020, li aggiunge a un elenco e quindi restituisce tale elenco al chiamante.

Selezione di segmenti per un intervallo di tempo

public async Task<List<BlobChangeFeedEvent>> ChangeFeedBetweenDatesAsync(string connectionString)
{
    // Get a new blob service client.
    BlobServiceClient blobServiceClient = new BlobServiceClient(connectionString);

    // Get a new change feed client.
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();
    List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();

    // Create the start and end time.  The change feed client will round start time down to
    // the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
    // with minutes and seconds.
    DateTimeOffset startTime = new DateTimeOffset(2020, 3, 2, 15, 0, 0, TimeSpan.Zero);
    DateTimeOffset endTime = new DateTimeOffset(2020, 8, 7, 2, 0, 0, TimeSpan.Zero);

    // You can also provide just a start or end time.
    await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
        start: startTime,
        end: endTime))
    {
        changeFeedEvents.Add(changeFeedEvent);
    }

    return changeFeedEvents;
}

L'ora di inizio fornito viene arrotondata all'ora più vicina e l'ora di fine viene arrotondata fino all'ora più vicina. È possibile che gli utenti possano visualizzare eventi che si sono verificati prima dell'ora di inizio e dopo l'ora di fine. È anche possibile che alcuni eventi che si verificano tra l'ora di inizio e di fine non verranno visualizzati. Questo perché gli eventi potrebbero essere registrati durante l'ora precedente all'ora di inizio o durante l'ora dopo l'ora di fine.

Passaggi successivi