Communiceren met uw IoT-hub met behulp van het AMQP-protocol

Azure IoT Hub biedt ondersteuning voor OASIS Advanced Message Queueing Protocol (AMQP) versie 1.0 om tal van functies te bieden via apparaat- en service-gerichte eindpunten. In dit document wordt het gebruik van AMQP-clients beschreven om verbinding te maken met een IoT-hub om de IoT Hub gebruiken.

Serviceclient

Verbinding maken en verifiëren bij een IoT-hub (serviceclient)

Om verbinding te maken met een IoT-hub met behulp van AMQP, kan een client gebruikmaken van de op claims gebaseerde beveiliging (SAS) of Simple Authentication and Security Layer (SASL)-verificatie.

De volgende informatie is vereist voor de serviceclient:

Informatie Waarde
Hostnaam van IoT Hub <iot-hub-name>.azure-devices.net
Sleutelnaam service
Toegangssleutel Een primaire of secundaire sleutel die is gekoppeld aan de service
Shared Access Signature Een handtekening voor gedeelde toegang met een korte duur in de volgende indeling: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Zie Toegang tot de handtekening controleren om de code voor het genereren van deze handtekening IoT Hub.

In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om verbinding te maken met een IoT-hub via een afzenderkoppeling.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Cloud-naar-apparaat-berichten aanroepen (serviceclient)

Zie Cloud-naar-apparaat-berichten verzenden vanuit uw IoT-hub voor meer informatie over de uitwisseling van berichten van cloud naar apparaat tussen de service en de IoT-hub en tussen het apparaat en de IoT-hub. De serviceclient gebruikt twee koppelingen om berichten te verzenden en feedback te ontvangen over eerder verzonden berichten van apparaten, zoals beschreven in de volgende tabel:

Gemaakt door Type koppeling Koppelingspad Description
Service Afzenderkoppeling /messages/devicebound Cloud-naar-apparaat-berichten die bestemd zijn voor apparaten, worden door de service naar deze koppeling verzonden. Voor berichten die via deze koppeling worden verzonden, is de eigenschap ingesteld op het pad van de ontvanger van het To doelapparaat, /devices/<deviceID>/messages/devicebound .
Service Ontvangerkoppeling /messages/serviceBound/feedback Feedbackberichten over voltooiing, afwijzing en afkeuring die afkomstig zijn van apparaten die via deze koppeling door de service zijn ontvangen. Zie Cloud-naar-apparaat-berichtenverzenden vanuit een IoT-hub voor meer informatie over feedbackberichten.

Het volgende codefragment laat zien hoe u een cloud-naar-apparaat-bericht maakt en naar een apparaat verzendt met behulp van de uAMQP-bibliotheek in Python.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Om feedback te ontvangen, maakt de serviceclient een ontvangerkoppeling. Het volgende codefragment laat zien hoe u een koppeling maakt met behulp van de uAMQP-bibliotheek in Python:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Zoals u in de voorgaande code kunt zien, heeft een cloud-naar-apparaat-feedbackbericht het inhoudstype application/vnd.microsoft.iothub.feedback.json. U kunt de eigenschappen in de JSON-body van het bericht gebruiken om de leveringsstatus van het oorspronkelijke bericht af te afleiden:

  • Sleutel in de feedback-hoofdcode heeft een van de statusCode volgende waarden: Success, Expired, DeliveryCountExceeded, Rejected of Purged.

  • Sleutel deviceId in de hoofdcode van de feedback heeft de id van het doelapparaat.

  • Sleutel in de hoofdcode van de feedback heeft de originalMessageId id van het oorspronkelijke cloud-naar-apparaat-bericht dat is verzonden door de service. U kunt deze leveringsstatus gebruiken om feedback te correleren met cloud-naar-apparaat-berichten.

Telemetrieberichten ontvangen (serviceclient)

Standaard slaat uw IoT-hub opgenomen telemetrieberichten van apparaten op in een ingebouwde Event Hub. Uw serviceclient kan het AMQP-protocol gebruiken om de opgeslagen gebeurtenissen te ontvangen.

Hiervoor moet de serviceclient eerst verbinding maken met het eindpunt van de IoT-hub en een omleidingsadres ontvangen naar de ingebouwde Event Hubs. De serviceclient gebruikt vervolgens het opgegeven adres om verbinding te maken met de ingebouwde Event Hub.

In elke stap moet de client de volgende gegevens presenteren:

  • Geldige servicereferenties (service shared access signature-token).

  • Een goed opgemaakt pad naar de consumergroeppartitie waar het berichten van wil ophalen. Voor een bepaalde consumentengroep en partitie-id heeft het pad de volgende indeling: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (de standaard consumergroep is $Default ).

  • Een optioneel filterpredicaat om een beginpunt in de partitie aan te wijzen. Dit predicaat kan de vorm hebben van een volgnummer, offset of een uit de enqueu bestaande tijdstempel.

In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om de voorgaande stappen te demonstreren:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Voor een bepaalde apparaat-id gebruikt de IoT-hub een hash van de apparaat-id om te bepalen in welke partitie de berichten moeten worden opgeslagen. Het voorgaande codefragment laat zien hoe gebeurtenissen worden ontvangen van één partitie. Houd er echter rekening mee dat een typische toepassing vaak gebeurtenissen moet ophalen die zijn opgeslagen in alle Event Hub-partities.

Apparaatclient

Verbinding maken en verifiëren bij een IoT-hub (apparaatclient)

Als u via AMQP verbinding wilt maken met een IoT-hub, kan een apparaat gebruikmaken van op claims gebaseerde beveiliging (SAS) of Simple Authentication and Security Layer (SASL)-verificatie.

De volgende informatie is vereist voor de apparaatclient:

Informatie Waarde
Hostnaam van IoT Hub <iot-hub-name>.azure-devices.net
Toegangssleutel Een primaire of secundaire sleutel die is gekoppeld aan het apparaat
Shared Access Signature Een handtekening voor gedeelde toegang met een korte duur in de volgende indeling: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} . Zie Toegang tot de handtekening controleren om de code voor het genereren van deze handtekening IoT Hub.

In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om verbinding te maken met een IoT-hub via een afzenderkoppeling.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

De volgende koppelingspaden worden ondersteund als apparaatbewerkingen:

Gemaakt door Type koppeling Koppelingspad Description
Apparaten Ontvangerkoppeling /devices/<deviceID>/messages/devicebound Cloud-naar-apparaat-berichten die bestemd zijn voor apparaten, worden door elk doelapparaat ontvangen via deze koppeling.
Apparaten Afzenderkoppeling /devices/<deviceID>/messages/events Apparaat-naar-cloud-berichten die vanaf een apparaat worden verzonden, worden via deze koppeling verzonden.
Apparaten Afzenderkoppeling /messages/serviceBound/feedback Cloud-naar-apparaat-berichtfeedback verzonden naar de service via deze koppeling door apparaten.

Cloud-naar-apparaat-opdrachten ontvangen (apparaatclient)

Cloud-naar-apparaat-opdrachten die naar apparaten worden verzonden, komen binnen via een /devices/<deviceID>/messages/devicebound koppeling. Apparaten kunnen deze berichten in batches ontvangen en zo nodig de nettolading van de berichtgegevens, berichteigenschappen, aantekeningen of toepassingseigenschappen in het bericht gebruiken.

In het volgende codefragment wordt de uAMQP-bibliotheek in Pythongebruikt om cloud-naar-apparaat-berichten te ontvangen door een apparaat.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Telemetrieberichten verzenden (apparaatclient)

U kunt ook telemetrieberichten verzenden vanaf een apparaat met behulp van AMQP. Het apparaat kan eventueel een woordenlijst met toepassingseigenschappen of verschillende berichteigenschappen, zoals bericht-id, bieden.

In het volgende codefragment wordt de uAMQP-bibliotheek in Python gebruikt om apparaat-naar-cloud-berichten vanaf een apparaat te verzenden.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = None
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Aanvullende opmerkingen

  • De AMQP-verbindingen kunnen worden onderbroken vanwege een netwerkfout of het verlopen van het verificatie-token (gegenereerd in de code). De serviceclient moet deze omstandigheden afhandelen en, indien nodig, de verbinding en koppelingen opnieuw tot stand brengen. Als een verificatie-token verloopt, kan de client een verbindingsuitval voorkomen door het token proactief te vernieuwen voordat het verloopt.

  • Uw client moet af en toe koppelingsomleidingen correct kunnen verwerken. Zie de documentatie van uw AMQP-client voor meer informatie over een dergelijke bewerking.

Volgende stappen

Zie de AMQP v1.0-specificatievoor meer informatie over het AMQP-protocol.

Zie voor meer informatie over IoT Hub messaging: