Zbiorcze eksportowanie i importowanie rejestracji usługi Azure Notification Hubs

Istnieją scenariusze, w których wymagane jest utworzenie lub zmodyfikowanie dużej liczby rejestracji w centrum powiadomień. Niektóre z tych scenariuszy to aktualizacje tagów po obliczeniach wsadowych lub migrowanie istniejącej implementacji wypychania do korzystania z usługi Azure Notification Hubs.

W tym artykule wyjaśniono, jak wykonać dużą liczbę operacji w centrum powiadomień lub wyeksportować wszystkie rejestracje zbiorczo.

UWAGA: Zbiorcze importowanie/eksportowanie jest dostępne tylko dla warstwy cenowej "Standardowa"

Przepływ wysokiego poziomu

Obsługa usługi Batch jest przeznaczona do obsługi długotrwałych zadań obejmujących miliony rejestracji. Aby osiągnąć tę skalę, obsługa wsadowa używa usługi Azure Storage do przechowywania szczegółów i danych wyjściowych zadania. W przypadku operacji aktualizacji zbiorczej użytkownik musi utworzyć plik w kontenerze obiektów blob, którego zawartość jest listą operacji aktualizacji rejestracji. Podczas uruchamiania zadania użytkownik udostępnia adres URL wejściowego obiektu blob wraz z adresem URL katalogu wyjściowego (również w kontenerze obiektów blob). Po rozpoczęciu zadania użytkownik może sprawdzić stan, wysyłając zapytanie o lokalizację adresu URL podaną na początku zadania. Określone zadanie może wykonywać tylko operacje określonego rodzaju (tworzy, aktualizuje lub usuwa). Operacje eksportowania są wykonywane analogicznie.

Importuj

Konfiguruj

W tej sekcji założono, że masz następujące jednostki:

Tworzenie pliku wejściowego i przechowywanie go w obiekcie blob

Plik wejściowy zawiera listę rejestracji serializacji w formacie XML, jeden na wiersz. Korzystając z zestawu Azure SDK, w poniższym przykładzie kodu pokazano, jak serializować rejestracje i przekazywać je do kontenera obiektów blob:

private static async Task SerializeToBlobAsync(BlobContainerClient container, RegistrationDescription[] descriptions)
{
     StringBuilder builder = new StringBuilder();
     foreach (var registrationDescription in descriptions)
     {
          builder.AppendLine(registrationDescription.Serialize());
     }

     var inputBlob = container.GetBlobClient(INPUT_FILE_NAME);
     using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
     {
         await inputBlob.UploadAsync(stream);
     }
}

Ważne

Powyższy kod serializuje rejestracje w pamięci, a następnie przekazuje cały strumień do obiektu blob. Jeśli przekazano plik zawierający więcej niż kilka megabajtów, zapoznaj się ze wskazówkami dotyczącymi wykonywania tych kroków w usłudze Azure Blob; na przykład blokowe obiekty blob.

Tworzenie tokenów adresu URL

Po przekazaniu pliku wejściowego wygeneruj adresy URL, aby udostępnić je w centrum powiadomień zarówno dla pliku wejściowego, jak i katalogu wyjściowego. Dla danych wejściowych i wyjściowych można użyć dwóch różnych kontenerów obiektów blob.

static Uri GetOutputDirectoryUrl(BlobContainerClient container)
{
      Console.WriteLine(container.CanGenerateSasUri);
      BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.All, DateTime.UtcNow.AddDays(1));
      return container.GenerateSasUri(builder);
}

static Uri GetInputFileUrl(BlobContainerClient container, string filePath)
{
      Console.WriteLine(container.CanGenerateSasUri);
      BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.Read, DateTime.UtcNow.AddDays(1));
      return container.GenerateSasUri(builder);
}

Przesyłanie zadania

Przy użyciu dwóch adresów URL danych wejściowych i wyjściowych można teraz uruchomić zadanie wsadowe.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var job = await client.SubmitNotificationHubJobAsync(
     new NotificationHubJob {
             JobType = NotificationHubJobType.ImportCreateRegistrations,
             OutputContainerUri = outputContainerSasUri,
             ImportFileUri = inputFileSasUri
         }
     );

long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    job = await client.GetNotificationHubJobAsync(job.JobId);
    await Task.Delay(1000);
    i--;
}

Oprócz adresów URL wejściowych i wyjściowych ten przykład tworzy NotificationHubJob obiekt zawierający JobType obiekt, który może być jednym z następujących typów:

  • ImportCreateRegistrations
  • ImportUpdateRegistrations
  • ImportDeleteRegistrations

Po zakończeniu wywołania zadanie jest kontynuowane przez centrum powiadomień i możesz sprawdzić jego stan za pomocą wywołania getNotificationHubJobAsync.

Po zakończeniu zadania możesz sprawdzić wyniki, przeglądając następujące pliki w katalogu wyjściowym:

  • /<hub>/<jobid>/Failed.txt
  • /<hub>/<jobid>/Output.txt

Te pliki zawierają listę pomyślnych i zakończonych niepowodzeniem operacji z partii. Format pliku to .cvs, w którym każdy wiersz ma numer wiersza oryginalnego pliku wejściowego i dane wyjściowe operacji (zazwyczaj utworzony lub zaktualizowany opis rejestracji).

Pełny przykładowy kod

Poniższy przykładowy kod importuje rejestracje do centrum powiadomień.

using Microsoft.Azure.NotificationHubs;
using Azure.Storage.Blobs;
using Azure.Storage.Sas;
using System.Text;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "namespace"; 
        private static string HUB_NAME = "demohub";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT_CONNECTIONSTRING = "connectionstring";
        private static string CONTAINER_NAME = "containername";

        static async Task Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            // Get a reference to a container named "sample-container" and then create it
            BlobContainerClient container = new BlobContainerClient(STORAGE_ACCOUNT_CONNECTIONSTRING, CONTAINER_NAME);

            await container.CreateIfNotExistsAsync();

            await SerializeToBlobAsync(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            
            BlobContainerClient inputcontainer = new BlobContainerClient(STORAGE_ACCOUNT_CONNECTIONSTRING, STORAGE_ACCOUNT_CONNECTIONSTRING + "/" +         INPUT_FILE_NAME);

            var inputFileSasUri = GetInputFileUrl(inputcontainer, INPUT_FILE_NAME);


            // Import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var job = await client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );

            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                job = await client.GetNotificationHubJobAsync(job.JobId);
                await Task.Delay(1000);
                i--;
            }
        }

        private static async Task SerializeToBlobAsync(BlobContainerClient container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(registrationDescription.Serialize());
            }

            var inputBlob = container.GetBlobClient(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                await inputBlob.UploadAsync(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(BlobContainerClient container)
        {
            Console.WriteLine(container.CanGenerateSasUri);
            BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.All, DateTime.UtcNow.AddDays(1));
            return container.GenerateSasUri(builder);
        }

        static Uri GetInputFileUrl(BlobContainerClient container, string filePath)
        {
            Console.WriteLine(container.CanGenerateSasUri);
            BlobSasBuilder builder = new BlobSasBuilder(BlobSasPermissions.Read, DateTime.UtcNow.AddDays(1));
            return container.GenerateSasUri(builder);

        }
    }
}

Eksportowanie

Eksportowanie rejestracji jest podobne do importu z następującymi różnicami:

  • Potrzebny jest tylko adres URL danych wyjściowych.
  • Utworzysz obiekt NotificationHubJob typu ExportRegistrations.

Przykładowy fragment kodu

Poniżej przedstawiono przykładowy fragment kodu do eksportowania rejestracji w języku Java:

// Submit an export job
NotificationHubJob job = new NotificationHubJob();
job.setJobType(NotificationHubJobType.ExportRegistrations);
job.setOutputContainerUri("container uri with SAS signature");
job = hub.submitNotificationHubJob(job);

// Wait until the job is done
while(true){
    Thread.sleep(1000);
    job = hub.getNotificationHubJob(job.getJobId());
    if(job.getJobStatus() == NotificationHubJobStatus.Completed)
        break;
}

Następne kroki

Aby dowiedzieć się więcej na temat rejestracji, zobacz następujące artykuły: