您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn.

使用 IoT 中心将文件从设备上传到云 (Python)

本文演示如何使用 IoT 中心的文件上传功能将文件上传到 Azure Blob 存储。 本教程介绍如何:

  • 安全地提供存储容器用于文件上传。

  • 使用 Python 客户端通过 IoT 中心上传文件。

从设备将遥测数据发送到 IoT 中心快速入门演示了 IoT 中心基本的设备到云的消息传送功能。 但是,在某些情况下,无法轻松地将设备发送的数据映射为 IoT 中心接受的相对较小的设备到云消息。 需要从设备上传文件时,仍可以使用 IoT 中心的安全性和可靠性。

在本教程的末尾,你将运行 Python 控制台应用:

  • FileUpload.py,该应用使用 Python 设备 SDK 将文件上传到存储中 。

可以在 GitHub 的 https://github.com/Azure/azure-iot-sdk-python/blob/master/azure-iot-device/samples/async-hub-scenarios/upload_to_blob.py 中找到更高级的文件上传应用版本。 若要运行此版本,必须了解 x.509 证书、密钥和通行短语。 因为此版本不是上传文件所必需的,所以下面提供的代码不使用 X.509。

备注

IoT 中心通过 Azure IoT 设备 SDK 对许多设备平台和语言(包括 C、Java、Javascript 和 Python)提供 SDK 支持。 有关如何使用 Python 将设备连接到本教程中的代码(通常是连接到 Azure IoT 中心)的说明,请参阅 Azure IoT Python SDK

重要

使用 X.509 证书颁发机构 (CA) 身份验证的设备上的文件上传功能为公共预览版,并且必须启用预览模式。 它在使用 x.509 指纹身份验证的设备上已正式发布。 若要了解有关使用 IoT 中心进行 x.509 身份验证的详细信息,请参阅支持的 x.509 证书

先决条件

  • 有效的 Azure 帐户。 (如果没有帐户,只需几分钟即可创建一个免费帐户。)

  • 建议使用 Python 版本 3.7 或更高版本。 请确保根据安装程序的要求,使用 32 位或 64 位安装。 在安装过程中出现提示时,请确保将 Python 添加到特定于平台的环境变量中。 有关支持的其他 Python 版本,请参阅 SDK 文档中的 Azure IoT 设备功能

    重要

    由于本文中的设备代码使用异步 API,因此不能使用 Python 2.7。

  • 确保已在防火墙中打开端口 8883。 本文中的设备示例使用 MQTT 协议,该协议通过端口 8883 进行通信。 在某些公司和教育网络环境中,此端口可能被阻止。 有关解决此问题的更多信息和方法,请参阅连接到 IoT 中心(MQTT)

创建 IoT 中心

此部分介绍如何使用 Azure 门户创建 IoT 中心。

  1. 登录 Azure 门户

  2. 从 Azure 主页中选择“+ 创建资源”按钮,然后在“搜索市场”字段中输入“IoT 中心”。

  3. 在搜索结果中选择“IoT 中心”,然后选择“创建” 。

  4. 在“基本信息”选项卡上,按如下所示填写字段:

    • 订阅:选择要用于中心的订阅。

    • 资源组:选择一个资源组或新建一个资源组。 若要新建资源组,请选择“新建”并填写要使用的名称。 若要使用现有的资源组,请选择它。 有关详细信息,请参阅管理 Azure 资源管理器资源组

    • 区域:选择中心所在的区域。 选择最靠近你的位置。 某些功能(如 IoT 中心设备流)仅适用于特定区域。 对于这些受限功能,你必须选择受支持的区域之一。

    • IoT 中心名称:输入中心的名称。 该名称必须全局唯一。

    重要

    由于 IoT 中心将作为 DNS 终结点公开可发现,因此请务必避免在命名它时输入任何敏感信息或个人身份信息。

    在 Azure 门户中创建中心。

  5. 在完成时选择“下一步:网络”,继续创建中心。

    选择可以连接到 IoT 中心的终结点。 你可以选择默认设置“公共终结点(所有网络)”,也可选择“公共终结点(选定的 IP 范围)”或“专用终结点” 。 接受此示例的默认设置。

    选择可以连接的终结点。

  6. 在完成时选择“下一步:管理”,继续创建中心。

    使用 Azure 门户为新的中心设置大小和规模。

    可在此处接受默认设置。 如果需要,可以修改以下任何字段:

    • 定价和缩放层:选择的层。 可以根据你需要的功能数以及每天通过解决方案发送的消息数从多个层级中进行选择。 免费层适用于测试和评估。 允许 500 台设备连接到中心,每天最多可传输 8,000 条消息。 每个 Azure 订阅可以在免费层中创建一个 IoT 中心。

      如果正在完成 IoT 中心设备流的快速入门,请选择免费层。

    • IoT 中心单元:每日每单位允许的消息数取决于中心的定价层。 例如,如果希望中心支持 700,000 条消息引入,请选择两个 S1 层单位。 有关其他层选项的详细信息,请参阅选择合适的 IoT 中心层

    • Defender for IoT:启用此功能可为 IoT 和设备添加额外的一层威胁防护。 此选项不可用于免费层的中心。 有关此功能的详细信息,请参阅适用于 IoT 的 Azure 安全中心

    • 高级设置 > 设备到云的分区:此属性将设备到云消息与这些消息的同步读取器数目相关联。 大多数中心只需要 4 个分区。

  7. 在完成时选择“下一步:标记”继续到下一屏幕。

    标记是名称/值对。 可以为多个资源和资源组分配相同的标记,以便对资源进行分类并合并计费。 有关详细信息,请参阅使用标记来组织 Azure 资源

    使用 Azure 门户为中心分配标记。

  8. 在完成时选择“下一步:查看+创建”可查看选择。 你会看到类似于此屏幕的内容,但其中包含创建中心时选择的值。

    查看用于创建新中心的信息。

  9. 选择“创建”以创建新的中心。 创建中心需要几分钟时间。

将 Azure 存储帐户关联到 IoT 中心

由于模拟设备应用将文件上传到 Blob,因此必须拥有与 IoT 中心关联的 Azure 存储帐户。 将 Azure 存储帐户与 IoT 中心相关联时,IoT 中心会生成一个 SAS URI。 设备可以使用此 SAS URI 安全地将文件上传到 Blob 容器。 IoT 中心服务和设备 SDK 协调生成 SAS URI 的过程,并使其可供设备用来上传文件。

按照使用 Azure 门户配置文件上传中的说明进行操作。 确保有一个 Blob 容器与你的 IoT 中心关联并且文件通知已启用。

在门户中启用文件通知

从设备应用上传文件

本部分中操作将会创建可将文件上传到 IoT 中心的设备应用。

  1. 在命令提示符处,运行以下命令来安装 azure-iot-device 包。 可以使用此包与 IoT 中心协调文件上传操作。

    pip install azure-iot-device
    
  2. 在命令提示符处,运行以下命令来安装 azure.storage.blob 包。 可以使用此包执行文件上传操作。

    pip install azure.storage.blob
    
  3. 创建将上传到 blob 存储的测试文件。

  4. 使用文本编辑器,在工作文件夹中创建一个 FileUpload.py 文件 。

  5. 在 FileUpload.py 文件的开头添加以下 import 语句和变量 。

    import os
    import asyncio
    from azure.iot.device.aio import IoTHubDeviceClient
    from azure.core.exceptions import AzureError
    from azure.storage.blob import BlobClient
    
    CONNECTION_STRING = "[Device Connection String]"
    PATH_TO_FILE = r"[Full path to local file]"
    
  6. 在文件中,将 [Device Connection String] 替换为 IoT 中心设备的连接字符串。 将 [Full path to local file] 替换为你创建的测试文件的路径,或是设备上要上传的任何文件的路径。

  7. 创建一个函数以将文件上传到 blob 存储:

    async def store_blob(blob_info, file_name):
        try:
            sas_url = "https://{}/{}/{}{}".format(
                blob_info["hostName"],
                blob_info["containerName"],
                blob_info["blobName"],
                blob_info["sasToken"]
            )
    
            print("\nUploading file: {} to Azure Storage as blob: {} in container {}\n".format(file_name, blob_info["blobName"], blob_info["containerName"]))
    
            # Upload the specified file
            with BlobClient.from_blob_url(sas_url) as blob_client:
                with open(file_name, "rb") as f:
                    result = blob_client.upload_blob(f, overwrite=True)
                    return (True, result)
    
        except FileNotFoundError as ex:
            # catch file not found and add an HTTP status code to return in notification to IoT Hub
            ex.status_code = 404
            return (False, ex)
    
        except AzureError as ex:
            # catch Azure errors that might result from the upload operation
            return (False, ex)
    

    此函数分析传递给它的 blob_info 结构,以创建用于初始化 azure.storage.blob.BlobClient 的 URL。 然后,它使用此客户端将文件上传到 Azure Blob 存储。

  8. 添加下述用来连接客户端并上传文件的代码:

    async def main():
        try:
            print ( "IoT Hub file upload sample, press Ctrl-C to exit" )
    
            conn_str = CONNECTION_STRING
            file_name = PATH_TO_FILE
            blob_name = os.path.basename(file_name)
    
            device_client = IoTHubDeviceClient.create_from_connection_string(conn_str)
    
            # Connect the client
            await device_client.connect()
    
            # Get the storage info for the blob
            storage_info = await device_client.get_storage_info_for_blob(blob_name)
    
            # Upload to blob
            success, result = await store_blob(storage_info, file_name)
    
            if success == True:
                print("Upload succeeded. Result is: \n") 
                print(result)
                print()
    
                await device_client.notify_blob_upload_status(
                    storage_info["correlationId"], True, 200, "OK: {}".format(file_name)
                )
    
            else :
                # If the upload was not successful, the result is the exception object
                print("Upload failed. Exception is: \n") 
                print(result)
                print()
    
                await device_client.notify_blob_upload_status(
                    storage_info["correlationId"], False, result.status_code, str(result)
                )
    
        except Exception as ex:
            print("\nException:")
            print(ex)
    
        except KeyboardInterrupt:
            print ( "\nIoTHubDeviceClient sample stopped" )
    
        finally:
            # Finally, disconnect the client
            await device_client.disconnect()
    
    
    if __name__ == "__main__":
        asyncio.run(main())
        #loop = asyncio.get_event_loop()
        #loop.run_until_complete(main())
        #loop.close()
    

    此代码创建一个异步的 IoTHubDeviceClient ,并使用以下 API 来管理通过 IoT 中心进行的文件上传操作:

    • get_storage_info_for_blob 从 IoT 中心获取你之前创建的链接存储帐户的相关信息。 此信息包括主机名、容器名称、blob 名称和 SAS 令牌。 存储信息传递到 store_blob 函数(在上一步中创建),因此该函数中的 BlobClient 可以通过 Azure 存储进行身份验证。 get_storage_info_for_blob 方法还会返回一个 correlation_id,后者在 notify_blob_upload_status 方法中使用。 IoT 中心使用 correlation_id 来标记你在处理的 blob。

    • notify_blob_upload_status 向 IoT 中心通知你的 blob 存储操作的状态。 你将向其传递通过 get_storage_info_for_blob 方法获取的 correlation_id。 IoT 中心使用它来通知任何可能正在侦听文件上传任务状态通知的服务。

  9. 保存并关闭 UploadFile.py 文件 。

运行应用程序

现即可运行应用程序。

  1. 在工作文件夹的命令提示符处,运行以下命令:

    python FileUpload.py
    
  2. 以下屏幕截图显示来自 FileUpload 应用的输出 :

    simulated-device 应用的输出

  3. 可以使用门户查看所配置的存储容器中上传的文件:

    上传的文件

后续步骤

在本教程中,你已学习了如何使用 IoT 中心的文件上传功能来简化从设备进行的文件上传。 可以使用以下文章继续探索 IoT 中心功能和方案:

使用以下链接详细了解 Azure Blob 存储: