Пошаговое руководство. Использование функции "Сбор" в Центрах событий с PythonEvent Hubs Capture walkthrough: Python

Сбор — это функция Центров событий Azure.Capture is a feature of Azure Event Hubs. Вы можете использовать захват для автоматической доставки потоковых данных в концентраторе событий в выбранную учетную запись хранилища BLOB-объектов Azure.You can use Capture to automatically deliver the streaming data in your event hub to an Azure Blob storage account of your choice. Эта возможность упрощает выполнение пакетной обработки данных потоковой передачи в реальном времени.This capability makes it easy to do batch processing on real-time streaming data. В этой статье мы расскажем, как использовать функцию "Сбор" в Центрах событий с Python.This article describes how to use Event Hubs Capture with Python. Дополнительные сведения о записи концентраторов событий см. в статье Захват событий с помощью концентраторов событий Azure.For more information about Event Hubs Capture, see Capture events through Azure Event Hubs.

В этом пошаговом руководстве для демонстрации функции записи используется пакет SDK для Azure Python .This walkthrough uses the Azure Python SDK to demonstrate the Capture feature. Программа sender.py отправляет смоделированные данные телеметрии окружающей среды в концентраторы событий в формате JSON.The sender.py program sends simulated environmental telemetry to Event Hubs in JSON format. Концентратор событий использует функцию записи для записи этих данных в хранилище BLOB-объектов в пакетах.The event hub uses the Capture feature to write this data to Blob storage in batches. Приложение capturereader.py считывает эти большие двоичные объекты, создает файл добавления для каждого устройства и записывает данные в CSV -файлы на каждом устройстве.The capturereader.py app reads these blobs, creates an append file for each of your devices, and writes the data to .csv files on each device.

В этом пошаговом руководстве описаны следующие операции:In this walkthrough, you:

  • Создайте учетную запись хранилища BLOB-объектов Azure и контейнер в портал Azure.Create an Azure Blob storage account and container in the Azure portal.
  • Включите запись концентраторов событий и направьте ее в учетную запись хранения.Enable Event Hubs Capture and direct it to your storage account.
  • Отправка данных в концентратор событий с помощью скрипта Python.Send data to your event hub by using a Python script.
  • Чтение и обработка файлов из записи концентраторов событий с помощью другого скрипта Python.Read and process files from Event Hubs Capture by using another Python script.

Технические условияPrerequisites

  • Python 3,4 или более поздней версии с установленным и обновленным pip.Python 3.4 or later, with pip installed and updated.

  • Подписка Azure.An Azure subscription. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись, прежде чем начать работу.If you don't have one, create a free account before you begin.

  • Активное пространство имен концентраторов событий и концентратор событий, созданное с помощью инструкций в разделе Краткое руководство по созданию концентратора событий с использованием портал Azure.An active Event Hubs namespace and event hub, created by following the instructions at Quickstart: Create an event hub using Azure portal. Запишите имя пространства имен и концентратора событий, которое будет использоваться далее в этом пошаговом руководстве.Make a note of your namespace and event hub names to use later in this walkthrough.

    Примечание

    Если у вас уже есть контейнер хранилища для использования, можно включить запись и выбрать контейнер хранилища при создании концентратора событий.If you already have a storage container to use, you can enable Capture and select the storage container when you create the Event Hub.

  • Имя ключа общего доступа концентратора событий и значение первичного ключа.Your Event Hubs shared access key name and primary key value. Найдите или создайте эти значения в разделе политики общего доступа на странице концентраторов событий.Find or create these values under Shared access policies on your Event Hubs page. Имя ключа доступа по умолчанию — RootManageSharedAccessKey.The default access key name is RootManageSharedAccessKey. Скопируйте имя ключа доступа и значение первичного ключа для дальнейшего использования в этом пошаговом руководстве.Copy the access key name and the primary key value to use later in this walkthrough.

