AMQP Protokolünü kullanarak IoT hub'ı ile iletişim kurma

Azure IoT Hub, cihaza yönelik ve hizmete yönelik uç noktalar aracılığıyla çeşitli işlevler sunmak için GELIŞMIŞ İLETI SıRAYA ALMA PROTOKOLÜ (AMQP) sürüm 1.0'ı destekler. Bu belgede, IoT hub'ını kullanarak bir IoT hub'a bağlanmak için AMQP istemcilerinin IoT Hub açıkmektedir.

Hizmet istemcisi

Bağlan IoT hub'ı (hizmet istemcisi) için kimlik doğrulaması yapma ve kimlik doğrulaması yapma

Bir IoT hub'a AMQP kullanarak bağlanmak için, bir istemci talep tabanlı güvenlik (CBS) veya Basit Kimlik Doğrulama ve Güvenlik Katmanı (SASL) kimlik doğrulamasını kullanabilir.

Hizmet istemcisi için aşağıdaki bilgiler gereklidir:

Bilgi Değer
IoT hub ana bilgisayar adı <iot-hub-name>.azure-devices.net
Anahtar adı service
Erişim anahtarı Hizmetle ilişkili birincil veya ikincil anahtar
Paylaşılan erişim imzası Şu biçimde kısa süreli paylaşılan erişim SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} imzası: . Bu imzayı oluşturmak için kodu almak için bkz. IoT Hub.

Aşağıdaki kod parçacığı, bir gönderen bağlantısı aracılığıyla bir IoT hub'a bağlanmak için Python'daki uAMQP kitaplığını kullanır.

import uamqp
import urllib
import time

# Use generate_sas_token implementation available here:
# https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '<operation-link-name>'  # example: '/messages/devicebound'

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    iot_hub_name=iot_hub_name, policy_name=policy_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Create a send or receive client
send_client = uamqp.SendClient(uri, debug=True)
receive_client = uamqp.ReceiveClient(uri, debug=True)

Buluttan cihaza iletileri çağırma (hizmet istemcisi)

Hizmetle IoT hub'ı arasında ve cihaz ile IoT hub'ı arasında buluttan cihaza ileti alışverişi hakkında bilgi edinmek için bkz. IoThub'ınıza buluttan cihaza iletiler gönderme. Hizmet istemcisi, aşağıdaki tabloda açıklandığı gibi, daha önce cihazlardan gönderilen iletiler için ileti göndermek ve geri bildirim almak üzere iki bağlantı kullanır:

Oluşturan: Bağlantı türü Bağlantı yolu Description
Hizmet Gönderen bağlantısı /messages/devicebound Cihazlara gönderilen buluttan cihaza iletiler hizmet tarafından bu bağlantıya gönderilir. Bu bağlantı üzerinden gönderilen iletilerin To özelliği, hedef cihazın alıcı bağlantı yoluna ayarlanmıştır. /devices/<deviceID>/messages/devicebound
Hizmet Alıcı bağlantısı /messages/serviceBound/feedback Hizmet tarafından bu bağlantıdan alınan cihazlardan gelen tamamlama, reddetme ve bırakma geri bildirim iletileri. Geri bildirim iletileri hakkında daha fazla bilgi için bkz. IoT hub'larından buluttan cihaza iletiler gönderme.

Aşağıdaki kod parçacığında, Python'da uAMQPkitaplığını kullanarak buluttan cihaza ileti oluşturma ve bunu bir cihaza gönderme adımları ve bilgiler yer almaktadır.

import uuid
# Create a message and set message property 'To' to the device-bound link on device
msg_id = str(uuid.uuid4())
msg_content = b"Message content goes here!"
device_id = '<device-id>'
to = '/devices/{device_id}/messages/devicebound'.format(device_id=device_id)
ack = 'full'  # Alternative values are 'positive', 'negative', and 'none'
app_props = {'iothub-ack': ack}
msg_props = uamqp.message.MessageProperties(message_id=msg_id, to=to)
msg = uamqp.Message(msg_content, properties=msg_props,
                    application_properties=app_props)

# Send the message by using the send client that you created and connected to the IoT hub earlier
send_client.queue_message(msg)
results = send_client.send_all_messages()

# Close the client if it's not needed
send_client.close()

Geri bildirim almak için hizmet istemcisi bir alıcı bağlantısı oluşturur. Aşağıdaki kod parçacığında, Python'da uAMQPkitaplığını kullanarak bağlantı oluşturma adımları ve bilgiler yer almaktadır:

import json

operation = '/messages/serviceBound/feedback'

# ...
# Re-create the URI by using the preceding feedback path and authenticate it
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
batch = receive_client.receive_message_batch(max_batch_size=10)
for msg in batch:
    print('received a message')
    # Check content_type in message property to identify feedback messages coming from device
    if msg.properties.content_type == 'application/vnd.microsoft.iothub.feedback.json':
        msg_body_raw = msg.get_data()
        msg_body_str = ''.join(msg_body_raw)
        msg_body = json.loads(msg_body_str)
        print(json.dumps(msg_body, indent=2))
        print('******************')
        for feedback in msg_body:
            print('feedback received')
            print('\tstatusCode: ' + str(feedback['statusCode']))
            print('\toriginalMessageId: ' + str(feedback['originalMessageId']))
            print('\tdeviceId: ' + str(feedback['deviceId']))
            print
    else:
        print('unknown message:', msg.properties.content_type)

Yukarıdaki kodda gösterildiği gibi buluttan cihaza geri bildirim iletisi, application/vnd.microsoft.iothub.feedback.json içerik türüne sahip olur. Özgün iletinin teslim durumunu çıkarımk için iletinin JSON gövdesinin özelliklerini kullanabilirsiniz:

  • Geri statusCode bildirim gövdesinin anahtarı şu değerlerden birini içerir: Success, Expired, DeliveryCountExceeded, Rejected veya Purged.

  • Geri deviceId bildirim gövdesinin anahtarında hedef cihazın kimliği vardır.

  • Geri originalMessageId bildirim gövdesinin anahtarı, hizmet tarafından gönderilen özgün buluttan cihaza iletinin kimliğine sahip. Bu teslim durumunu, geri bildirimleri buluttan cihaza iletilerle arasında ilişkide kullanmak için kullanabilirsiniz.

Telemetri iletilerini alma (hizmet istemcisi)

Varsayılan olarak, IoT hub'ın gelen cihaz telemetri iletilerini yerleşik bir olay hub'larında depolar. Hizmet istemciniz, depolanan olayları almak için AMQP Protokolünü kullanabilir.

Bu amaçla, hizmet istemcisinin önce IoT hub uç noktasına bağlanması ve yerleşik olay hub'ları için yeniden yönlendirme adresi almaları gerekir. Ardından hizmet istemcisi, yerleşik olay hub'ı ile bağlantı için sağlanan adresi kullanır.

Her adımda istemcinin aşağıdaki bilgileri sun ihtiyacı vardır:

  • Geçerli hizmet kimlik bilgileri (hizmet paylaşılan erişim imzası belirteci).

  • İletileri almak için amacı olan tüketici grubu bölümünün iyi biçimlendirilmiş yolu. Verilen bir tüketici grubu ve bölüm kimliği için yol şu biçime sahip olur: /messages/events/ConsumerGroups/<consumer_group>/Partitions/<partition_id> (varsayılan tüketici $Default grubudur).

  • Bölümde bir başlangıç noktası oluşturmak için isteğe bağlı bir filtreleme predicate. Bu durum bir sıra numarası, uzaklık veya sıralandı zaman damgası şeklinde olabilir.

Aşağıdaki kod parçacığı önceki adımları göstermek için Python'daki uAMQP kitaplığını kullanır:

import json
import uamqp
import urllib
import time

# Use the generate_sas_token implementation that's available here: https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
policy_name = 'service'
access_key = '<primary-or-secondary-key>'
operation = '/messages/events/ConsumerGroups/{consumer_group}/Partitions/{p_id}'.format(
    consumer_group='$Default', p_id=0)

