Comunicare con l'hub IoT usando il protocollo AMQP

hub IoT di Azure supporta OASIS Advanced Message Queuing Protocol (AMQP) versione 1.0 per offrire un'ampia gamma di funzionalità tramite endpoint rivolti ai dispositivi e rivolti ai servizi. Questo documento descrive l'uso dei client AMQP per connettersi a un hub IoT per usare hub IoT funzionalità.

Client del servizio

Connessione ed eseguire l'autenticazione in un hub IoT (client del servizio)

Per connettersi a un hub IoT tramite AMQP, un client può usare l'autenticazione CBS (Claims-Based Security) o l'autenticazione SASL (Simple Authentication and Security Layer).

Per il client del servizio sono necessarie le informazioni seguenti:

Informazioni Valore
Nome host dell'hub IoT <iot-hub-name>.azure-devices.net
Nome chiave service
Chiave di accesso Una chiave primaria o secondaria associata al servizio
Firma di accesso condiviso Firma di accesso condiviso di breve durata nel formato seguente: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Per ottenere il codice per la generazione di questa firma, vedere Controllare l'accesso a hub IoT.

Il frammento di codice seguente usa la libreria uAMQP in Python per connettersi a un hub IoT tramite un collegamento mittente.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Richiamare messaggi da cloud a dispositivo (client del servizio)

Per informazioni sullo scambio di messaggi da cloud a dispositivo tra il servizio e l'hub IoT e tra il dispositivo e l'hub IoT, vedere Inviare messaggi da cloud a dispositivo dall'hub IoT. Il client del servizio usa due collegamenti per inviare messaggi e ricevere commenti e suggerimenti per i messaggi inviati in precedenza dai dispositivi, come descritto nella tabella seguente:

Autore Tipo di collegamento Percorso del collegamento Descrizione
Servizio Collegamento al mittente /messages/devicebound I messaggi da cloud a dispositivo destinati ai dispositivi vengono inviati a questo collegamento dal servizio. I messaggi inviati tramite questo collegamento hanno la proprietà To impostata sul percorso di collegamento del ricevitore del dispositivo di destinazione, /devices/<deviceID>/messages/devicebound.
Servizio Collegamento ricevitore /messages/serviceBound/feedback Messaggi di feedback di completamento, rifiuto e abbandono ricevuti dai dispositivi ricevuti in questo collegamento dal servizio. Per altre informazioni sui messaggi di feedback, vedere Inviare messaggi da cloud a dispositivo da un hub IoT.

Il frammento di codice seguente illustra come creare un messaggio da cloud a dispositivo e inviarlo a un dispositivo usando la libreria uAMQP in Python.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Per ricevere commenti e suggerimenti, il client del servizio crea un collegamento ricevitore. Il frammento di codice seguente illustra come creare un collegamento usando la libreria uAMQP in Python:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Come illustrato nel codice precedente, un messaggio di feedback da cloud a dispositivo ha un tipo di contenuto di applicazione/vnd.microsoft.iothub.feedback.json. È possibile usare le proprietà nel corpo JSON del messaggio per dedurre lo stato di recapito del messaggio originale:

  • La chiave statusCode nel corpo del feedback ha uno dei valori seguenti: Success, Expired, DeliveryCountExceeded, Rejected o Purged.

  • La chiave deviceId nel corpo del feedback ha l'ID del dispositivo di destinazione.

  • La chiave originalMessageId nel corpo del feedback ha l'ID del messaggio originale da cloud a dispositivo inviato dal servizio. È possibile usare questo stato di recapito per correlare il feedback ai messaggi da cloud a dispositivo.

Ricevere messaggi di telemetria (client del servizio)

Per impostazione predefinita, l'hub IoT archivia i messaggi di telemetria dei dispositivi inseriti in un hub eventi predefinito. Il client del servizio può usare il protocollo AMQP per ricevere gli eventi archiviati.

A questo scopo, il client del servizio deve prima connettersi all'endpoint dell'hub IoT e ricevere un indirizzo di reindirizzamento agli hub eventi predefiniti. Il client del servizio usa quindi l'indirizzo fornito per connettersi all'hub eventi predefinito.

In ogni passaggio, il client deve presentare le informazioni seguenti:

  • Credenziali del servizio valide (token di firma di accesso condiviso del servizio).

  • Percorso ben formattato della partizione del gruppo di consumer da cui intende recuperare i messaggi. Per un determinato gruppo di consumer e ID partizione, il percorso ha il formato seguente: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (il gruppo di consumer predefinito è $Default).

  • Predicato di filtro facoltativo per designare un punto iniziale nella partizione. Questo predicato può essere sotto forma di numero di sequenza, offset o timestamp accodato.

Il frammento di codice seguente usa la libreria uAMQP in Python per illustrare i passaggi precedenti:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Per un determinato ID dispositivo, l'hub IoT usa un hash dell'ID dispositivo per determinare la partizione in cui archiviare i messaggi. Il frammento di codice precedente illustra come gli eventi vengono ricevuti da una singola partizione di questo tipo. Si noti tuttavia che un'applicazione tipica spesso deve recuperare gli eventi archiviati in tutte le partizioni dell'hub eventi.

Client del dispositivo

Connessione ed eseguire l'autenticazione in un hub IoT (client del dispositivo)

Per connettersi a un hub IoT tramite AMQP, un dispositivo può usare l'autenticazione CBS (Claims Based Security) o l'autenticazione SASL (Simple Authentication and Security Layer).

Per il client del dispositivo sono necessarie le informazioni seguenti:

Informazioni Valore
Nome host dell'hub IoT <iot-hub-name>.azure-devices.net
Chiave di accesso Una chiave primaria o secondaria associata al dispositivo
Firma di accesso condiviso Firma di accesso condiviso di breve durata nel formato seguente: SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI}. Per ottenere il codice per la generazione di questa firma, vedere Controllare l'accesso a hub IoT.

Il frammento di codice seguente usa la libreria uAMQP in Python per connettersi a un hub IoT tramite un collegamento mittente.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://learn.microsoft.com/azure/iot-hub/iot-hub-devguide-security#sas-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

I percorsi di collegamento seguenti sono supportati come operazioni del dispositivo:

Autore Tipo di collegamento Percorso del collegamento Descrizione
Dispositivi Collegamento ricevitore /devices/<deviceID>/messages/devicebound I messaggi da cloud a dispositivo destinati ai dispositivi vengono ricevuti in questo collegamento da ogni dispositivo di destinazione.
Dispositivi Collegamento al mittente /devices/<deviceID>/messages/events I messaggi da dispositivo a cloud inviati da un dispositivo vengono inviati tramite questo collegamento.
Dispositivi Collegamento al mittente /messages/serviceBound/feedback Feedback dei messaggi da cloud a dispositivo inviati al servizio tramite questo collegamento da dispositivi.

Ricevere comandi da cloud a dispositivo (client del dispositivo)

I comandi da cloud a dispositivo inviati ai dispositivi arrivano su un /devices/<deviceID>/messages/devicebound collegamento. I dispositivi possono ricevere questi messaggi in batch e usare il payload dei dati dei messaggi, le proprietà dei messaggi, le annotazioni o le proprietà dell'applicazione nel messaggio in base alle esigenze.

Il frammento di codice seguente usa la libreria uAMQP in Python) per ricevere messaggi da cloud a dispositivo.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Inviare messaggi di telemetria (client del dispositivo)

È anche possibile inviare messaggi di telemetria da un dispositivo usando AMQP. Il dispositivo può facoltativamente fornire un dizionario di proprietà dell'applicazione o varie proprietà del messaggio, ad esempio l'ID messaggio.

Per instradare i messaggi in base al corpo del messaggio, è necessario impostare la content_type proprietà su application/json;charset=utf-8. Per altre informazioni sul routing dei messaggi in base alle proprietà del messaggio o al corpo del messaggio, vedere la documentazione sulla sintassi delle query di routing dei messaggi hub IoT.

Il frammento di codice seguente usa la libreria uAMQP in Python per inviare messaggi da dispositivo a cloud da un dispositivo.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = 'application/json;charset=utf-8'
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Note aggiuntive

  • Le connessioni AMQP potrebbero essere interrotte a causa di un errore di rete o della scadenza del token di autenticazione (generato nel codice). Il client del servizio deve gestire queste circostanze e ristabilire la connessione e i collegamenti, se necessario. Se un token di autenticazione scade, il client può evitare un rilascio di connessione rinnovando in modo proattivo il token prima della scadenza.

  • Il client deve occasionalmente essere in grado di gestire correttamente i reindirizzamenti dei collegamenti. Per comprendere tale operazione, vedere la documentazione del client AMQP.

Passaggi successivi

Per altre informazioni sul protocollo AMQP, vedere la specifica AMQP v1.0.

Per altre informazioni sulla messaggistica hub IoT, vedere: