Vorhersage: Bewölkt

Abschluss der Tour mit AppFabric-Warteschlangen

Joseph Fultz

Joseph FultzIn der Oktoberausgabe habe ich einige der neuen Funktionen des Windows Azure AppFabric Service Bus (msdn.microsoft.com/magazine/hh456395) vorgestellt. Diesen Monat setze ich die Behandlung des Szenarios fort, mit der ich begonnen habe, und beschreibe den Weg zurück. Im Szenario hatte ein Geschäft eine Inventarprüfung von benachbarten Geschäften angefordert, indem Prüfungsanforderungen zu einem Thema veröffentlicht wurden. Die Geschäfte, die das Thema abonniert haben, erhalten die Anforderungen auf der Basis eines Filters für das Abonnement, der die Nachrichten auf solche Nachrichten innerhalb der Region einschränkt, die nicht von diesen Geschäften gesendet wurden.

Für den Weg zurück nutze ich einige Funktionen der Windows Azure AppFabric-Buswarteschlangen. Zunächst gibt es zwei wichtige Eigenschaften, denen ich in der Brokered Message, die an das Thema gesendet wird, Werte zuweisen werde. Die erste Eigenschaft, der ich einen Wert zuweisen werde, ist die Eigenschaft "ReplyTo", um dem Empfänger mitzuteilen, wohin die Nachricht gesendet werden soll. Dabei handelt es sich um eine bestimmte Warteschlange, die vom Sender zum Zeitpunkt des Sendens der Nachricht erstellt wird. Ich möchte in diesem Fall eine Warteschlange statt eines Themas verwenden, da die Antwort an einen einzelnen Empfänger gesendet wird, anders als im Fall des Anforderungsmusters, bei dem es sich um einen Broadcast mit einer allgemeinen Hilfeanforderung handelt.

Der neue Bestandteil des Nachrichtenverlaufs wird in Abbildung 1 gezeigt. Es handelt sich um den Bestandteil, der sich nach rechts zur Reihe der Geschäfte richtet.

Inquiry and Response Round-Trip
Abbildung 1: Anfrage- und Antwortrundreise

Aktualisieren des Codes der ausgehenden Anforderung

In der ersten habe ich die Nachricht an das Thema gesendet und gezeigt, wie diese von den entsprechenden Abonnements empfangen wird. Zwei wichtige Schritte fehlen jedoch noch, um eine problemlose Antwort zu unterstützen. Der erste Schritt besteht in der Einrichtung der Eigenschaft "CorrelationId" von "BrokeredMessage". Die Eigenschaft ist Teil der API. Daher muss ich diese nicht als Teil der Datenklasse oder des Schemas einschließen. Da es sich um eine Zeichenfolge handelt, könnte der Name der Eigenschaft etwas aussagen. Auf jeden Fall muss der Name die Nachricht auf eindeutige Weise beschreiben, sodass eine Antwort mit einer übereinstimmenden CorrelationId keiner anderen Anforderung im System zugeordnet werden kann. Zu Beispielzwecken verwende ich eine GUID. Dies ist jedoch auch in der Praxis ein ziemlich sicheres Verfahren. Hier ist der Code:

BrokeredMessage msg = BrokeredMessage.CreateMessage(data);
// Add props to message for filtering
msg.Properties["Region"] = data.Region;
msg.Properties["RequesterID"] = data.RequesterID;
// Set properties for message
msg.TimeToLive = TimeSpan.FromSeconds(30);
msg.ReplyTo = returnQueueName;
msg.CorrelationId = Guid.NewGuid().ToString();

Die BrokeredMessage verfügt außerdem über eine Eigenschaft namens SessionId, die ich hier nicht verwende. SessionId ist eine weitere Eigenschaft für die logische Gruppierung, die auf der Ebene des Nachrichtenumschlags nützlich ist, da sie die Gruppierung miteinander verwandter Nachrichten ermöglicht. Die Frage ist, wie sich diese Eigenschaft von CorrelationId hinsichtlich des Zwecks unterscheidet. Es gibt zwei Szenarien, in denen SessionId besonders nützlich sein kann. Das erste Szenario besteht in einem System, in dem mehrere Daemons verschiedene, miteinander verwandte Anforderungen senden. Die Eigenschaft CorrelationId würde für die Weiterleitung der Antwort an den anfordernden Prozessor verwendet. Die Eigenschaft SessionId würde für die Gruppierung aller über die einzelnen Verarbeitungsknoten gesendeten und empfangenen Nachrichten verwendet. Eine solche Gruppierung ist nützlich, um den Status der Verarbeitung in einem System zu Analyse- und Debuggingzwecken zu ermitteln. Aus den gleichen Gründen handelt es sich hierbei um ein nützliches Konstrukt in Systemen, die im Rahmen übergeordneter Prozesse zahlreiche Anforderungen senden (z. B. Kaufvorgänge, Inventarprüfungen, Zahlungsbestätigungen, Versandvorgänge usw.), für die jedoch der genaue Fluss und die genaue Zeitplanung nicht garantiert sind.

Nach der Festlegung von CorrelationId für die ausgehende Nachricht muss ich als Nächstes die Eigenschaft "ReplyTo" festlegen. Ich könnte ein anderes Thema erstellen und alle Geschäfte auf Antworten überwachen lassen oder eine einzelne Warteschlange verwenden. Dies würde jedoch unnötigen Datenverkehr verursachen und in Zeiten hoher Auslastungen ziemlich wahrscheinlich zu Engpässen führen. Daher ist das Erstellen einer Antwortwarteschlange zum Zeitpunkt der Anforderung und die Benachrichtigung des Empfängers über deren Speicherort sinnvoll. Da es sich um eine Zeichenfolge handelt, ist jeder Name möglich. Ich schlage jedoch einen vollständig qualifizierten Namen vor, um Verwirrung oder Konflikte in zukünftigen Versionen der Software zu vermeiden. Sie könnten mit dem Namen der Warteschlange beginnen. Während Wartungen und Erweiterungen könnte dies jedoch zu Verwirrung führen, wenn das System beginnt, mehrere Dienstbus-Namespaces und untergeordnete Warteschlangen zu unterstützen. Zusätzlich ist eine vollständig qualifizierte Adresse besser für Empfänger geeignet, die das Microsoft .NET Framework nicht verwenden.

Die letzten beiden Schritte vor dem Absenden der Nachricht sind die Erstellung der Antwortwarteschlange und das Starten eines Timers, um nach Antworten zu suchen. Ich beginne mit der Methode "GetQueue". Als dieser Artikel geschrieben wurde, stellt die Dokumentation (bit.ly/pnByFw) für "GetQueue" für den Rückgabewert fest, dass "type" Microsoft.ServiceBus.Messaging.Queue, einem Warteschlangenhandle für die Warteschlange oder, wenn die Warteschlange im Dienstnamespace nicht vorhanden ist, Null entspricht. Dies ist jedoch nicht der Fall. In Wirklichkeit wird eine Ausnahme ausgelöst:

// Check if-exists and create response queue
try
{
  returnQ = this.ServiceBusNSClient.GetQueue(returnQueueName);
}
catch (System.Exception ex)
{
  Debug.WriteLine(ex.Message);
}
if (returnQ == null)
{
  returnQ = this.ServiceBusNSClient.CreateQueue(returnQueueName);
}
checkQTimer.Enabled = true;

Daher habe ich die Methode GetQueue in einem Try-Catch-Abschnitt eingeschlossen und von da die Programmierung fortgesetzt. Mein Beispielcode behebt typischerweise den Fehler. Nach dem Erstellen der Warteschlange weise ich diese einer Variablen zu, sodass ich auf diese für die Prüfung der Warteschlange verweisen und anschließend den Timer aktivieren kann, den ich beim Starten der Anwendung eingerichtet habe.

Aktualisieren des Empfängers und Antworten

Nachdem ich für den Absender die korrekten Benachrichtigungen eingerichtet habe, muss ich einige Ergänzungen durchführen, um auf die Anforderungen antworten zu können. Ich verwende eine zum größten Teil fest kodierte Antwort, liste jedoch die Nachrichten in einem Raster auf, sodass ich die Nachricht, auf die ich antworten werde, auswählen kann. Ich habe einen Ereignismechanismus für die Benachrichtigung der Oberfläche über neue Nachrichten eingerichtet. Nach dem Erhalt einer Nachricht dient die Geschäfts-ID des aktuellen Empfängers als Antwortsender. Anschließend wird die Oberfläche über die Nachricht zur Anzeige benachrichtigt:

recvSuccess = msgReceiver.TryReceive(TimeSpan.FromSeconds(3), out NewMsg);
if (recvSuccess)
{
  CreateInquiry.InventoryQueryData qryData = 
    NewMsg.GetBody<CreateInquiry.InventoryQueryData>();
  NewMsg.Properties.Add("ResponderId", this.StoreID);
  EventContainer.RaiseMessageReceived(NewMsg);
}

Innerhalb der Oberfläche habe ich das Ereignis abonniert, das die Nachricht empfangen wird. Anschließend übergebe ich dieses an eine Methode zur Aktualisierung der Oberflächenelemente, die die erforderliche InvokeRequired-Methodenprüfung durchführen werden:

void EventContainer_MessageReceivedEvent(object sender, MessagingEventArgs e)
{
  BrokeredMessage msg = (BrokeredMessage)e.MessageData;
  var RequestObject =
    msg.GetBody<CreateInquiry.InventoryQueryData>();
  RequestObject.ResponderID = msg.Properties["ResponderId"].ToString();
  this.ReceivedMessages.Add(RequestObject);
  UpdateUIElements(msg.MessageId, RequestObject);
}

Da der Code für das Abrufen der Nachrichten für das Thema und das Aktualisieren der Oberfläche fertig gestellt ist, kann ich nun die eingehenden Nachrichten visualisieren (siehe Abbildung 2).

The Inventory Topic Monitor
Abbildung 2 Der Inventar-Themamonitor

Diese Oberfläche (es ist offensichtlich, dass ich nicht aufgrund meiner Begabungen für die Gestaltung von Benutzerumgebungen eingestellt wurde) ermöglicht mir die Auswahl einer Nachricht und deren Beantwortung mit einer Menge, die im Textfeld unten festgelegt wird. Der Antwortcode ist eine ziemlich einfache und kurze Angelegenheit, da ich nur ein kurzes Stück Code hinzufügen muss, um die Menge für das Nachrichtenobjekt festzulegen und dieses anschließend an die Warteschlange zu senden, die in der Eigenschaft "ReplyTo" der Nachricht festgelegt ist.

In diesem Beispiel füge ich vom Thema empfangene Nachrichten einem dem Objekt Dictionary<string, BrokeredMessage> hinzu, wobei ich "BrokeredMessage.MessageId" als Schlüssel für das Wörterbuch verwende. Ich verwende einfach die Nachrichten-ID aus dem Raster, um diese aus dem Wörterbuch abzurufen, und erstelle anschließend eine neue BrokeredMessage, indem ich die gleiche CorrelationId zuweise und den Eigenschaften "ResponderId" und "Quantity" Werte zuweise. Genau wie in dem Fall, wenn Nachrichten vom Thema empfangen werden, verwende ich MessagingFactory, um einen QueueClient zu erstellen und von diesem Objekt einen MessageSender zu erstellen:

// Send message
  SharedSecretCredential credential =
    TransportClientCredentialBase.CreateSharedSecretCredential(
    Constants.issuerName, Constants.issuerKey);
  Uri sbUri = ServiceBusEnvironment.CreateServiceUri(
           "sb", Constants.sbNamespace, String.Empty);
MessagingFactory Factory =  MessagingFactory.Create(sbUri, credential);
QueueClient QClient = Factory.CreateQueueClient(msg.ReplyTo);
MessageSender Sender = QClient.CreateSender();
Sender.Send(ResponseMsg);

Dies sendet die Antwort zurück, sodass wir den Schwerpunkt zurück auf die ursprüngliche Anwendung verlegen müssen, um die Antwort zu verarbeiten.

Empfangen der Antwort

In diesem Beispiel möchte ich die Antworten nur aus der Warteschlange abrufen. Da der Code für das Senden der Anforderung geändert wurde, um die Überwachung der ReplyTo-Warteschlange zu starten, muss ich den tatsächlichen Code hinzufügen, um die Warteschlange zu überprüfen. Ich beginne, indem ich einen MessageReceiver erstelle und eine einfache While-Schleife einrichte, um alle verfügbaren Nachrichten in der Warteschlange dieses Mal abzurufen:

void checkQTimer_Tick(object sender, EventArgs e)
{
  MessageReceiver receiver =
    QClient.CreateReceiver(ReceiveMode.ReceiveAndDelete);
  BrokeredMessage NewMessage = null;
  while (receiver.TryReceive(TimeSpan.FromSeconds(1), out NewMessage))
  {
    InquiryResponse resp = new InquiryResponse();
    resp.CorrelationID = NewMessage.CorrelationId;
    resp.Message = NewMessage;
    InquiryResponses.Add(resp);
  }
 }

Wie zuvor, verwende ich die Methode "TryReceive". Dies funktioniert zwar für dieses Beispiel. Für eine echte Oberfläche würde ich dies jedoch etwas anders handhaben, da die Methode den Thread blockiert. Sie sollte daher besser auf einem anderen Thread ausgeführt werden. Ich möchte die gesamte BrokeredMessage aus der Liste mittels CorrelationId abrufen. Daher habe ich ein Objekt erstellt und verwende CorrelationId, um das Objekt später zu filtern. Ich möchte BrokeredMessage als Nachricht, da ich Daten abrufen möchte, die Teil des BrokeredMessage-Umschlags sind, der nun mein InquiryData-Objekt einkapselt (siehe Abbildung 3).

Inquiry Request and Responses
Abbildung 3: Anfrageanforderung und Antworten

Mittels der Änderung des SelectedIndexChanged-Codes von ListBox verwende ich einfach die CorrelationId, die ich als Objekt verwendet habe, um die relevanten Antworten aus der Liste abzurufen, die erstellt wird, während die Antworten in der Warteschlange eingehen:

string correlationId =
  lbRequests.SelectedItem.ToString();
List<InquiryResponse> CorrelatedList =
  InquiryResponses.FindAll(
  delegate(InquiryResponse resp)
{
  return resp.CorrelationID == correlationId;
});

Der letzte Schritt besteht darin, die Antworten dem DataGrid hinzuzufügen, um die Geschäfte anzuzeigen, die auf meine Inventaranfrage geantwortet haben. Für meine ziemlich grobe Oberfläche verwende ich GUIDs. Ich hoffe jedoch, dass es offensichtlich ist, dass diese durch benutzerfreundliche Beschreibungen ersetzt würden, sodass die GUIDs und IDs den Seiten für erweiterte Details vorbehalten sind.

Überprüfung

Als ehemaliger Militärangehöriger haben sich eine Menge Dinge in mein Gedächtnis eingebrannt. Ein Beispiel ist die "militärische Erziehung". Diese besteht darin, dass ich Ihnen sage, was wir tun werden, wir dies dann auch tun, und ich Ihnen anschließend sage, was wir getan haben. Ich befinde mich nun in dem Teil, in dem ich Ihnen sage, was wir getan haben. Ich habe diese Artikel ursprünglich als einzigen Artikel schreiben wollen, dann aber festgestellt, dass die Materie zu umfangreich für einen einzigen Artikel ist. Daher habe ich den Artikel in zwei Teile aufteilt: Im ersten Teil behandele ich das Absenden der Anforderung, und im zweiten Teil behandele ich, wie die Antwort abgerufen wird. Mein Ziel war, die grundlegenden Funktionen des Windows Azure AppFabric-Dienstbus vorzustellen und Ihnen einen allgemeinen Eindruck davon zu geben, wie dieser verwendet wird. Ich gebe gerne Kontext für Technologie. In diesem Fall habe ich als Kontext eine Produktabfrage zwischen verschiedenen Geschäften gewählt, da dies ein realistisches Szenario und die Verwendung einer Mischung aus Themen und Warteschlangen sinnvoll ist.  

Zum Zeitpunkt, als dieser Artikel verfasst wurde, wurde die CTP-Version des Windows Azure App­Fabric ServiceBus für den Mai veröffentlicht. Informationen zu dieser finden Sie unter bit.ly/it5Wo2. Außerdem können Sie an den Foren unter bit.ly/oJyCYx teilnehmen.

Joseph Fultz ist Softwarearchitekt bei Hewlett-Packard Co. und Mitglied der HP.com Global IT. Zuvor war er Softwarearchitekt bei Microsoft und arbeitete gemeinsam mit dessen wichtigsten Unternehmens- und ISV-Kunden an der Definition von Architekturen und dem Entwurf von Lösungen.

Unser Dank gilt dem folgenden technischen Experten für die Durchsicht dieses Artikels: Jim Keane