AMQP 1.0 in azure Service Bus- en Event Hubs-protocolhandleiding

Het Advanced Message Queueing Protocol 1.0 is een gestandaardiseerd frame- en overdrachtsprotocol voor asynchroon, veilig en betrouwbaar overdragen van berichten tussen twee partijen. Het is het primaire protocol van Azure Service Bus Messaging en Azure Event Hubs.

AMQP 1.0 is het resultaat van brede samenwerking in de branche die middlewareleveranciers, zoals Microsoft en Red Hat, samenbrengt met veel middlewaregebruikers voor berichten zoals JP Morgan Chase die de financiële dienstverlening vertegenwoordigen. Het technische standaardisatieforum voor het AMQP-protocol en extensiespecificaties is OASIS en heeft formele goedkeuring bereikt als een internationale norm als ISO/IEC 19494:2014.

Doelstellingen

In dit artikel vindt u een overzicht van de kernconcepten van de AMQP 1.0-berichtspecificatie, samen met de uitbreidingsspecificaties die zijn ontwikkeld door het Technische Comité van OASIS AMQP en wordt uitgelegd hoe Azure Service Bus deze specificaties implementeert en voortbouwt.

Het doel is voor elke ontwikkelaar die een bestaande AMQP 1.0-clientstack gebruikt op elk platform om via AMQP 1.0 met Azure Service Bus te kunnen communiceren.

Algemene AMQP 1.0-stacks, zoals Apache Qpid Proton of AMQP.NET Lite, implementeren alle kern-AMQP 1.0-protocolelementen, zoals sessies of koppelingen. Deze fundamentele elementen worden soms verpakt met een API op een hoger niveau; Apache Proton biedt zelfs twee, de imperatieve Messenger-API en de reactieve Reactor-API.

In de volgende discussie gaan we ervan uit dat het beheer van AMQP-verbindingen, sessies en koppelingen en de verwerking van frameoverdrachten en stroombeheer worden verwerkt door de respectieve stack (zoals Apache Proton-C) en dat er niet veel nodig is als er specifieke aandacht is van toepassingsontwikkelaars. We gaan er abstract van uit dat er enkele API-primitieven bestaan, zoals de mogelijkheid om verbinding te maken en om een vorm van afzender- en ontvangerabstractieobjecten te maken, die vervolgens een vorm van send() en receive() bewerkingen hebben.

Bij het bespreken van geavanceerde mogelijkheden van Azure Service Bus, zoals het bladeren door berichten of het beheer van sessies, worden deze functies uitgelegd in AMQP-termen, maar ook als een gelaagde pseudo-implementatie bovenop deze veronderstelde API-abstractie.

Wat is AMQP?

AMQP is een frame- en overdrachtsprotocol. Frames betekent dat het structuur biedt voor binaire gegevensstromen die in beide richtingen van een netwerkverbinding stromen. De structuur biedt een afbakening voor afzonderlijke gegevensblokken, frames genoemd, die moeten worden uitgewisseld tussen de verbonden partijen. De overdrachtsmogelijkheden zorgen ervoor dat beide communicerende partijen een gedeeld begrip kunnen vaststellen over wanneer frames moeten worden overgedragen en wanneer overdrachten als volledig worden beschouwd.

In tegenstelling tot eerdere verlopen conceptversies van de AMQP-werkgroep die nog steeds worden gebruikt door een paar berichtbrokers, schrijft het uiteindelijke en gestandaardiseerde AMQP 1.0-protocol van de werkgroep de aanwezigheid van een berichtenbroker of een bepaalde topologie voor entiteiten binnen een berichtenbroker niet voor.

Het protocol kan worden gebruikt voor symmetrische peer-to-peercommunicatie, voor interactie met berichtbrokers die wachtrijen ondersteunen en entiteiten publiceren/abonneren, zoals Azure Service Bus wel doet. Het kan ook worden gebruikt voor interactie met berichteninfrastructuur waarbij de interactiepatronen verschillen van normale wachtrijen, zoals het geval is met Azure Event Hubs. Een Event Hub fungeert als een wachtrij wanneer er gebeurtenissen naar de hub worden verzonden, maar fungeert meer als een seriële opslagservice wanneer gebeurtenissen worden gelezen; het lijkt enigszins op een tapestation. De client kiest een offset in de beschikbare gegevensstroom en verwerkt vervolgens alle gebeurtenissen van die offset naar de meest recente beschikbare.

Het AMQP 1.0-protocol is ontworpen om uitbreidbaar te zijn, waardoor verdere specificaties de mogelijkheden ervan kunnen verbeteren. De drie uitbreidingsspecificaties die in dit document worden besproken, illustreren dit. Voor communicatie via een bestaande HTTPS/WebSockets-infrastructuur kan het configureren van de systeemeigen AMQP TCP-poorten lastig zijn. Een bindingsspecificatie definieert hoe AMQP moet worden gelaagd via WebSockets. Voor interactie met de berichteninfrastructuur op aanvraag-/antwoordmode voor beheerdoeleinden of voor geavanceerde functionaliteit definieert de AMQP-beheerspecificatie de vereiste basisinteractieprimitieven. Voor integratie van federatief autorisatiemodel definieert de op AMQP claims gebaseerde beveiligingsspecificatie hoe autorisatietokens worden gekoppeld aan koppelingen en vernieuwen.

Eenvoudige AMQP-scenario's

In deze sectie wordt het basisgebruik van AMQP 1.0 met Azure Service Bus uitgelegd, waaronder het maken van verbindingen, sessies en koppelingen, en het overdragen van berichten naar en van Service Bus-entiteiten, zoals wachtrijen, onderwerpen en abonnementen.

De meest gezaghebbende bron voor meer informatie over hoe AMQP werkt, is de AMQP 1.0-specificatie, maar de specificatie is geschreven om de implementatie nauwkeurig te begeleiden en niet om het protocol te leren. Deze sectie is gericht op het introduceren van zoveel terminologie als nodig is voor het beschrijven van hoe Service Bus AMQP 1.0 gebruikt. Voor een uitgebreidere inleiding tot AMQP, evenals een bredere bespreking van AMQP 1.0, kunt u deze videocursus bekijken.

Verbinding maken ions en sessies

AMQP roept de containers voor communicerende programma's aan; deze bevatten knooppunten, die de communicerende entiteiten binnen die containers zijn. Een wachtrij kan een dergelijk knooppunt zijn. AMQP maakt multiplexing mogelijk, zodat één verbinding kan worden gebruikt voor veel communicatiepaden tussen knooppunten; Een toepassingsclient kan bijvoorbeeld gelijktijdig van de ene wachtrij ontvangen en verzenden naar een andere wachtrij via dezelfde netwerkverbinding.

Diagram showing Sessions and Connections between containers.

De netwerkverbinding wordt dus verankerd aan de container. Deze wordt gestart door de container in de clientrol die een uitgaande TCP-socketverbinding maakt met een container in de ontvangerrol, die luistert naar en accepteert binnenkomende TCP-verbindingen. De handshake van de verbinding omvat het onderhandelen over de protocolversie, het declareren of onderhandelen van het gebruik van Tls/SSL (Transport Level Security) en een handshake voor verificatie/autorisatie in het verbindingsbereik dat is gebaseerd op SASL.

Voor Azure Service Bus of Azure Event Hubs is het gebruik van TLS altijd vereist. Het ondersteunt verbindingen via TCP-poort 5671, waarbij de TCP-verbinding eerst wordt overlappen met TLS voordat het AMQP-protocol handshake wordt ingevoerd en ook verbindingen via TCP-poort 5672 ondersteunt, waarbij de server onmiddellijk een verplichte upgrade van de verbinding met TLS biedt met behulp van het door AMQP voorgeschreven model. De AMQP WebSockets-binding maakt een tunnel via TCP-poort 443 die vervolgens gelijk is aan AMQP 5671-verbindingen.

Na het instellen van de verbinding en TLS biedt Service Bus twee SASL-mechanismeopties:

  • SASL PLAIN wordt vaak gebruikt voor het doorgeven van gebruikersnaam- en wachtwoordreferenties aan een server. Service Bus heeft geen accounts, maar heeft de naam Gedeelde toegangsbeveiligingsregels, die rechten verlenen en zijn gekoppeld aan een sleutel. De naam van een regel wordt gebruikt als de gebruikersnaam en de sleutel (als base64-gecodeerde tekst) wordt gebruikt als het wachtwoord. De rechten die aan de gekozen regel zijn gekoppeld, bepalen de bewerkingen die zijn toegestaan voor de verbinding.
  • SASL ANONYMOUS wordt gebruikt voor het omzeilen van SASL-autorisatie wanneer de client het CBS-model (claims-based-security) wil gebruiken dat later wordt beschreven. Met deze optie kan een clientverbinding gedurende korte tijd anoniem tot stand worden gebracht, waarbij de client alleen kan communiceren met het CBS-eindpunt en de CBS-handshake moet worden voltooid.

Nadat de transportverbinding tot stand is gebracht, declareren de containers elk de maximale framegrootte die ze willen verwerken. Na een time-out voor inactiviteit wordt de verbinding eenzijdig verbroken als er geen activiteit op de verbinding is.

Ze declareren ook hoeveel gelijktijdige kanalen worden ondersteund. Een kanaal is een unidirectioneel, uitgaand, virtueel overdrachtspad boven op de verbinding. Een sessie neemt een kanaal van elk van de onderling verbonden containers om een bidirectioneel communicatiepad te vormen.

Sessies hebben een stroombeheermodel op basis van vensters; wanneer een sessie wordt gemaakt, declareert elke partij hoeveel frames het wil accepteren in het ontvangstvenster. Als de partijen frames uitwisselen, vullen overgedragen frames dat venster en draagt het over wanneer het venster vol is en totdat het venster opnieuw wordt ingesteld of uitgebreid met behulp van de stroom performatief (performatief is de AMQP-term voor bewegingen op protocolniveau uitgewisseld tussen de twee partijen).

Dit model op basis van vensters is ongeveer vergelijkbaar met het TCP-concept van stroombeheer op basis van vensters, maar op sessieniveau in de socket. Het protocolconcept van het toestaan van meerdere gelijktijdige sessies bestaat, zodat verkeer met hoge prioriteit kan worden gehaast langs beperkt normaal verkeer, zoals op een snelweg express lane.

Azure Service Bus gebruikt momenteel precies één sessie voor elke verbinding. De maximale framegrootte van Service Bus is 262.144 bytes (256-K bytes) voor Service Bus Standard. Het is 1048576 (100 MB) voor Service Bus Premium en Event Hubs. Service Bus legt geen beperkingsvensters op sessieniveau op, maar stelt het venster regelmatig opnieuw in als onderdeel van stroombeheer op koppelingsniveau (zie de volgende sectie).

Verbinding maken ions, kanalen en sessies zijn kortstondig. Als de onderliggende verbinding samenvalt, moeten verbindingen, TLS-tunnel, SASL-autorisatiecontext en sessies opnieuw worden hersteld.

Vereisten voor uitgaande AMQP-poort

Voor clients die AMQP-verbindingen via TCP gebruiken, moeten poorten 5671 en 5672 worden geopend in de lokale firewall. Naast deze poorten is het mogelijk nodig om extra poorten te openen als de functie EnableLinkRedirect is ingeschakeld. EnableLinkRedirect is een nieuwe berichtenfunctie die helpt bij het overslaan van één hop tijdens het ontvangen van berichten, waardoor de doorvoer wordt verhoogd. De client communiceert rechtstreeks met de back-endservice via poortbereik 104XX, zoals wordt weergegeven in de volgende afbeelding.

List of destination ports

Een .NET-client mislukt met een SocketException ('Er is geprobeerd toegang te krijgen tot een socket op een manier die niet is toegestaan door de toegangsmachtigingen') als deze poorten worden geblokkeerd door de firewall. De functie kan worden uitgeschakeld door de instelling EnableAmqpLinkRedirect=false in de verbindingsreeks, waardoor de clients via poort 5671 moeten communiceren met de externe service.

De AMQP WebSocket-binding biedt een mechanisme voor het tunnelen van een AMQP-verbinding via een WebSocket-transport. Met deze binding maakt u een tunnel via tcp-poort 443, die gelijk is aan AMQP 5671-verbindingen. Gebruik AMQP WebSockets als u zich achter een firewall bevindt die TCP-verbindingen blokkeert via poorten 5671, 5672, maar TCP-verbindingen via poort 443 (https) toestaat.

AMQP draagt berichten over via koppelingen. Een koppeling is een communicatiepad dat is gemaakt via een sessie waarmee berichten in één richting kunnen worden overgedragen; de overdrachtsstatusonderhandeling verloopt via de koppeling en bidirectioneel tussen de verbonden partijen.

Screenshot showing a Session carrying a link connection between two containers.

Koppelingen kunnen op elk gewenst moment worden gemaakt door een container en via een bestaande sessie, waardoor AMQP verschilt van vele andere protocollen, waaronder HTTP en MQTT, waarbij de start van overdrachten en overdrachtspad een exclusieve bevoegdheid is van de partij die de socketverbinding maakt.

De koppelings-initiërende container vraagt de tegenovergestelde container om een koppeling te accepteren en kiest een rol van afzender of ontvanger. Daarom kan een container het maken van unidirectionele of bidirectionele communicatiepaden initiëren, waarbij de laatste is gemodelleerd als paren van koppelingen.

Koppelingen worden benoemd en gekoppeld aan knooppunten. Zoals in het begin is aangegeven, zijn knooppunten de communicerende entiteiten in een container.

In Service Bus is een knooppunt direct gelijk aan een wachtrij, een onderwerp, een abonnement of een deadlettersubqueue van een wachtrij of abonnement. De knooppuntnaam die in AMQP wordt gebruikt, is daarom de relatieve naam van de entiteit in de Service Bus-naamruimte. Als een wachtrij een naam myqueueheeft, is dat ook de naam van het AMQP-knooppunt. Een onderwerpabonnement volgt de HTTP-API-conventie door te worden gesorteerd in een resourceverzameling 'abonnementen' en dus heeft een abonnementssubsub in een onderwerp mytopic de naam van het AMQP-knooppunt mytopic/subscriptions/sub.

De verbindende client is ook vereist voor het gebruik van een lokale knooppuntnaam voor het maken van koppelingen; Service Bus is niet prescriptief over deze knooppuntnamen en interpreteert ze niet. AMQP 1.0-clientstacks maken over het algemeen gebruik van een schema om ervoor te zorgen dat deze kortstondige knooppuntnamen uniek zijn binnen het bereik van de client.

Overboekingen

Zodra er een koppeling tot stand is gebracht, kunnen berichten via die koppeling worden overgedragen. In AMQP wordt een overdracht uitgevoerd met een expliciet protocolbeweging (de overdrachtsgebaar ) waarmee een bericht van de afzender naar de ontvanger wordt verplaatst via een koppeling. Een overdracht is voltooid wanneer deze wordt "vereffend", wat betekent dat beide partijen een gedeeld begrip hebben vastgesteld van het resultaat van die overdracht.

A diagram showing a message's transfer between the Sender and Receiver and disposition that results from it.

In het eenvoudigste geval kan de afzender ervoor kiezen om berichten 'vooraf vereffend' te verzenden, wat betekent dat de client niet geïnteresseerd is in het resultaat en de ontvanger geen feedback geeft over het resultaat van de bewerking. Deze modus wordt ondersteund door Service Bus op het niveau van het AMQP-protocol, maar wordt niet weergegeven in een van de client-API's.

Het normale geval is dat berichten ongesetd worden verzonden en dat de ontvanger vervolgens acceptatie of afwijzing aangeeft met behulp van de verwijdering performatief. Afwijzing treedt op wanneer de ontvanger het bericht om welke reden dan ook niet kan accepteren en het afwijzingsbericht informatie bevat over de reden. Dit is een foutstructuur die is gedefinieerd door AMQP. Als berichten worden geweigerd vanwege interne fouten in Service Bus, retourneert de service extra informatie binnen die structuur die kan worden gebruikt voor het verstrekken van diagnostische hints aan ondersteuningsmedewerkers als u ondersteuningsaanvragen indient. U vindt later meer informatie over fouten.

Een speciale vorm van afwijzing is de vrijgegeven toestand, die aangeeft dat de ontvanger geen technisch bezwaar heeft tegen de overdracht, maar ook geen belang heeft bij het vereffenen van de overdracht. Dit geval bestaat bijvoorbeeld wanneer een bericht wordt bezorgd bij een Service Bus-client en de client ervoor kiest het bericht af te laten omdat het niet kan worden uitgevoerd als gevolg van het verwerken van het bericht; de bezorging van het bericht zelf is niet fout. Een variatie van die status is de gewijzigde status, waarmee wijzigingen in het bericht worden toegestaan terwijl het wordt vrijgegeven. Deze status wordt momenteel niet gebruikt door Service Bus.

De AMQP 1.0-specificatie definieert een verdere verwijderingsstatus met de naam ontvangen, die specifiek helpt bij het afhandelen van herstel van koppelingen. Herstel van koppelingen maakt het mogelijk om de status van een koppeling en eventuele in behandeling zijnde leveringen boven op een nieuwe verbinding en sessie te reconstitueren, wanneer de vorige verbinding en sessie verloren zijn gegaan.

Service Bus biedt geen ondersteuning voor herstel van koppelingen; als de client de verbinding met Service Bus verliest met een niet-verbonden berichtoverdracht in behandeling, gaat deze berichtoverdracht verloren en moet de client opnieuw verbinding maken, de koppeling opnieuw maken en de overdracht opnieuw proberen.

Daarom bieden Service Bus en Event Hubs ondersteuning voor 'ten minste één keer' overdracht waarbij de afzender kan worden verzekerd van het bericht dat is opgeslagen en geaccepteerd, maar niet 'precies één keer' overdrachten op AMQP-niveau ondersteunen, waarbij het systeem probeert de koppeling te herstellen en doorgaat met onderhandelen over de bezorgingsstatus om duplicatie van de berichtoverdracht te voorkomen.

Service Bus biedt ondersteuning voor dubbele detectie als optionele functie voor wachtrijen en onderwerpen om mogelijke dubbele verzendingen te compenseren. Bij dubbele detectie worden de bericht-id's van alle binnenkomende berichten vastgelegd tijdens een door de gebruiker gedefinieerd tijdvenster en worden alle berichten die met dezelfde bericht-id's worden verzonden tijdens hetzelfde venster op de achtergrond verwijderd.

Stroombeheer

Naast het model voor stroombeheer op sessieniveau dat eerder is besproken, heeft elke koppeling een eigen stroombeheermodel. Met stroombeheer op sessieniveau wordt de container beschermd tegen te veel frames tegelijk te verwerken. Met stroombeheer op koppelingsniveau wordt de toepassing belast met het aantal berichten dat deze wil verwerken vanaf een koppeling en wanneer.

Screenshot of a log showing Source, Destination, Source Port, Destination Port, and Protocol Name. In the first row the Destination Port 10401 (0x28 A 1) is outlined in black.

Bij een koppeling kunnen overdrachten alleen plaatsvinden wanneer de afzender voldoende koppelingstegoed heeft. Het tegoed van de koppeling is een teller die door de ontvanger is ingesteld met behulp van de stroom die het bereik heeft van een koppeling. Wanneer aan de afzender een koppelingstegoed wordt toegewezen, wordt geprobeerd dat tegoed te gebruiken door berichten af te leveren. Bij elke berichtbezorging wordt het resterende koppelingstegoed met 1 verminderd. Wanneer het koppelingstegoed wordt verbruikt, worden leveringen gestopt.

Wanneer Service Bus de rol van ontvanger heeft, krijgt de afzender direct voldoende tegoed voor koppelingen, zodat berichten onmiddellijk kunnen worden verzonden. Wanneer het koppelingstegoed wordt gebruikt, verzendt Service Bus af en toe een stroom die presteert naar de afzender om het saldo van het koppelingstegoed bij te werken.

In de rol van afzender verzendt Service Bus berichten om eventuele openstaande koppelingstegoeden te gebruiken.

Een 'receive'-aanroep op API-niveau vertaalt zich in een stroom die door de client naar Service Bus wordt verzonden. Service Bus verbruikt dat tegoed door het eerste beschikbare, ontgrendelde bericht uit de wachtrij te nemen, te vergrendelen en over te dragen. Als er geen bericht beschikbaar is voor levering, blijft een openstaand tegoed via een koppeling met die specifieke entiteit geregistreerd in de volgorde van aankomst en worden berichten vergrendeld en overgedragen zodra ze beschikbaar komen, om een openstaand tegoed te gebruiken.

De vergrendeling van een bericht wordt vrijgegeven wanneer de overdracht wordt geregeld in een van de terminalstatussen die zijn geaccepteerd, geweigerd of vrijgegeven. Het bericht wordt verwijderd uit Service Bus wanneer de terminalstatus wordt geaccepteerd. Het blijft in Service Bus en wordt geleverd aan de volgende ontvanger wanneer de overdracht een van de andere statussen bereikt. Service Bus verplaatst het bericht automatisch naar de wachtrij voor deadletters van de entiteit wanneer het maximale aantal toegestane bezorgingen voor de entiteit bereikt vanwege herhaalde afwijzingen of releases.

Hoewel de Service Bus-API's vandaag nog geen dergelijke optie beschikbaar maken, kan een CLIENT met een AMQP-protocol op een lager niveau het koppelingskredietmodel gebruiken om de interactie 'pull-style' om te zetten van het uitgeven van één eenheid tegoed voor elke ontvangstaanvraag in een 'push-style'-model door een groot aantal koppelingstegoeden uit te geven en vervolgens berichten te ontvangen zodra ze beschikbaar zijn zonder verdere interactie. Push wordt ondersteund via de eigenschapsinstellingen ServiceBusProcessor.PrefetchCount of ServiceBusReceiver.PrefetchCount . Wanneer ze niet nul zijn, gebruikt de AMQP-client deze als koppelingstegoed.

In deze context is het belangrijk om te begrijpen dat de klok voor het verlopen van de vergrendeling van het bericht in de entiteit begint wanneer het bericht wordt opgehaald uit de entiteit, niet wanneer het bericht op de draad wordt geplaatst. Wanneer de client aangeeft dat de gereedheid voor het ontvangen van berichten door koppelingstegoed wordt uitgegeven, wordt ervan uitgegaan dat deze berichten actief worden opgehaald via het netwerk en klaar zijn om ze af te handelen. Anders is de berichtvergrendeling mogelijk verlopen voordat het bericht zelfs wordt bezorgd. Het gebruik van beheer van een koppelingstegoedstroom moet rechtstreeks overeenkomen met de onmiddellijke gereedheid voor het verwerken van beschikbare berichten die naar de ontvanger worden verzonden.

Kortom, de volgende secties bieden een schematisch overzicht van de performatieve stroom tijdens verschillende API-interacties. In elke sectie wordt een andere logische bewerking beschreven. Sommige van deze interacties kunnen 'lui' zijn, wat betekent dat ze alleen worden uitgevoerd wanneer dat nodig is. Het maken van een afzender van een bericht veroorzaakt mogelijk geen netwerkinteractie totdat het eerste bericht wordt verzonden of aangevraagd.

De pijlen in de volgende tabel geven de richting van de performatieve stroom weer.

Berichtontvanger maken

Klant Service Bus
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source={entity name},<br/>target={client link ID}<br/>) Client koppelt aan entiteit als ontvanger
Service Bus-antwoorden die het einde van de koppeling koppelen <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={entity name},<br/>target={client link ID}<br/>)

Afzender van bericht maken

Klant Service Bus
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={client link ID},<br/>target={entity name}<br/>) Geen actie
Geen actie <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source={client link ID},<br/>target={entity name}<br/>)

Afzender van bericht maken (fout)

Klant Service Bus
--> attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**sender**,<br/>source={client link ID},<br/>target={entity name}<br/>) Geen actie
Geen actie <-- attach(<br/>name={link name},<br/>handle={numeric handle},<br/>role=**receiver**,<br/>source=null,<br/>target=null<br/>)<br/><br/><-- detach(<br/>handle={numeric handle},<br/>closed=**true**,<br/>error={error info}<br/>)

Berichtontvanger/afzender sluiten

Klant Service Bus
--> detach(<br/>handle={numeric handle},<br/>closed=**true**<br/>) Geen actie
Geen actie <-- detach(<br/>handle={numeric handle},<br/>closed=**true**<br/>)

Verzenden (geslaagd)

Klant Service Bus
--> transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,,more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) Geen actie
Geen actie <-- disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**accepted**<br/>)

Verzenden (fout)

Klant Service Bus
--> transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,,more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>) Geen actie
Geen actie <-- disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**rejected**(<br/>error={error info}<br/>)<br/>)

Ontvangen

Klant Service Bus
--> flow(<br/>link-credit=1<br/>) Geen actie
Geen actie < transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
--> disposition(<br/>role=**receiver**,<br/>first={delivery ID},<br/>last={delivery ID},<br/>settled=**true**,<br/>state=**accepted**<br/>) Geen actie

Ontvangen van meerdere berichten

Klant Service Bus
--> flow(<br/>link-credit=3<br/>) Geen actie
Geen actie < transfer(<br/>delivery-id={numeric handle},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
Geen actie < transfer(<br/>delivery-id={numeric handle+1},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
Geen actie < transfer(<br/>delivery-id={numeric handle+2},<br/>delivery-tag={binary handle},<br/>settled=**false**,<br/>more=**false**,<br/>state=**null**,<br/>resume=**false**<br/>)
--> disposition(<br/>role=receiver,<br/>first={delivery ID},<br/>last={delivery ID+2},<br/>settled=**true**,<br/>state=**accepted**<br/>) Geen actie

Berichten

In de volgende secties wordt uitgelegd welke eigenschappen van de standaard AMQP-berichtsecties worden gebruikt door Service Bus en hoe deze worden toegewezen aan de Service Bus-API-set.

Elke eigenschap die door de toepassing moet worden gedefinieerd, moet worden toegewezen aan de kaart van application-properties AMQP.

Veldnaam Gebruik API-naam
Duurzaam - -
priority - -
ttl Time to live voor dit bericht TimeToLive
first-acquirer - -
leveringsaantal - DeliveryCount

properties

Veldnaam Gebruik API-naam
message-id Toepassingsgedefinieerde, vrije-formulier-id voor dit bericht. Wordt gebruikt voor dubbele detectie. Messageid
user-id Door de toepassing gedefinieerde gebruikers-id, niet geïnterpreteerd door Service Bus. Niet toegankelijk via de Service Bus-API.
tot Door de toepassing gedefinieerde doel-id, niet geïnterpreteerd door Service Bus. Tot
subject Toepassingsgedefinieerde berichtdoel-id, niet geïnterpreteerd door Service Bus. Onderwerp
beantwoorden Door de toepassing gedefinieerde indicator voor het antwoordpad, niet geïnterpreteerd door Service Bus. ReplyTo
correlatie-id Toepassingsgedefinieerde correlatie-id, niet geïnterpreteerd door Service Bus. CorrelationId
inhoudstype Toepassingsgedefinieerde inhoudstypeindicator voor de hoofdtekst, niet geïnterpreteerd door Service Bus. Contenttype
inhoudscodering Toepassingsgedefinieerde inhoudscoderingsindicator voor de hoofdtekst, niet geïnterpreteerd door Service Bus. Niet toegankelijk via de Service Bus-API.
absolute verlooptijd Declareert op welk absolute chatbericht het bericht verloopt. Genegeerd bij invoer (header TTL wordt waargenomen), gezaghebbend voor uitvoer. Niet toegankelijk via de Service Bus-API.
aanmaaktijd Declareert op welk moment het bericht is gemaakt. Niet gebruikt door Service Bus Niet toegankelijk via de Service Bus-API.
groeps-id Door de toepassing gedefinieerde id voor een gerelateerde set berichten. Wordt gebruikt voor Service Bus-sessies. Sessionid
groepsvolgorde Teller waarmee het relatieve volgnummer van het bericht in een sessie wordt geïdentificeerd. Genegeerd door Service Bus. Niet toegankelijk via de Service Bus-API.
reply-to-group-id - ReplyToSessionId

Berichtaantekeningen

Er zijn enkele andere service bus-berichteigenschappen, die geen deel uitmaken van AMQP-berichteigenschappen en die worden doorgegeven als MessageAnnotations op het bericht.

Kaartsleutel voor aantekeningen Gebruik API-naam
x-opt-scheduled-enqueue-time Declareert op welk moment het bericht moet worden weergegeven op de entiteit ScheduledEnqueueTime
x-opt-partition-key Toepassingsgedefinieerde sleutel die bepaalt in welke partitie het bericht moet terechtkomen. PartitionKey
x-opt-via-partition-key Toepassingsgedefinieerde partitiesleutelwaarde wanneer een transactie wordt gebruikt voor het verzenden van berichten via een overdrachtswachtrij. TransactionPartitionKey
x-opt-enqueued-time Door de service gedefinieerde UTC-tijd die de werkelijke tijd aangeeft waarop het bericht wordt verzonden. Genegeerd bij invoer. EnqueuedTime
x-opt-sequence-number Door de service gedefinieerd uniek nummer dat is toegewezen aan een bericht. SequenceNumber
x-opt-offset Door de service gedefinieerd volgnummer van het bericht. EnqueuedSequenceNumber
x-opt-locked-until Service gedefinieerd. De datum en tijd totdat het bericht wordt vergrendeld in de wachtrij/het abonnement. LockedUntil
x-opt-deadletter-source Service gedefinieerd. Als het bericht wordt ontvangen uit de wachtrij met dode brieven, vertegenwoordigt het de bron van het oorspronkelijke bericht. DeadLetterSource

Transactiemogelijkheid

Een transactie groepeert twee of meer bewerkingen in een uitvoeringsbereik. Een dergelijke transactie moet er van nature voor zorgen dat alle bewerkingen die tot een bepaalde groep bewerkingen behoren, gezamenlijk slagen of mislukken. De bewerkingen worden gegroepeerd op een id txn-id.

Voor transactionele interactie fungeert de client als een transaction controller , waarmee de bewerkingen worden gecontroleerd die moeten worden gegroepeerd. Service Bus Service fungeert als een transactional resource en voert werkzaamheden uit zoals aangevraagd door de transaction controller.

De client en service communiceren via een control link , die door de client tot stand wordt gebracht. De declare berichten discharge worden door de controller verzonden via de besturingskoppeling om respectievelijk transacties toe te wijzen en te voltooien (ze vertegenwoordigen niet de afbakening van transactionele werkzaamheden). Het werkelijke verzenden/ontvangen wordt niet uitgevoerd op deze koppeling. Elke aangevraagde transactionele bewerking wordt expliciet aangeduid met de gewenste txn-id en kan daarom plaatsvinden op elke koppeling op de Verbinding maken ion. Als de koppeling naar het besturingselement wordt gesloten terwijl er niet-ontladen transacties zijn gemaakt, worden alle dergelijke transacties onmiddellijk teruggedraaid en worden pogingen om verdere transactionele werkzaamheden uit te voeren, tot fouten geleid. Berichten op de koppeling voor besturingselementen mogen niet vooraf worden vereffend.

Elke verbinding moet een eigen besturingskoppeling initiëren om transacties te kunnen starten en beëindigen. De service definieert een speciaal doel dat fungeert als een coordinator. De client/controller brengt een besturingselementkoppeling naar dit doel tot stand. Besturingskoppeling bevindt zich buiten de grens van een entiteit, dat wil gezegd, dezelfde besturingskoppeling kan worden gebruikt om transacties voor meerdere entiteiten te initiëren en te lossen.

Een transactie starten

Transactioneel werk starten. de verwerkingsverantwoordelijke moet een txn-id van de coördinator verkrijgen. Dit doet u door een declare typebericht te verzenden. Als de declaratie is geslaagd, reageert de coördinator met een verwijderingsresultaat dat de toegewezen txn-idheeft.

Client (controller) Richting Service Bus (coördinator)
attach(
name={link name},
... ,
role=sender,
target=Coördinator
)
------>
<------ attach(
name={link name},
... ,
target=Coordinator()
)
transfer(
delivery-id=0, ...)
{ AmqpValue (Declare()}
------>
<------ verwijdering(
first=0, last=0,
state=Declared(
txn-id={transaction ID}
))

Een transactie ontladen

De controller sluit het transactionele werk af door een discharge bericht naar de coördinator te sturen. De controller geeft aan dat het transactionele werk wil doorvoeren of terugdraaien door de fail vlag op de ontladingsbody in te stellen. Als de coördinator de ontlading niet kan voltooien, wordt het bericht afgewezen met dit resultaat met transaction-errorhet resultaat .

Opmerking: fail=true verwijst naar terugdraaien van een transactie en fail=false verwijst naar Commit.

Client (controller) Richting Service Bus (coördinator)
transfer(
delivery-id=0, ...)
{ AmqpValue (Declare()}
------>
<------ verwijdering(
first=0, last=0,
state=Declared(
txn-id={transaction ID}
))
. . .
Transactioneel werk
op andere koppelingen
. . .
transfer(
delivery-id=57, ...)
{ AmqpValue (
Ontlading(txn-id=0,
fail=false)
}
------>
<------ verwijdering(
first=57, last=57,
state=Accepted())

Een bericht in een transactie verzenden

Alle transactionele werkzaamheden worden uitgevoerd met de transactionele leveringsstatus transactional-state die de txn-id draagt. Bij het verzenden van berichten wordt de transactionele status uitgevoerd door het overdrachtskader van het bericht.

Client (controller) Richting Service Bus (coördinator)
transfer(
delivery-id=0, ...)
{ AmqpValue (Declare()}
------>
<------ verwijdering(
first=0, last=0,
state=Declared(
txn-id={transaction ID}
))
transfer(
handle=1,
delivery-id=1,
state=
TransactionalState(
txn-id=0)
)
{ payload }
------>
<------ verwijdering(
first=1, last=1,
state=TransactionalState(
txn-id=0,
outcome=Accepted()
))

Een bericht in een transactie verwijderen

Berichtverschiking bevat bewerkingen zoals CompleteDeadLetter / DeferAbandon / / . Als u deze bewerkingen binnen een transactie wilt uitvoeren, geeft u de transactional-state bewerking door aan de verwijdering.

Client (controller) Richting Service Bus (coördinator)
transfer(
delivery-id=0, ...)
{ AmqpValue (Declare()}
------>
<------ verwijdering(
first=0, last=0,
state=Declared(
txn-id={transaction ID}
))
<------ transfer(
handle=2,
delivery-id=11,
state=null)
{ payload }
verwijdering(
first=11, last=11,
state=TransactionalState(
txn-id=0,
outcome=Accepted()
))
------>

Geavanceerde Service Bus-mogelijkheden

In deze sectie worden geavanceerde mogelijkheden van Azure Service Bus behandeld die zijn gebaseerd op conceptuitbreidingen voor AMQP, die momenteel worden ontwikkeld in het Technical Committee for AMQP van OASIS. Service Bus implementeert de nieuwste versies van deze concepten en neemt wijzigingen aan die zijn geïntroduceerd omdat deze concepten de standaardstatus bereiken.

Notitie

Geavanceerde bewerkingen van Service Bus Messaging worden ondersteund via een aanvraag-/antwoordpatroon. De details van deze bewerkingen worden beschreven in het artikel AMQP 1.0 in Service Bus: bewerkingen op basis van aanvragen en antwoorden.

AMQP-beheer

De AMQP-beheerspecificatie is de eerste van de conceptextensies die in dit artikel worden besproken. Deze specificatie definieert een set protocollen die boven op het AMQP-protocol zijn gelaagd, waarmee beheerinteracties met de berichteninfrastructuur via AMQP mogelijk zijn. De specificatie definieert algemene bewerkingen, zoals maken, lezen, bijwerken en verwijderen voor het beheren van entiteiten in een berichteninfrastructuur en een set querybewerkingen.

Voor al deze gebaren is een interactie tussen de client en de berichteninfrastructuur vereist. Daarom definieert de specificatie hoe dat interactiepatroon boven op AMQP moet worden gemodelleerd: de client maakt verbinding met de berichteninfrastructuur, initieert een sessie en maakt vervolgens een paar koppelingen. Op de ene koppeling fungeert de client als afzender en op de andere als ontvanger, waardoor een paar koppelingen worden gemaakt die als bidirectioneel kanaal kunnen fungeren.

Logische bewerking Klant Service Bus
Antwoordpad voor aanvraag maken --> attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**sender**,<br/>source=**null**,<br/>target=”myentity/$management”<br/>) Geen actie
Antwoordpad voor aanvraag maken Geen actie \<-- attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**receiver**,<br/>source=null,<br/>target=”myentity”<br/>)
Antwoordpad voor aanvraag maken --> attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**receiver**,<br/>source=”myentity/$management”,<br/>target=”myclient$id”<br/>)
Antwoordpad voor aanvraag maken Geen actie \<-- attach(<br/>name={*link name*},<br/>handle={*numeric handle*},<br/>role=**sender**,<br/>source=”myentity”,<br/>target=”myclient$id”<br/>)

Als deze koppeling is geïmplementeerd, is de implementatie van de aanvraag/reactie eenvoudig: een aanvraag is een bericht dat wordt verzonden naar een entiteit binnen de berichteninfrastructuur die dit patroon begrijpt. In dat aanvraagbericht wordt het antwoordveld in deeigenschappensectie ingesteld op de doel-id voor de koppeling waarop het antwoord moet worden bezorgd. De verwerkingsentiteit verwerkt de aanvraag en levert vervolgens het antwoord via de koppeling waarvan de doel-id overeenkomt met de aangegeven antwoord-id .

Het patroon vereist uiteraard dat de clientcontainer en de door de client gegenereerde id voor de antwoordbestemming uniek zijn voor alle clients en, om veiligheidsredenen, ook moeilijk te voorspellen zijn.

De berichtenuitwisseling die wordt gebruikt voor het beheerprotocol en voor alle andere protocollen die hetzelfde patroon gebruiken, vinden plaats op toepassingsniveau; ze definiëren geen nieuwe AMQP-protocolbewegingen. Dat is opzettelijk, zodat toepassingen direct kunnen profiteren van deze extensies met compatibele AMQP 1.0-stacks.

Service Bus implementeert momenteel geen van de kernfuncties van de beheerspecificatie, maar het aanvraag-/antwoordpatroon dat is gedefinieerd door de beheerspecificatie is fundamenteel voor de functie op basis van claims en voor bijna alle geavanceerde mogelijkheden die in de volgende secties worden besproken:

Autorisatie op basis van claims

Het cbs-specificatieconcept (AMQP Claims-Based-Authorization) bouwt voort op het aanvraag-/antwoordpatroon voor beheerspecificaties en beschrijft een gegeneraliseerd model voor het gebruik van federatieve beveiligingstokens met AMQP.

Het standaardbeveiligingsmodel van AMQP dat in de inleiding wordt besproken, is gebaseerd op SASL en kan worden geïntegreerd met de AMQP-verbindingshanddruk. Het gebruik van SASL heeft het voordeel dat het een uitbreidbaar model biedt waarvoor een set mechanismen is gedefinieerd waaruit elk protocol dat formeel leunt op SASL kan profiteren. Onder deze mechanismen is 'PLAIN' voor de overdracht van gebruikersnamen en wachtwoorden, 'EXTERNAL' om te binden aan TLS-beveiliging, 'ANONIEM' om de afwezigheid van expliciete verificatie/autorisatie uit te drukken, en een breed scala aan aanvullende mechanismen waarmee verificatie- en/of autorisatiereferenties of tokens kunnen worden doorgegeven.

SASL-integratie van AMQP heeft twee nadelen:

  • Alle referenties en tokens zijn gericht op de verbinding. Een berichteninfrastructuur wil mogelijk gedifferentieerd toegangsbeheer bieden per entiteit; Zo kan de bearer van een token bijvoorbeeld worden verzonden naar wachtrij A, maar niet naar wachtrij B. Als de autorisatiecontext is verankerd aan de verbinding, is het niet mogelijk om één verbinding te gebruiken en toch verschillende toegangstokens te gebruiken voor wachtrij A en wachtrij B.
  • Toegangstokens zijn doorgaans slechts gedurende een beperkte tijd geldig. Deze geldigheid vereist dat de gebruiker periodiek tokens opnieuw aanvraagt en de tokenverlener de mogelijkheid biedt om te weigeren een nieuw token uit te geven als de toegangsmachtigingen van de gebruiker zijn gewijzigd. AMQP-verbindingen kunnen lange tijd duren. Het SASL-model biedt alleen een kans om een token in te stellen op het moment van verbinding, wat betekent dat de berichteninfrastructuur de client moet loskoppelen wanneer het token verloopt of het risico moet accepteren dat continue communicatie met een client die toegangsrechten heeft, in de tussentijd is ingetrokken.

De AMQP CBS-specificatie, geïmplementeerd door Service Bus, maakt een elegante tijdelijke oplossing mogelijk voor beide problemen: Hiermee kan een client toegangstokens koppelen aan elk knooppunt en deze tokens bijwerken voordat ze verlopen, zonder de berichtstroom te onderbreken.

CBS definieert een virtueel beheerknooppunt, met de naam $cbs, dat moet worden geleverd door de berichteninfrastructuur. Het beheerknooppunt accepteert tokens namens andere knooppunten in de berichteninfrastructuur.

Het protocolbeweging is een aanvraag-/antwoorduitwisseling zoals gedefinieerd door de beheerspecificatie. Dat betekent dat de client een paar koppelingen met het $cbs knooppunt tot stand brengt en vervolgens een aanvraag doorgeeft op de uitgaande koppeling en vervolgens wacht op het antwoord op de binnenkomende koppeling.

Het aanvraagbericht heeft de volgende toepassingseigenschappen:

Sleutel Optioneel Waardetype Waarde-inhoud
operation Nee tekenreeks put-token
type Nee tekenreeks Het type token dat wordt geplaatst.
name Nee tekenreeks De doelgroep waarop het token van toepassing is.
expiration Ja tijdstempel De verlooptijd van het token.

De naameigenschap identificeert de entiteit waaraan het token moet worden gekoppeld. In Service Bus is het het pad naar de wachtrij of het onderwerp/abonnement. De typeeigenschap identificeert het tokentype:

Tokentype Beschrijving van token Type hoofdtekst Aantekeningen
jwt JSON Web Token (JWT) AMQP-waarde (tekenreeks)
servicebus.windows.net:sastoken Service Bus SAS-token AMQP-waarde (tekenreeks) -

Tokens verlenen rechten. Service Bus weet over drie fundamentele rechten: 'Verzenden' maakt verzenden mogelijk, 'Luisteren' maakt ontvangen en 'Beheren' het bewerken van entiteiten mogelijk. Service Bus SAS-tokens verwijzen naar regels die zijn geconfigureerd in de naamruimte of entiteit en deze regels worden geconfigureerd met rechten. Als u het token ondertekent met de sleutel die aan die regel is gekoppeld, wordt het token dus de respectieve rechten uitgesproken. Met het token dat is gekoppeld aan een entiteit met behulp van een put-token , kan de verbonden client communiceren met de entiteit volgens de tokenrechten. Voor een koppeling waarbij de client de rol afzender opneemt, is het recht Verzenden vereist. Voor het overnemen van de ontvangerrol is het recht 'Luisteren' vereist.

Het antwoordbericht heeft de volgende waarden voor toepassingseigenschappen

Sleutel Optioneel Waardetype Waarde-inhoud
status-code Nee geheel getal (int) HTTP-antwoordcode [RFC2616].
status-description Ja tekenreeks Beschrijving van de status.

De client kan put-token herhaaldelijk aanroepen en voor elke entiteit in de berichteninfrastructuur. De tokens zijn gericht op de huidige client en verankerd op de huidige verbinding, wat betekent dat de server alle bewaarde tokens verwijdert wanneer de verbinding afneemt.

De huidige Service Bus-implementatie staat alleen CBS toe in combinatie met de SASL-methode 'ANONIEM'. Er moet altijd een SSL/TLS-verbinding bestaan vóór de SASL-handshake.

Het ANONIEME mechanisme moet daarom worden ondersteund door de gekozen AMQP 1.0-client. Anonieme toegang betekent dat de eerste verbindingshanddruk, inclusief het maken van de eerste sessie, plaatsvindt zonder dat Service Bus weet wie de verbinding maakt.

Zodra de verbinding en sessie tot stand is gebracht, zijn het koppelen van de koppelingen aan het $cbs knooppunt en het verzenden van de put-tokenaanvraag de enige toegestane bewerkingen. Een geldig token moet worden ingesteld met behulp van een put-tokenaanvraag voor een bepaald entiteitsknooppunt binnen 20 seconden nadat de verbinding tot stand is gebracht, anders wordt de verbinding eenzijdig verwijderd door Service Bus.

De client is vervolgens verantwoordelijk voor het bijhouden van de vervaldatum van het token. Wanneer een token verloopt, worden alle koppelingen op de verbinding met de betreffende entiteit door Service Bus onmiddellijk weggezet. Om te voorkomen dat er een probleem optreedt, kan de client het token voor het knooppunt vervangen door een nieuw knooppunt op elk gewenst moment via het virtuele $cbs-beheerknooppunt met dezelfde put-tokenbeweging en zonder dat het nettoladingverkeer dat op verschillende koppelingen stroomt, binnenkomt.

Send-via-functionaliteit

Send-via/Transfer sender is een functionaliteit waarmee Service Bus een bepaald bericht via een andere entiteit naar een doelentiteit kan doorsturen. Deze functie wordt gebruikt voor het uitvoeren van bewerkingen tussen entiteiten in één transactie.

Met deze functionaliteit maakt u een afzender en maakt u de koppeling naar de via-entity. Tijdens het tot stand brengen van de koppeling wordt aanvullende informatie doorgegeven om de werkelijke bestemming van de berichten/overdrachten op deze koppeling tot stand te brengen. Zodra de koppeling is voltooid, worden alle berichten die op deze koppeling worden verzonden, automatisch doorgestuurd naar de doelentiteit via entiteit.

Opmerking: verificatie moet worden uitgevoerd voor zowel via entiteitals doelentiteit voordat deze koppeling tot stand wordt gebracht.

Klant Richting Service Bus
attach(<br/>name={link name},<br/>role=sender,<br/>source={client link ID},<br/>target=**{via-entity}**,<br/>**properties=map [(<br/>com.microsoft:transfer-destination-address=<br/>{destination-entity} )]** ) ------>
<------ attach(<br/>name={link name},<br/>role=receiver,<br/>source={client link ID},<br/>target={via-entity},<br/>properties=map [(<br/>com.microsoft:transfer-destination-address=<br/>{destination-entity} )] )

Volgende stappen

Zie Het overzicht van Service Bus AMQP voor meer informatie over AMQP.