Procedure: Registraties bulksgewijs exporteren en wijzigen

Er zijn scenario's waarin het nodig is om grote aantallen registraties in een Notification Hub te maken of te wijzigen. Sommige van deze scenario's zijn tag-updates na batchberekeningen of het migreren van een bestaande push-implementatie om Notification Hubs te gebruiken.

In dit onderwerp wordt uitgelegd hoe u bulksgewijs ondersteuning voor Notification Hubs gebruikt om een groot aantal bewerkingen uit te voeren op een Notification Hub of om alle registraties te exporteren.

High-Level Flow

Batch-ondersteuning is ontworpen ter ondersteuning van langdurige taken met miljoenen registraties. Om deze schaal te bereiken, maakt batchverwerkingsondersteuning gebruik van Azure Storage voor het opslaan van taakgegevens en -uitvoer. Voor bulkupdatebewerkingen moet de gebruiker een bestand maken in een blobcontainer waarvan de inhoud de lijst met registratie-updatebewerkingen is. Wanneer de taak wordt gestart, geeft de gebruiker een URL naar de invoer-blob, samen met een URL naar een uitvoermap (ook in een blobcontainer). Nadat de taak is gestart, kan de gebruiker de status controleren door een URL-locatie op te vragen die is opgegeven bij het starten van de taak. Houd er rekening mee dat een specifieke taak alleen bewerkingen van een specifiek type kan uitvoeren (maakt, bijwerken of verwijderen). Exportbewerkingen worden analoog uitgevoerd.

Importeren

Instellen

In deze sectie wordt ervan uitgegaan dat u het volgende hebt:

  1. Een ingerichte Notification Hub.

  2. Een Azure Storage blobcontainer.

  3. Verwijzingen naar de Azure Storage- en Azure Service Bus NuGet-pakketten.

Invoerbestand maken en opslaan in een blob

Een invoerbestand bevat een lijst met registraties die zijn geserialiseerd in XML, één per rij. Met behulp van de Azure SDK ziet u in het volgende codevoorbeeld hoe u de registraties serialiseert en uploadt naar een blobcontainer.

private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
    StringBuilder builder = new StringBuilder();
    foreach (var registrationDescription in descriptions)
    {
        builder.AppendLine(RegistrationDescription.Serialize());
    }

    var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
    using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
    {
        inputBlob.UploadFromStream(stream);
    }
}

Belangrijk

De voorgaande code serialiseert de registraties in het geheugen en uploadt vervolgens de hele stream naar een blob. Als u een bestand met meer dan een paar megabytes hebt geüpload, raadpleegt u de Azure Blob-richtlijnen voor het uitvoeren van deze stappen; Bijvoorbeeld blok-blobs.

URL-tokens maken

Zodra het invoerbestand is geüpload, moet u de URL's genereren die moeten worden opgegeven voor uw Notification Hub voor zowel het invoerbestand als de uitvoermap. U kunt twee verschillende blobcontainers gebruiken voor invoer en uitvoer.

static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
    };

    string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + sasContainerToken);
}

static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Read
    };
    string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + "/" + filePath + sasToken);
}

De taak verzenden

Met de twee invoer- en uitvoer-URL's kunnen we de batchtaak nu starten.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
        JobType = NotificationHubJobType.ImportCreateRegistrations,
        OutputContainerUri = outputContainerSasUri,
        ImportFileUri = inputFileSasUri
    }
);
createTask.Wait();

var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
    getJobTask.Wait();
    job = getJobTask.Result;
    Thread.Sleep(1000);
    i--;
}

Naast de invoer- en uitvoer-URL's wordt in dit voorbeeld een object gemaakt dat een NotificationHubJobJobType object bevat. Dit kan een van de volgende zijn:

  • ImportCreateRegistrations

  • ImportUpdateRegistrations

  • ImportDeleteRegistrations

Zodra de aanroep is voltooid, wordt de taak voortgezet door de Notification Hub en kunt u de status ervan controleren met de aanroep naar GetNotificationHubJobAsync.

Wanneer de taak is voltooid, kunt u de resultaten controleren door de volgende bestanden in uw uitvoermap te bekijken:

  • /<hub>/<jobid>/Failed.txt

  • /<hub>/<jobid>/Output.txt

Deze bestanden bevatten de lijst met geslaagde en mislukte bewerkingen uit uw batch. De bestandsindeling is .cvs, waarin elke rij het regelnummer van het oorspronkelijke invoerbestand heeft en de uitvoer van de bewerking (meestal de beschrijving van de gemaakte of bijgewerkte registratie).

Exporteren

Het exporteren van registratie is vergelijkbaar met het importeren, met de volgende verschillen:

  1. U hebt alleen de uitvoer-URL nodig.

  2. U moet een NotificationHubJob van het type ExportRegistrations maken.

Volledige voorbeeldcode

Hier volgt een volledig werkend voorbeeld waarmee registraties worden geïmporteerd in een Notification Hub.

using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "---";
        private static string HUB_NAME = "---";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT = "---";
        private static string STORAGE_PASSWORD = "---";
        private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));

        static void Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            //write to blob store to create an input file
            var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
            var container = blobClient.GetContainerReference("testjobs");
            container.CreateIfNotExists();

            SerializeToBlob(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);


            //Lets import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var createTask = client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );
            createTask.Wait();

            var job = createTask.Result;
            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
                getJobTask.Wait();
                job = getJobTask.Result;
                Thread.Sleep(1000);
                i--;
            }
        }

        private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(RegistrationDescription.Serialize());
            }

            var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                inputBlob.UploadFromStream(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + sasContainerToken);
        }

        static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + "/" + filePath + sasToken);
        }

        static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
        {
            return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
        }
    }
}