Azure Queue  

Udostępnij na: Facebook

Pobierz i uruchom

Autor: Piotr Zieliński

Opublikowano: 2011-02-02

Wprowadzenie

Mechanizm kolejek jest sposobem komunikacji pomiędzy różnymi, niezależnymi od siebie komponentami. Azure Queue można porównać do SQL Server Service Broker lub Microsoft Message Queuing. Dzięki kolejce niezależne komponenty mogą zlecać zadania. Komponent zlecający (zwany również producentem) dodaje komunikat (polecenie np. „wyślij e-mail”.) do kolejki. Następnie komponenty odpowiedzialne za wykonanie zlecenia (konsumenci) zdejmują komunikat z kolejki i wykonują stosowną operację. Dla uzyskania pełnej skalowalności bardzo ważne jest, aby zaprojektowana logika była podzielona na niezależne komponenty. Kolejka jest po prostu mediatorem do wymiany informacji między nimi. Tak jak pozostałe typy kontenerów, Azure Queue opiera się również na architekturze REST i protokole HTTP.

Struktura

Windows Azure Queue składa się z dwóch zasadniczych komponentów:

  • Kolejka – miejsce, w którym składowane są komunikaty. Jedno konto Azure Storage może zawierać nieograniczoną liczbę kolejek.
  • Wiadomość – pojedyncza wiadomość (w architekturze opartej na kolejkach komunikat może być utożsamiany z komendą) nieprzekraczająca 8KB. Wiadomości zdejmowane z kolejki zawsze są w formacie Base64.

Warto przyjrzeć się dokładniej atrybutom każdego komunikatu (wiadomości):

  • MessageID – unikalny identyfikator (GUID).
  • VisibilityTimeout – czas,  przez jaki  komunikat po zdjęciu z kolejki nie będzie widoczny dla konsumentów. Aby w pełni zrozumieć istotę tego atrybutu, rozważmy scenariusz, w którym konsument zdejmuje z kolejki komunikat zawierający jakiś rozkaz do wykonania. Następnie konsument próbuje wykonać operację, ale z jakichś względów nie udaje mu się sfinalizować operacji. Dzięki VisibilityTimeout  konsument ma szanse zrealizować zadanie w późniejszym terminie. Jeśli nie zdoła go zrealizować w ciągu czasu VisibilityTimeout, wiadomość stanie się znów widoczna i inny konsument będzie mógł rozpocząć realizowanie zadania.
  • MessageTTL – czas życia komunikatu, maksymalnie siedem dni. Dzięki MessageTTL mamy pewność, że niechciane komunikaty zostaną usunięte i nie będą niepotrzebnie obciążały komputera.
  • PopReceipt – każda wiadomość ma dwie właściwości identyfikujące: MessageID i PopReceipt. PopReceipt, w przeciwieństwie do MessageID, nie jest taki sam dla danego komunikatu – przy każdym zdjęciu komunikatu z kolejki PopReceipt jest na nowo generowany. Dzięki temu, oprócz zidentyfikowania samej wiadomości, można również jednocześnie określić konkretną operację zdjęcia komunikatu z kolejki.

Znając ogólną architekturę, spróbujmy prześledzić cykl życia wiadomości:

  1. Producent umieszcza wiadomość w kolejce.
  2. Kolejka oczekuje na zdjęcie wiadomości. Wiadomość będzie przechowywana w kolejce dopóty, dopóki nie zostanie przekroczony czas zdefiniowany przez MessageTTL.
  3. Konsument zdejmuje wiadomość z kolejki. Wiadomość nie jest automatycznie usuwana, jednak komunikat nie będzie widoczny przez czas zdefiniowany za pomocą parametru VisibilityTimeout. Każde zdjęcie wiadomości z kolejki wiąże się z automatycznym wygenerowaniem nowego PopReceipt. Co dalej się stanie z komunikatem, zależy już od konsumenta:
    1. konsument realizuje zadanie z sukcesem i usuwa komunikat z kolejki na podstawie identyfikatora MessageTTL oraz PopReceipt,
    2. nie udało się zrealizować zadania. Komunikat będzie niewidoczny jeszcze przez pewien czas (VisibilityTimeout). Mechanizm VisibilityTimeout zapobiega usuwaniu komunikatów, które nie zostały zrealizowane. Dzięki temu mamy pewność, że w kolejce znajdują się wiadomości, które nie zostały jeszcze przetworzone.

Przed projektowaniem architektury opartej na Azure Queue należy być świadomym kilku faktów:

  • Azure gwarantuje, że komunikat dotrze do przynajmniej jednego konsumenta. Nie ma gwarancji, że nie zostanie on odebrany przez kilka instancji naraz. Innymi słowy, wiadomość może zostać przetworzona kilkakrotnie.
  • Komunikaty dodane jako pierwsze niekoniecznie będą odebrane później również w pierwszej kolejności (brak struktury FIFO).
  • Komunikaty dodane do kolejki mogą pojawić się w niej z lekkim opóźnieniem (nie możemy zakładać, że dodana wiadomość od razu stanie się widoczna dla konsumentów).

Przykład

Rozważmy prosty scenariusz jednego producenta i konsumenta. Projekt będzie składał się z dwóch ról – Web Role i Worker Role:

Web Role będzie zlecał zadania do wykonania poprzez umieszczenie odpowiedniej wiadomości w kolejce. Z kolei Worker Role będzie działał w tle, odbierając wiadomości i odpowiednio je interpretując.

Przede wszystkim musimy utworzyć kolejkę dla komunikatów:

CloudQueueClient queueClient = cloudStorageAccount.CreateCloudQueueClient();

CloudQueue queue = queueClient.GetQueueReference("commands");

queue.CreateIfNotExist();

Wysłanie (a raczej umieszczenie w kolejce) wiadomości jest również bardzo proste:

CloudQueueMessage message = new CloudQueueMessage("SEND_MESSAGE");

queue.AddMessage(message);

Potrafimy już umieszczać komunikaty w kolejce. Przejdźmy teraz do odbierania ich w Worker Role. W metodzie Run umieszczamy następującą obsługę komunikatów:

public override void Run()
{
    StorageCredentialsAccountAndKey credentials = null;
    credentials = new StorageCredentialsAccountAndKey(accountName, key);
    CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(credentials, true);
    CloudQueueClient queueClient = cloudStorageAccount.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference("commands");

    while (true)
    {
        Thread.Sleep(10000);
        if (queue.Exists())
        {
            var msg = queue.GetMessage();
            if (msg != null)
            {
                string command = msg.AsString;
                if (command == "SEND_MESSAGE")
                {
                    try
                    {
                        SmtpClient client = new SmtpClient(“host”);
                        client.Credentials = new NetworkCredential("login", "haslo");
                        client.Send("from",”to”,”subject”,”body”); 
                    }
                    catch
                    {
                        continue;
                    }
                    queue.DeleteMessage(msg);
                }
            }
        }
    }
}

Powyższy kod w nieskończonej pętli czeka na komunikaty. W przypadku odebrania komunikatu sprawdza, czy zawiera on polecenie „SEND_MESSAGE”. Jeśli tak, to wysyła wiadomość na jakiś adres. Gdy tylko uda się wysłać wiadomość, usuwa komunikat z kolejki.

Warto zwrócić uwagę na skalowalność przedstawionego rozwiązania – możemy stworzyć dowolną liczbę instancji powyższych ról. Jeśli okazałoby się, że jeden Worker Role nie nadąża z wysyłaniem e-maili, można dodać następną instancję. Podobnie wygląda sytuacja z Web Role. Jeśli mamy więc do wykonania jakieś czasochłonne operacje, warto rozważyć wykorzystanie mechanizmu kolejek. Wysyłanie maili posłużyło tylko jako przykład i w praktyce nie jest operacją zbyt czasochłonną. Jeśli jednak wysyłanie miałoby trwać kilka minut, to wtedy szkoda marnować czasu w Web Role i lepiej wykorzystać właśnie kolejki i  Worker Role.

Metadane

Podobnie jak bloby, kolejki również mogą mieć przypisane dowolne metadane. Ustawienie metadanych wygląda analogicznie:

CloudQueue queue = client.GetQueueReference("commands");

Queue.MetaData.Add(“dowolna_nazwa”,”dowolona_wartosc”);

Queue.SetMetaData();

Bezpośredni dostęp do kolejek z zewnętrznego klienta

Istnieje również możliwość dodawania wiadomości (zlecania zadań) z poziomu zewnętrznego klienta, znajdującego się poza chmurą. Możemy stworzyć np. aplikację ASP.NET, która będzie zlecała zadania roli umieszczonej w chmurze. Korzystanie z kolejki w sposób bezpośredni niczym nie różni się od sposobu przedstawionego we wcześniejszej części artykułu. Przede wszystkim należy do aplikacji dodać bibliotekę StorageClient, którą można znaleźć w przykładach dołączonych do Azure Platform Kit. StorageClient stanowi po prostu wrapper na wywołania REST do AzureStorage. Zamiast ręcznie tworzyć odpowiednie nagłówki HTTP, StorageClient w tym nas wyręczy.

API zaimplementowane w bibliotece StorageClient jest bardzo intuicyjne. Aby dodać wiadomość SEND_MESSAGE do kolejki „commands”, wystarczy:

string accountName = "";

string key = ""

StorageAccountInfo storageAccountInfo = null;

Uri baseUri = new Uri("http://nazwa.queue.core.windows.net");

storageAccountInfo = new StorageAccountInfo(baseUri, null, accountName, key);

QueueStorage queueStorage= QueueStorage.Create(storageAccountInfo);

MessageQueue messageQueue= queueStorage.GetQueue("commands");

messageQueue.PutMessage(new Message("SEND_MESSAGE"));

Powyższe rozwiązanie jest w pełni poprawne. Należy jednak znaczyć, że bezpośrednie podłączenie do kolejki jest bardzo niebezpieczne. Pomijając fakt, że w takim przypadku nie można zaimplementować dodatkowego uwierzytelnienia i autoryzacji, musimy dostarczyć aplikacji klienckiej nazwę konta i klucz dostępu – co naraża nas na atak zewnętrzny, np. gdy programista aplikacji klienckiej nie zadba o bezpieczne składowanie klucza.

Dostęp do kolejek z zewnętrznego klienta z wykorzystaniem warstwy pośredniej WCF

Dużo lepszym i elastyczniejszym rozwiązaniem jest umiejscowienie usługi WCF pomiędzy klientem a Azure Sotrage. Aby wyeksponować operacje na kolejce, należy utworzyć klientom zewnętrznym usługę WCF. Następnie dodajemy następującą operację WCF:

public void AddMessage(string message)

{

       string accountName = "";

       string key = ""

       StorageCredentialsAccountAndKey credentials = null;

       credentials = new StorageCredentialsAccountAndKey(accountName, key);

       CloudStorageAccount cloudStorageAccount = new CloudStorageAccount(credentials, true);

       CloudQueueClient queueClient =cloudStorageAccount.CreateCloudQueueClient();

       CloudQueue queue = queueClient.GetQueueReference("commands");

       queue.CreateIfNotExist();

       CloudQueueMessage queueMessage = new CloudQueueMessage(message);

       queue.AddMessage(queueMessage);

}

W ten sposób wyeksponowaliśmy operację dodawania komunikatu. Dzięki WCF możemy zawrzeć zaawansowaną logikę biznesową oraz bardziej wyrafinowaną metodę uwierzytelniania. W aplikacji klienckiej wystarczy tylko podłączyć usługę za pomocą Add Service Reference (menu kontekstowe Solution Explorer):

Po dodaniu referencji do usługi WCF można wywołać odpowiednią metodę:

ServiceReference1.Service1Client client = null;

client = new ServiceReference1.Service1Client();

client.AddMessage("SEND_MESSAGE");

Zakończenie

Azure Queue powinien być traktowany jako sposób komunikacji niezależnych, skalowalnych komponentów, a nie jako jeszcze jeden sposób przechowywania danych. Wykorzystanie kolejek jest możliwe, tylko jeśli architektura została prawidłowo zaprojektowana – powinna składać się z niezależnych komponentów. Azure nie gwarantuje, że komunikat zostanie odebrany wyłącznie przez jednego konsumenta, więc jeśli jest to sprzeczne z logiką biznesową, należy wtedy wprowadzić dodatkowy mechanizm śledzący stan danego zadania.