Создание учетной записи хранилища BLOB-объектов Azure и контейнераCreate an Azure Blob storage account and container

Создайте учетную запись хранения и контейнер, которые следует использовать для записи.Create a storage account and container to use for the capture.

  1. Войдите на портале Azure.Sign in to the Azure portal.

  2. На панели навигации слева выберите учетные записи хранения, а затем на экране учетные записи хранения выберите добавить.In the left navigation, select Storage accounts, and on the Storage accounts screen, select Add.

  3. На экране создания учетной записи хранения выберите подписку и группу ресурсов и присвойте учетной записи хранения имя.On the storage account creation screen, select a subscription and resource group, and give the storage account a name. Можно оставить другие значения по умолчанию.You can leave the other selections at default. Выберите Проверка + создать, просмотрите параметры, а затем щелкните создать.Select Review + create, review the settings, and then select Create.

    Создание учетной записи хранения

  4. После завершения развертывания выберите Переход к ресурсу, а затем на экране учетная запись хранения выберите контейнеры.When the deployment completes, select Go to resource, and on the storage account Overview screen, select Containers.

  5. На экране контейнеры выберите + контейнер.On the Containers screen, select + Container.

  6. На экране новый контейнер присвойте контейнеру имя, а затем нажмите кнопку ОК.On the New container screen, give the container a name, and then select OK. Запишите имя контейнера, которое будет использоваться далее в этом пошаговом руководстве.Make a note of the container name to use later in the walkthrough.

  7. В левой области навигации экрана контейнеры выберите ключи доступа.In the left navigation of the Containers screen, select Access keys. Скопируйте имя учетной записи храненияи значение ключа в поле Key1, чтобы использовать его позже в этом пошаговом руководстве.Copy the Storage account name, and the Key value under key1, to use later in the walkthrough.

Включить запись концентраторов событийEnable Event Hubs Capture

  1. В портал Azure перейдите к концентратору событий, выбрав его пространство имен концентраторов событий из всех ресурсов, выбрав концентраторы событий в левой области навигации, а затем выбрав концентратор событий.In the Azure portal, navigate to your event hub by selecting its Event Hubs Namespace from All resources, selecting Event hubs in the left navigation, and then selecting your event hub.
  2. На экране обзора концентратора событий выберите Захват событий.On the event hub Overview screen, select Capture events.
  3. На экране Capture (запись ) выберите вкл.On the Capture screen, select On. Затем в разделе контейнер хранилища Azureвыберите выбрать контейнер.Then, under Azure Storage Container, select Select Container.
  4. На экране контейнеры выберите контейнер хранилища, который необходимо использовать, а затем нажмите кнопку Выбрать.On the Containers screen, select the storage container you want to use, and then select Select.
  5. На экране Capture (запись ) выберите Save Changes (сохранить изменения).On the Capture screen, select Save changes.

Создание скрипта Python для отправки событий в концентратор событийCreate a Python script to send events to Event Hub

Этот скрипт отправляет 200 событий в ваш концентратор событий.This script sends 200 events to your event hub. События — это простые считывания среды в формате JSON.The events are simple environmental readings sent in JSON.

  1. Откройте предпочитаемый редактор Python, например Visual Studio Code.Open your favorite Python editor, such as Visual Studio Code.

  2. Создайте новый файл с именем sender.py.Create a new file called sender.py.

  3. Вставьте следующий код в sender.py.Paste the following code into sender.py. Замените собственные значения для концентраторов событий <namespace >, <AccessKeyName >, <primary значение ключа > и <eventhub >.Substitute your own values for the Event Hubs <namespace>, <AccessKeyName>, <primary key value>, and <eventhub>.

    import uuid
    import datetime
    import random
    import json
    from azure.servicebus.control_client import ServiceBusService
    
    sbs = ServiceBusService(service_namespace='<namespace>', shared_access_key_name='<AccessKeyName>', shared_access_key_value='<primary key value>')
    devices = []
    for x in range(0, 10):
        devices.append(str(uuid.uuid4()))
    
    for y in range(0,20):
        for dev in devices:
            reading = {'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100)}
            s = json.dumps(reading)
            sbs.send_event('<eventhub>', s)
        print(y)
    
  4. Сохраните файл.Save the file.

Создание скрипта Python для чтения файлов записиCreate a Python script to read Capture files

Этот сценарий считывает записанные файлы и создает файл для каждого устройства, чтобы записывать данные только для этого устройства.This script reads the captured files and creates a file for each of your devices to write the data only for that device.

  1. В редакторе Python создайте новый файл с именем capturereader.py.In your Python editor, create a new file called capturereader.py.

  2. Вставьте следующий код в capturereader.py.Paste the following code into capturereader.py. Замените сохраненные значения для учетной записи <storageaccount >, > ключа доступа <storage и <storagecontainer >.Substitute your saved values for your <storageaccount>, <storage account access key>, and <storagecontainer>.

    import os
    import string
    import json
    import avro.schema
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    from azure.storage.blob import BlockBlobService
    
    def processBlob(filename):
        reader = DataFileReader(open(filename, 'rb'), DatumReader())
        dict = {}
        for reading in reader:
            parsed_json = json.loads(reading["Body"])
            if not 'id' in parsed_json:
                return
            if not parsed_json['id'] in dict:
                list = []
                dict[parsed_json['id']] = list
            else:
                list = dict[parsed_json['id']]
                list.append(parsed_json)
        reader.close()
        for device in dict.keys():
            deviceFile = open(device + '.csv', "a")
            for r in dict[device]:
                deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n')
    
    def startProcessing(accountName, key, container):
        print('Processor started using path: ' + os.getcwd())
        block_blob_service = BlockBlobService(account_name=accountName, account_key=key)
        generator = block_blob_service.list_blobs(container)
        for blob in generator:
            #content_length == 508 is an empty file, so only process content_length > 508 (skip empty files)
            if blob.properties.content_length > 508:
                print('Downloaded a non empty blob: ' + blob.name)
                cleanName = str.replace(blob.name, '/', '_')
                block_blob_service.get_blob_to_path(container, blob.name, cleanName)
                processBlob(cleanName)
                os.remove(cleanName)
            block_blob_service.delete_blob(container, blob.name)
    startProcessing('<storageaccount>', '<storage account access key>', '<storagecontainer>')
    

Выполнение скриптов PythonRun the Python scripts

  1. Откройте командную строку с Python в пути и выполните следующие команды, чтобы установить пакеты необходимых компонентов Python:Open a command prompt that has Python in its path, and run these commands to install the Python prerequisite packages:

    pip install azure-storage
    pip install azure-servicebus
    pip install avro-python3
    

    Если у вас более ранняя версия azure-storage или azure, может потребоваться использовать параметр --upgrade.If you have an earlier version of azure-storage or azure, you might need to use the --upgrade option.

    Также может потребоваться выполнить следующую команду.You might also need to run the following command. Выполнение этой команды не является обязательным в большинстве систем.Running this command isn't necessary on most systems.

    pip install cryptography
    
  2. В каталоге, в котором вы сохранили sender.py и capturereader.py, выполните следующую команду:From the directory where you saved sender.py and capturereader.py, run this command:

    start python sender.py
    

    Команда запускает новый процесс Python для запуска отправителя.The command starts a new Python process to run the sender.

  3. После завершения записи выполните следующую команду:When the capture finishes running, run this command:

    python capturereader.py
    

    Обработчик записи загружает все непустые большие двоичные объекты из контейнера учетной записи хранения и записывает результаты в виде CSV -файлов в локальный каталог.The capture processor downloads all the non-empty blobs from the storage account container and writes the results as .csv files into the local directory.

Дальнейшие действияNext steps

Дополнительные сведения о концентраторах событий см. в следующих статьях:To learn more about Event Hubs, see: