Подключение к данным концентратора событий

Концентраторы событий Azure — это платформа потоковой передачи больших данных и служба приема событий. Azure обозреватель данных предлагает непрерывное получение из управляемых клиентом концентраторов событий.

Конвейер приема концентратора событий передает события в Azure обозреватель данных в несколько шагов. Сначала создайте концентратор событий в портал Azure. Затем вы создадите целевую таблицу в Azure обозреватель данных, в которой данные в определенном форматебудут приняты с использованием заданных свойств приема. Подключение к концентратору событий должно быть осведомлено о маршрутизации событий. Данные внедряются с выбранными свойствами в соответствии с сопоставлением свойств системы событий. Создайте подключение к концентратору событий, чтобы создать концентратор событий и Отправить события. Этим процессом можно управлять с помощью портал Azure, программно с помощью C# или Pythonили с помощью шаблона Azure Resource Manager.

Общие сведения о приеме данных в Azure обозреватель данных см. в статье Обзор приема данных в azure обозреватель данных.

Формат данных

  • Данные считываются из концентратора событий в форме объектов EVENTDATA .

  • См. раздел Поддерживаемые форматы.

    Примечание

    Концентратор событий не поддерживает формат RAW.

  • Данные можно сжимать с помощью GZip алгоритма сжатия. Укажите Compression в свойствах приема.

    • Сжатие данных не поддерживается для сжатых форматов (Avro, Parquet, ORC).
    • Пользовательские кодировки и встроенные Свойства системы не поддерживаются в сжатых данных.

Свойства приема

Свойства приема указывают на процесс приема, где следует маршрутизировать данные и как обработать их. Свойства приема событий можно указать с помощью свойства EVENTDATA. Properties. Задать можно следующие свойства.

Свойство Описание
Таблица Имя существующей целевой таблицы (с учетом регистра). Переопределяет Table набор на Data Connection панели.
Формат Формат данных. Переопределяет Data format набор на Data Connection панели.
инжестионмаппингреференце Имя существующего сопоставления приема , которое будет использоваться. Переопределяет Column mapping набор на Data Connection панели.
Сжатие Сжатие данных, None (по умолчанию) или GZip сжатие.
Кодирование Кодировка данных, значение по умолчанию — UTF8. Может быть любой из поддерживаемых кодировок .NET.
Теги Список тегов , связываемых с полученными данными в формате строки массива JSON. При использовании тегов возникают проблемы с производительностью .

Примечание

Принимаются только события, поставленные в очередь после создания подключения к данным.

Маршрутизация событий

При настройке подключения концентратора событий к кластеру Azure обозреватель данных необходимо указать свойства целевой таблицы (имя таблицы, формат данных, сжатие и сопоставление). Маршрутизация по умолчанию для данных также называется static routing . Можно также указать свойства целевой таблицы для каждого события с помощью свойств события. Соединение будет динамически маршрутизировать данные, как указано в свойствах EVENTDATA. Properties, переопределяя статические свойства для этого события.

В следующем примере задайте сведения о концентраторе событий и отправьте данные о метриках погоды в таблицу WeatherMetrics . Данные имеют json Формат. mapping1 предварительно определено для таблицы WeatherMetrics .

var eventHubNamespaceConnectionString=<connection_string>;
var eventHubName=<event_hub>;

// Create the data
var metric = new Metric { Timestamp = DateTime.UtcNow, MetricName = "Temperature", Value = 32 }; 
var data = JsonConvert.SerializeObject(metric);

// Create the event and add optional "dynamic routing" properties
var eventData = new EventData(Encoding.UTF8.GetBytes(data));
eventData.Properties.Add("Table", "WeatherMetrics");
eventData.Properties.Add("Format", "json");
eventData.Properties.Add("IngestionMappingReference", "mapping1");
eventData.Properties.Add("Tags", "['mydatatag']");

// Send events
var eventHubClient = EventHubClient.CreateFromConnectionString(eventHubNamespaceConnectionString, eventHubName);
eventHubClient.Send(eventData);
eventHubClient.Close();

Сопоставление свойств системы событий

Свойства системы хранят свойства, заданные службой концентраторов событий в момент постановки события в очередь. При подключении к концентратору событий Azure обозреватель данных выбранные свойства будут внедрены в целевую таблицу данных в таблице.

Примечание

  • Системные свойства поддерживаются для json и табличных форматов (csv, tsv и т. д.) и не поддерживаются для сжатых данных. При использовании неподдерживаемого формата данные по-прежнему будут приняты, но свойства будут игнорироваться.
  • Для табличных данных системные свойства поддерживаются только для сообщений о событиях с одной записью.
  • Для данных JSON системные свойства также поддерживаются для сообщений о событиях с несколькими записями. В таких случаях системные свойства добавляются только к первой записи сообщения о событии.
  • При сопоставлении в формате csv свойства добавляются в начало записи в порядке, определенном в таблице Системные свойства.
  • При сопоставлении в формате json свойства добавляются в соответствии с именами свойств, указанных в таблице Системные свойства.

Свойства системы

Концентратор событий предоставляет следующие свойства системы:

Свойство Тип данных Описание
x-opt-enqueued-time DATETIME Время в очереди для события в формате UTC
x-opt-sequence-number long Логический порядковый номер события в потоке секций концентратора событий
x-opt-offset строка Смещение события из потока секции концентратора событий. Идентификатор смещения уникален в пределах секции потока концентратора событий
Издатель x-opt- строка Имя издателя, если сообщение было отправлено в конечную точку издателя
x-opt-partition-key строка Ключ секции соответствующей секции, в которой хранится событие.

При работе с IOT Central концентраторами событий можно также внедрять системные свойства центра Интернета вещей в полезные данные. Полный список см. в разделе Свойства системы центра Интернета вещей.

Если в разделе источника данных таблицы были выбраны Свойства системы событий , необходимо включить свойства в схему таблицы и сопоставление.

Примеры сопоставления схем

Пример сопоставления схемы таблицы

Если данные содержат три столбца ( Timespan , Metric и Value ), а включаемые свойства — x-opt-enqueued-time и x-opt-offset , создайте или измените схему таблицы с помощью следующей команды:

    .create-merge table TestTable (TimeStamp: datetime, Metric: string, Value: int, EventHubEnqueuedTime:datetime, EventHubOffset:string)

Пример сопоставления CSV

Выполните следующие команды, чтобы добавить данные в начало записи. Обратите внимание на порядковые значения.

    .create table TestTable ingestion csv mapping "CsvMapping1"
    '['
    '   { "column" : "Timespan", "Properties":{"Ordinal":"2"}},'
    '   { "column" : "Metric", "Properties":{"Ordinal":"3"}},'
    '   { "column" : "Value", "Properties":{"Ordinal":"4"}},'
    '   { "column" : "EventHubEnqueuedTime", "Properties":{"Ordinal":"0"}},'
    '   { "column" : "EventHubOffset", "Properties":{"Ordinal":"1"}}'
    ']'

Пример сопоставления JSON

Данные добавляются с помощью сопоставления системных свойств. Выполните следующие команды:

    .create table TestTable ingestion json mapping "JsonMapping1"
    '['
    '    { "column" : "Timespan", "Properties":{"Path":"$.timestamp"}},'
    '    { "column" : "Metric", "Properties":{"Path":"$.metric"}},'
    '    { "column" : "Value", "Properties":{"Path":"$.value"}},'
    '    { "column" : "EventHubEnqueuedTime", "Properties":{"Path":"$.x-opt-enqueued-time"}},'
    '    { "column" : "EventHubOffset", "Properties":{"Path":"$.x-opt-offset"}}'
    ']'

Подключение к концентратору событий

Примечание

Для лучшей производительности создайте все ресурсы в том же регионе, что и кластер Azure обозреватель данных.

Создание концентратора событий

Создайте концентратор событий, если он еще не создан. Подключение к концентратору событий можно осуществлять с помощью портал Azure, программно с помощью C# или Pythonили с помощью шаблона Azure Resource Manager.

Примечание

  • Так как число секций неизменно, вам следует продумать масштаб заранее.
  • Группа потребителей должна быть уникальной для каждого потребителя. Создайте группу потребителей, выделенную для подключения к Azure обозреватель данных.

Отправка событий

См. пример приложения , которое создает данные и отправляет их в концентратор событий.

Пример создания демонстрационных данных см. в статье прием данных из концентратора событий в Azure обозреватель данных

Настройка решения географического аварийного восстановления

Концентратор событий предлагает решение географического аварийного восстановления . Обозреватель данных Azure не поддерживает Alias пространства имен концентратора событий. Чтобы реализовать географическое аварийное восстановление в решении, создайте два подключения к данным концентратора событий: одно для основного пространства имен и одно для дополнительного пространства имен. Обозреватель данных Azure будет прослушивать оба подключения концентратора событий.

Примечание

Пользователь обязан реализовать отработку отказа из основного пространства имен в дополнительное пространство имен.

Дальнейшие действия