username = '{policy_name}@sas.root.{iot_hub_name}'.format(
    policy_name=policy_name, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token(hostname, access_key, policy_name)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

# Optional filtering predicates can be specified by using endpoint_filter
# Valid predicates include:
# - amqp.annotation.x-opt-sequence-number
# - amqp.annotation.x-opt-offset
# - amqp.annotation.x-opt-enqueued-time
# Set endpoint_filter variable to None if no filter is needed
endpoint_filter = b'amqp.annotation.x-opt-sequence-number > 2995'

# Helper function to set the filtering predicate on the source URI


def set_endpoint_filter(uri, endpoint_filter=''):
    source_uri = uamqp.address.Source(uri)
    source_uri.set_filter(endpoint_filter)
    return source_uri


receive_client = uamqp.ReceiveClient(
    set_endpoint_filter(uri, endpoint_filter), debug=True)
try:
    batch = receive_client.receive_message_batch(max_batch_size=5)
except uamqp.errors.LinkRedirect as redirect:
    # Once a redirect error is received, close the original client and recreate a new one to the re-directed address
    receive_client.close()

    sas_auth = uamqp.authentication.SASTokenAuth.from_shared_access_key(
        redirect.address, policy_name, access_key)
    receive_client = uamqp.ReceiveClient(set_endpoint_filter(
        redirect.address, endpoint_filter), auth=sas_auth, debug=True)

# Start receiving messages in batches
batch = receive_client.receive_message_batch(max_batch_size=5)
for msg in batch:
    print('*** received a message ***')
    print(''.join(msg.get_data()))
    print('\t: ' + str(msg.annotations['x-opt-sequence-number']))
    print('\t: ' + str(msg.annotations['x-opt-offset']))
    print('\t: ' + str(msg.annotations['x-opt-enqueued-time']))

Belirli bir cihaz kimliği için IoT hub'ı, iletilerin depolan yer alan bölümü belirlemek için cihaz kimliğinin karması kullanır. Yukarıdaki kod parçacığı, olayların bu tür tek bir bölümden nasıl alınmıştır? Ancak tipik bir uygulamanın genellikle tüm olay hub'ı bölümlerine depolanmış olayları almaları gerektiğini unutmayın.

Cihaz istemcisi

Bağlan IoT hub'ı (cihaz istemcisi) için kimlik doğrulaması yapma ve kimlik doğrulaması yapma

Bir cihaz, AMQP kullanarak bir IoT hub'a bağlanmak için talep tabanlı güvenlik (CBS) veya Basit Kimlik Doğrulama ve Güvenlik Katmanı (SASL) kimlik doğrulamasını kullanabilir.

Cihaz istemcisi için aşağıdaki bilgiler gereklidir:

Bilgi Değer
IoT hub ana bilgisayar adı <iot-hub-name>.azure-devices.net
Erişim anahtarı Cihazla ilişkili birincil veya ikincil anahtar
Paylaşılan erişim imzası Şu biçimde kısa süreli paylaşılan erişim SharedAccessSignature sig={signature-string}&se={expiry}&skn={policyName}&sr={URL-encoded-resourceURI} imzası: . Bu imzayı oluşturmak için kodu almak için bkz. IoT Hub.

Aşağıdaki kod parçacığı, bir gönderen bağlantısı aracılığıyla bir IoT hub'a bağlanmak için Python'daki uAMQP kitaplığını kullanır.

import uamqp
import urllib
import uuid

# Use generate_sas_token implementation available here:
# https://docs.microsoft.com/azure/iot-hub/iot-hub-devguide-security#security-token-structure
from helper import generate_sas_token

iot_hub_name = '<iot-hub-name>'
hostname = '{iot_hub_name}.azure-devices.net'.format(iot_hub_name=iot_hub_name)
device_id = '<device-id>'
access_key = '<primary-or-secondary-key>'
username = '{device_id}@sas.{iot_hub_name}'.format(
    device_id=device_id, iot_hub_name=iot_hub_name)
sas_token = generate_sas_token('{hostname}/devices/{device_id}'.format(
    hostname=hostname, device_id=device_id), access_key, None)

# e.g., '/devices/{device_id}/messages/devicebound'
operation = '<operation-link-name>'
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
send_client = uamqp.SendClient(uri, debug=True)

Aşağıdaki bağlantı yolları cihaz işlemleri olarak de desteklemektedir:

Oluşturan: Bağlantı türü Bağlantı yolu Description
Cihazlar Alıcı bağlantısı /devices/<deviceID>/messages/devicebound Cihazları hedef alan buluttan cihaza iletiler her bir hedef cihaz tarafından bu bağlantıdan alınmıştır.
Cihazlar Gönderen bağlantısı /devices/<deviceID>/messages/events Bir cihazdan gönderilen cihazdan buluta iletiler bu bağlantı üzerinden gönderilir.
Cihazlar Gönderen bağlantısı /messages/serviceBound/feedback Cihazlar tarafından bu bağlantı üzerinden hizmete gönderilen buluttan cihaza ileti geri bildirimi.

Buluttan cihaza komutları alma (cihaz istemcisi)

Cihazlara gönderilen buluttan cihaza komutlar bir bağlantıya /devices/<deviceID>/messages/devicebound gelir. Cihazlar bu iletileri toplu olarak alır ve gerektiğinde iletideki ileti veri yükünü, ileti özelliklerini, ek açıklamaları veya uygulama özelliklerini kullanabilir.

Aşağıdaki kod parçacığı, bir cihaz tarafından buluttan cihaza iletileri almak için Python'daki uAMQPkitaplığını kullanır.

# ...
# Create a receive client for the cloud-to-device receive link on the device
operation = '/devices/{device_id}/messages/devicebound'.format(
    device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username),
                                  urllib.quote_plus(sas_token), hostname, operation)

receive_client = uamqp.ReceiveClient(uri, debug=True)
while True:
    batch = receive_client.receive_message_batch(max_batch_size=5)
    for msg in batch:
        print('*** received a message ***')
        print(''.join(msg.get_data()))

        # Property 'to' is set to: '/devices/device1/messages/devicebound',
        print('\tto:                     ' + str(msg.properties.to))

        # Property 'message_id' is set to value provided by the service
        print('\tmessage_id:             ' + str(msg.properties.message_id))

        # Other properties are present if they were provided by the service
        print('\tcreation_time:          ' + str(msg.properties.creation_time))
        print('\tcorrelation_id:         ' +
              str(msg.properties.correlation_id))
        print('\tcontent_type:           ' + str(msg.properties.content_type))
        print('\treply_to_group_id:      ' +
              str(msg.properties.reply_to_group_id))
        print('\tsubject:                ' + str(msg.properties.subject))
        print('\tuser_id:                ' + str(msg.properties.user_id))
        print('\tgroup_sequence:         ' +
              str(msg.properties.group_sequence))
        print('\tcontent_encoding:       ' +
              str(msg.properties.content_encoding))
        print('\treply_to:               ' + str(msg.properties.reply_to))
        print('\tabsolute_expiry_time:   ' +
              str(msg.properties.absolute_expiry_time))
        print('\tgroup_id:               ' + str(msg.properties.group_id))

        # Message sequence number in the built-in event hub
        print('\tx-opt-sequence-number:  ' +
              str(msg.annotations['x-opt-sequence-number']))

Telemetri iletileri gönderme (cihaz istemcisi)

AmQP kullanarak bir cihazdan telemetri iletileri de gönderebilirsiniz. Cihaz isteğe bağlı olarak uygulama özellikleri sözlüğü veya ileti kimliği gibi çeşitli ileti özellikleri sağlar.

Aşağıdaki kod parçacığı, cihazdan buluta iletiler göndermek için Python'daki uAMQP kitaplığını kullanır.

# ...
# Create a send client for the device-to-cloud send link on the device
operation = '/devices/{device_id}/messages/events'.format(device_id=device_id)
uri = 'amqps://{}:{}@{}{}'.format(urllib.quote_plus(username), urllib.quote_plus(sas_token), hostname, operation)

send_client = uamqp.SendClient(uri, debug=True)

# Set any of the applicable message properties
msg_props = uamqp.message.MessageProperties()
msg_props.message_id = str(uuid.uuid4())
msg_props.creation_time = None
msg_props.correlation_id = None
msg_props.content_type = None
msg_props.reply_to_group_id = None
msg_props.subject = None
msg_props.user_id = None
msg_props.group_sequence = None
msg_props.to = None
msg_props.content_encoding = None
msg_props.reply_to = None
msg_props.absolute_expiry_time = None
msg_props.group_id = None

# Application properties in the message (if any)
application_properties = { "app_property_key": "app_property_value" }

# Create message
msg_data = b"Your message payload goes here"
message = uamqp.Message(msg_data, properties=msg_props, application_properties=application_properties)

send_client.queue_message(message)
results = send_client.send_all_messages()

for result in results:
    if result == uamqp.constants.MessageState.SendFailed:
        print result

Ek notlar

  • Ağ hatası veya kimlik doğrulama belirtecini sona ermesi (kodda oluşturulan) nedeniyle AMQP bağlantıları kesintiye neden olabilir. Hizmet istemcisinin bu koşulları işlemesi ve gerekirse bağlantıyı ve bağlantıları yeniden kurması gerekir. Bir kimlik doğrulama belirtecinin süresi dolmazsa, istemci belirteci süresi dolmadan önce proaktif olarak yenileerek bağlantının düşmesini önler.

  • İstemcinizin bazen bağlantı yeniden yönlendirmelerini doğru şekilde işlemesi gerekir. Böyle bir işlemi anlamak için AMQP istemci belgelerinize bakın.

Sonraki adımlar

AMQP Protokolü hakkında daha fazla bilgi edinmek için bkz. AMQP v1.0 belirtimi.

Mesajlaşma hakkında daha fazla IoT Hub için bkz: