Share via


Como exportar e modificar registros em massa

Existem cenários em que é necessário criar ou modificar grandes números de registros em um hub de notificação. Alguns desses cenários são atualizações de marcas após computações em lote ou a migração de uma implementação de push existente para usar Hubs de Notificação.

Este tópico explica como usar o suporte a operações em massa de Hubs de Notificação para executar um grande número de operações em um hub de notificação ou para exportar todos os registros.

Fluxo de alto nível

O suporte a operações em lote destina-se a trabalhos de longa duração que envolvem milhões de registros. Para obter essa escala, o suporte ao envio em lote usa Armazenamento do Azure para armazenar detalhes e saída do trabalho. Para operações de atualização em massa, o usuário precisa criar um arquivo em um contêiner de blobs, cujo conteúdo é a lista de operações de atualização de registros. Ao iniciar o trabalho, o usuário fornece uma URL para o blob de entrada, junto com uma URL para um diretório de saída (também contido em um contêiner de blobs). Uma vez iniciado o trabalho, o usuário poderá verificar o status consultando um local de URL fornecido no início do trabalho. Observe que cada trabalho só poderá executar operações de determinado tipo (criações, atualizações ou exclusões). As operações de exportação são executadas de maneira semelhante.

Importar

Instalação

Esta seção pressupõe que você tenha o seguinte:

  1. Um hub de notificação provisionado.

  2. Um contêiner de blobs do Armazenamento do Azure.

  3. Referências aos pacotes de NuGet do Azure Armazenamento e Barramento de Serviço do Azure.

Criar um arquivo de entrada e armazená-lo em um blob

Um arquivo de entrada contém uma lista de registros serializados em XML, sendo um por linha. Usando o SDK do Azure, o exemplo de código a seguir mostra como serializar os registros e carregá-los no contêiner de blobs.

private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
{
    StringBuilder builder = new StringBuilder();
    foreach (var registrationDescription in descriptions)
    {
        builder.AppendLine(RegistrationDescription.Serialize());
    }

    var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
    using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
    {
        inputBlob.UploadFromStream(stream);
    }
}

Importante

O código anterior serializa os registros na memória e, em seguida, carrega o fluxo inteiro em um blob. Se você tiver carregado um arquivo de mais do que apenas alguns megabytes, consulte as diretrizes de blob do Azure sobre como executar essas etapas; por exemplo, bloquear blobs.

Criar tokens de URL

Uma vez carregado o arquivo de entrada, você deverá gerar as URLs para o arquivo de entrada e o diretório de saída a fim de fornecê-las ao hub de notificação. Observe que você pode usar dois contêineres de blobs diferentes para entrada e saída.

static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
    };

    string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + sasContainerToken);
}

static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
{
    SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
    {
        SharedAccessExpiryTime = DateTime.UtcNow.AddDays(1),
        Permissions = SharedAccessBlobPermissions.Read
    };
    string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);
    return new Uri(container.Uri + "/" + filePath + sasToken);
}

Enviar o trabalho

Com as duas URLs de entrada e saída, você poderá iniciar o trabalho em lote.

NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
var createTask = client.SubmitNotificationHubJobAsync(
new NotificationHubJob {
        JobType = NotificationHubJobType.ImportCreateRegistrations,
        OutputContainerUri = outputContainerSasUri,
        ImportFileUri = inputFileSasUri
    }
);
createTask.Wait();

var job = createTask.Result;
long i = 10;
while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
{
    var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
    getJobTask.Wait();
    job = getJobTask.Result;
    Thread.Sleep(1000);
    i--;
}

Além das URLs de entrada e saída, este exemplo cria um objeto NotificationHubJob que contém um objeto JobType, que poderá ser um dos apresentados a seguir:

  • ImportCreateRegistrations

  • ImportUpdateRegistrations

  • ImportDeleteRegistrations

Quando a chamada for concluída, o hub de notificação dará continuidade ao trabalho, e você poderá verificar o seu status com a chamada de GetNotificationHubJobAsync.

Quando o trabalho for concluído, você poderá verificar os resultados examinando os seguintes arquivos em seu diretório de saída:

  • /<hub>/<jobid>/Failed.txt

  • /<hub>/<jobid>/Output.txt

Esses arquivos contêm a lista das operações de seu lote que foram bem-sucedidas e falharam. O formato do arquivo é .cvs, no qual cada linha contém o número da linha do arquivo de entrada original e a saída da operação (geralmente a descrição de registro criada ou atualizada).

Exportação

A exportação do registro é semelhante à importação, com as seguintes diferenças:

  1. Você precisa somente da URL de saída.

  2. Você deve criar um NotificationHubJob do tipo ExportRegistrations.

Código de Exemplo Completo

A seguir é apresentado um exemplo completo que importa registros para um hub de notificação.

using Microsoft.ServiceBus.Notifications;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;

namespace ConsoleApplication1
{
    class Program
    {
        private static string CONNECTION_STRING = "---";
        private static string HUB_NAME = "---";
        private static string INPUT_FILE_NAME = "CreateFile.txt";
        private static string STORAGE_ACCOUNT = "---";
        private static string STORAGE_PASSWORD = "---";
        private static StorageUri STORAGE_ENDPOINT = new StorageUri(new Uri("---"));

        static void Main(string[] args)
        {
            var descriptions = new[]
            {
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMkUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMjUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMhUxREQFBlVTTkMwMQ"),
                new MpnsRegistrationDescription(@"http://dm2.notify.live.net/throttledthirdparty/01.00/12G9Ed13dLb5RbCii5fWzpFpAgAAAAADAQAAAAQUZm52OkJCMjg1QTg1QkZDMdUxREQFBlVTTkMwMQ"),
            };

            //write to blob store to create an input file
            var blobClient = new CloudBlobClient(STORAGE_ENDPOINT, new Microsoft.WindowsAzure.Storage.Auth.StorageCredentials(STORAGE_ACCOUNT, STORAGE_PASSWORD));
            var container = blobClient.GetContainerReference("testjobs");
            container.CreateIfNotExists();

            SerializeToBlob(container, descriptions);

            // TODO then create Sas
            var outputContainerSasUri = GetOutputDirectoryUrl(container);
            var inputFileSasUri = GetInputFileUrl(container, INPUT_FILE_NAME);


            //Lets import this file
            NotificationHubClient client = NotificationHubClient.CreateClientFromConnectionString(CONNECTION_STRING, HUB_NAME);
            var createTask = client.SubmitNotificationHubJobAsync(
                new NotificationHubJob {
                    JobType = NotificationHubJobType.ImportCreateRegistrations,
                    OutputContainerUri = outputContainerSasUri,
                    ImportFileUri = inputFileSasUri
                }
            );
            createTask.Wait();

            var job = createTask.Result;
            long i = 10;
            while (i > 0 && job.Status != NotificationHubJobStatus.Completed)
            {
                var getJobTask = client.GetNotificationHubJobAsync(job.JobId);
                getJobTask.Wait();
                job = getJobTask.Result;
                Thread.Sleep(1000);
                i--;
            }
        }

        private static void SerializeToBlob(CloudBlobContainer container, RegistrationDescription[] descriptions)
        {
            StringBuilder builder = new StringBuilder();
            foreach (var registrationDescription in descriptions)
            {
                builder.AppendLine(RegistrationDescription.Serialize());
            }

            var inputBlob = container.GetBlockBlobReference(INPUT_FILE_NAME);
            using (MemoryStream stream = new MemoryStream(Encoding.UTF8.GetBytes(builder.ToString())))
            {
                inputBlob.UploadFromStream(stream);
            }
        }

        static Uri GetOutputDirectoryUrl(CloudBlobContainer container)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.List | SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasContainerToken = container.GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + sasContainerToken);
        }

        static Uri GetInputFileUrl(CloudBlobContainer container, string filePath)
        {
            //Set the expiry time and permissions for the container.
            //In this case no start time is specified, so the shared access signature becomes valid immediately.
            SharedAccessBlobPolicy sasConstraints = new SharedAccessBlobPolicy
            {
                SharedAccessExpiryTime = DateTime.UtcNow.AddHours(4),
                Permissions = SharedAccessBlobPermissions.Read
            };

            //Generate the shared access signature on the container, setting the constraints directly on the signature.
            string sasToken = container.GetBlockBlobReference(filePath).GetSharedAccessSignature(sasConstraints);

            //Return the URI string for the container, including the SAS token.
            return new Uri(container.Uri + "/" + filePath + sasToken);
        }

        static string GetJobPath(string namespaceName, string notificationHubPath, string jobId)
        {
            return string.Format(CultureInfo.InvariantCulture, @"{0}//{1}/{2}/", namespaceName, notificationHubPath, jobId);
        }
    }
}