Kommunicera med din IoT-hubb med hjälp av AMQP-protokollet
Azure IoT Hub stöder OASIS Advanced Message Queueing Protocol (AMQP) version 1.0 för att leverera en mängd olika funktioner via enhetsuppriktade och tjänstriktade slutpunkter. Det här dokumentet beskriver användningen av AMQP-klienter för att ansluta till en IoT-hubb för att IoT Hub funktioner.
Tjänstklient
Anslut och autentisera till en IoT Hub (tjänstklient)
För att ansluta till en IoT-hubb med hjälp av AMQP kan en klient använda den anspråksbaserade säkerhetsautentisering (CBS) eller Simple Authentication and Security Layer-autentisering (SASL).
Följande information krävs för tjänstklienten:
| Information | Värde |
|---|---|
| Värdnamn för IoT-hubb | <iot-hub-name>.azure-devices.net |
| Nyckelnamn | service |
| Åtkomstnyckel | En primär eller sekundär nyckel som är associerad med tjänsten |
| Signatur för delad åtkomst | En kortlivad signatur för delad åtkomst i följande format: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Information om hur du hämtar koden för att generera signaturen finns i Kontrollera åtkomst till IoT Hub. |
Följande kodfragment använder uAMQP-biblioteket i Python för att ansluta till en IoT-hubb via en avsändarlänk.
import uamqp
import urllib
import time
# Use generate_sas_token implementation available here:
# https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>' # example: '/messages/devicebound'
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)
Anropa meddelanden från moln till enhet (tjänstklient)
Mer information om moln-till-enhet-meddelandeutbytet mellan tjänsten och IoT-hubben och mellan enheten och IoT-hubben finns i Skicka meddelanden från molnet till enheten från din IoT-hubb. Tjänstklienten använder två länkar för att skicka meddelanden och ta emot feedback för tidigare skickade meddelanden från enheter, enligt beskrivningen i följande tabell:
| Skapades av | Länktyp | Länksökväg | Description |
|---|---|---|---|
| Tjänst | Avsändarlänk | /messages/devicebound |
Meddelanden från moln till enhet som är avsedda för enheter skickas till den här länken av tjänsten. Meddelanden som skickas via den här länken To har sin egenskap inställd på målenhetens mottagarlänksökväg, /devices/<deviceID>/messages/devicebound . |
| Tjänst | Mottagarlänk | /messages/serviceBound/feedback |
Feedbackmeddelanden om slutförande, avvisande och avvisning som kommer från enheter som tagits emot på den här länken av tjänsten. Mer information om feedbackmeddelanden finns i Skicka meddelanden från molnet till enheten från en IoT-hubb. |
Följande kodfragment visar hur du skapar ett moln-till-enhet-meddelande och skickar det till en enhet med hjälp av uAMQP-biblioteket i Python.
import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full' # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
application_properties=app_props)
# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()
# Close the client if it's not needed
send_client.close()
För att få feedback skapar tjänstklienten en mottagarlänk. Följande kodfragment visar hur du skapar en länk med hjälp av uAMQP-biblioteket i Python:
import json
operation = '/messages/serviceBound/feedback'
# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
print('received a message')
# Check content_type in message property to identify feedback messages coming from device
if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
msg_body_raw = msg.get_data()
msg_body_str = ''.join(msg_body_raw)
msg_body = json.loads(msg_body_str)
print(json.dumps(msg_body, indent=2))
print('******************')
for feedback in msg_body:
print('feedback received')
print('\tstatusCode: ' + str(feedback['statusCode']))
print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
print('\tdeviceId: ' + str(feedback['deviceId']))
print
else:
print('unknown message:', msg.properties.content_type)
Som du ser i föregående kod har ett moln-till-enhet-feedbackmeddelande innehållstypen application/vnd.microsoft.iothub.feedback.json. Du kan använda egenskaperna i meddelandets JSON-brödtext för att dra slutsatsen om det ursprungliga meddelandets leveransstatus:
Nyckeln i feedbacktexten har något av
statusCodeföljande värden: Success, Expired, DeliveryCountExceeded, Rejected eller Purged.Nyckeln
deviceIdi feedbacktexten har målenhetens ID.Nyckeln
originalMessageIdi feedbacktexten innehåller ID:t för det ursprungliga moln-till-enhet-meddelandet som skickades av tjänsten. Du kan använda den här leveransstatusen för att korrelera feedback med meddelanden från moln till enhet.
Ta emot telemetrimeddelanden (tjänstklient)
Som standard lagrar din IoT-hubb inkommande telemetrimeddelanden i en inbyggd händelsehubb. Tjänstklienten kan använda AMQP-protokollet för att ta emot lagrade händelser.
För detta ändamål måste tjänstklienten först ansluta till IoT Hub-slutpunkten och ta emot en omdirigeringsadress till de inbyggda händelsehubben. Tjänstklienten använder sedan den angivna adressen för att ansluta till den inbyggda händelsehubben.
I varje steg måste klienten presentera följande uppgifter:
Giltiga autentiseringsuppgifter för tjänsten (signaturtoken för delad åtkomst för tjänsten).
En välformaterad sökväg till konsumentgruppens partition som den har för avsikt att hämta meddelanden från. För en viss konsumentgrupp och partitions-ID har sökvägen följande format:
/messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id>(standardkonsumentgruppen är$Default).Ett valfritt filtrerings predikat för att ange en startpunkt i partitionen. Predikatet kan vara i form av ett sekvensnummer, en offset eller en tidsstämpel i rad.
Följande kodfragment använder uAMQP-biblioteket i Python för att demonstrera föregående steg:
import json
import uamqp
import urllib
import time
# Use the generate_sas_token implementation that's available here: https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
consumer_group='$Default', p_id=0)
username = '{policy_name}@sas.root.{iot_hub_name}'.format(
policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'
# Helper function to set the filtering predicate on the source URI
def set_endpoint_filter(uri, endpoint_filter=''):
source_uri = uamqp.address.Source(uri)
source_uri.set_filter(endpoint_filter)
return source_uri
receive_client = uamqp.ReceiveClient(
set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
# Once a redirect error is received, close the original client and recreate a new one to the re-directed address
receive_client.close()
sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
redirect.address, policy_name, access_key)
receive_client = uamqp.ReceiveClient(set_endpoint_filter(
redirect.address, endpoint_filter), auth=sas_auth, debug=True)
# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
print('\t: ' + str(msg.annotations['x-opt-offset']))
print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))
För ett visst enhets-ID använder IoT-hubben en hash för enhets-ID:t för att avgöra vilken partition meddelandena ska lagras i. Föregående kodfragment visar hur händelser tas emot från en enda sådan partition. Observera dock att ett typiskt program ofta behöver hämta händelser som lagras i alla händelsehubbpartitioner.
Enhetsklient
Anslut och autentisera till en IoT-hubb (enhetsklient)
För att ansluta till en IoT-hubb med hjälp av AMQP kan en enhet använda anspråksbaserad säkerhet (CBS) eller Simple Authentication and Security Layer-autentisering (SASL).
Följande information krävs för enhetsklienten:
| Information | Värde |
|---|---|
| Värdnamn för IoT-hubb | <iot-hub-name>.azure-devices.net |
| Åtkomstnyckel | En primär eller sekundär nyckel som är associerad med enheten |
| Signatur för delad åtkomst | En kortlivad signatur för delad åtkomst i följande format: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Information om hur du hämtar koden för att generera signaturen finns i Kontrollera åtkomst till IoT Hub. |
Följande kodfragment använder uAMQP-biblioteket i Python för att ansluta till en IoT-hubb via en avsändarlänk.
import uamqp
import urllib
import uuid
# Use generate_sas_token implementation available here:
# https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token
iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
hostname=hostname, device_id=device_id), access_key, None)
# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)
Följande länksökvägar stöds som enhetsåtgärder:
| Skapades av | Länktyp | Länksökväg | Description |
|---|---|---|---|
| Enheter | Mottagarlänk | /devices/<deviceID>/messages/devicebound |
Meddelanden från moln till enhet som är avsedda för enheter tas emot på den här länken av varje målenhet. |
| Enheter | Avsändarlänk | /devices/<deviceID>/messages/events |
Enhet-till-moln-meddelanden som skickas från en enhet skickas via den här länken. |
| Enheter | Avsändarlänk | /messages/serviceBound/feedback |
Meddelandefeedback från moln till enhet skickas till tjänsten via den här länken av enheter. |
Ta emot kommandon från moln till enhet (enhetsklient)
Moln-till-enhet-kommandon som skickas till enheter tas emot via en /devices/<deviceID>/messages/devicebound länk. Enheter kan ta emot dessa meddelanden i batchar och använda nyttolasten för meddelandedata, meddelandeegenskaper, anteckningar eller programegenskaper i meddelandet efter behov.
Följande kodfragment använder uAMQP-biblioteket i Pythonför att ta emot meddelanden från molnet till enheten från en enhet.
# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
urllib.quote_plus(sas_token), hostname, operation)
receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
print('*** received a message ***')
print(''.join(msg.get_data()))
# Property 'to' is set to: '/devices/device1/messages/devicebound',
print('\tto: ' + str(msg.properties.to))
# Property 'message_id' is set to value provided by the service
print('\tmessage_id: ' + str(msg.properties.message_id))
# Other properties are present if they were provided by the service
print('\tcreation_time: ' + str(msg.properties.creation_time))
print('\tcorrelation_id: ' +
str(msg.properties.correlation_id))
print('\tcontent_type: ' + str(msg.properties.content_type))
print('\treply_to_group_id: ' +
str(msg.properties.reply_to_group_id))
print('\tsubject: ' + str(msg.properties.subject))
print('\tuser_id: ' + str(msg.properties.user_id))
print('\tgroup_sequence: ' +
str(msg.properties.group_sequence))
print('\tcontent_encoding: ' +
str(msg.properties.content_encoding))
print('\treply_to: ' + str(msg.properties.reply_to))
print('\tabsolute_expiry_time: ' +
str(msg.properties.absolute_expiry_time))
print('\tgroup_id: ' + str(msg.properties.group_id))
# Message sequence number in the built-in event hub
print('\tx-opt-sequence-number: ' +
str(msg.annotations['x-opt-sequence-number']))
Skicka telemetrimeddelanden (enhetsklient)
Du kan också skicka telemetrimeddelanden från en enhet med hjälp av AMQP. Enheten kan också ange en ordlista med programegenskaper eller olika meddelandeegenskaper, till exempel meddelande-ID.
Följande kodfragment använder uAMQP-biblioteket i Python för att skicka "enhet till molnet"-meddelanden från en enhet.
# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)
send_client = uamqp.SendClient(uri, debug=True)
# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = None
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None
# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }
# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)
send_client.queue_message(message)
results = send_client.send_all_messages()
for result in results:
if result == uamqp.constants.MessageState.SendFailed:
print result
Ytterligare information
AMQP-anslutningarna kan avbrytas på grund av ett nätverksfel eller att autentiseringstoken upphör att gälla (genereras i koden). Tjänstklienten måste hantera dessa omständigheter och återupprätta anslutningen och länkarna om det behövs. Om en autentiseringstoken upphör att gälla kan klienten undvika att anslutningen slutar gälla genom att proaktivt förnya token innan den upphör att gälla.
Klienten måste ibland kunna hantera länkomdirigeringar på rätt sätt. Information om en sådan åtgärd finns i dokumentationen för AMQP-klienten.
Nästa steg
Mer information om AMQP-protokollet finns i AMQP v1.0-specifikationen.
Mer information om IoT Hub finns i: