Прием данных из Azure Cosmos DB в azure Data Explorer

Azure Data Explorer поддерживает прием данных из Azure Cosmos DB для NoSql с помощью канала изменений. Подключение к данным канала изменений Cosmos DB — это конвейер приема, который прослушивает канал изменений Cosmos DB и передает данные в таблицу Data Explorer. Канал изменений прослушивает новые и обновленные документы, но не регистрирует удаления в журнале. Общие сведения о приеме данных в обозревателе данных Azure см. в разделе Обзор приема данных Azure Data Explorer.

Каждое подключение к данным прослушивает определенный контейнер Cosmos DB и прием данных в указанную таблицу (в одной таблице может приниматься несколько подключений). Метод приема поддерживает прием потоковой передачи (если он включен) и прием в очереди.

Из этой статьи вы узнаете, как настроить подключение к данным канала изменений Cosmos DB для приема данных в Azure Data Explorer с помощью управляемого системой удостоверения. Прежде чем приступить к работе, ознакомьтесь с рекомендациями .

Чтобы настроить соединитель, выполните следующие действия.

Шаг 1. Выберите таблицу azure Data Explorer и настройте ее сопоставление

Шаг 2. Создание подключения к данным Cosmos DB

Шаг 3 . Проверка подключения к данным

Предварительные требования

Шаг 1. Выберите таблицу Data Explorer Azure и настройте ее сопоставление

Перед созданием подключения к данным создайте таблицу, в которой будут храниться полученные данные, и примените сопоставление, соответствующее схеме, в исходном контейнере Cosmos DB. Если для вашего сценария требуется не простое сопоставление полей, можно использовать политики обновления для преобразования и сопоставления данных , полученных из канала изменений.

Ниже показан пример схемы элемента в контейнере Cosmos DB.

{
    "id": "17313a67-362b-494f-b948-e2a8e95e237e",
    "name": "Cousteau",
    "_rid": "pL0MAJ0Plo0CAAAAAAAAAA==",
    "_self": "dbs/pL0MAA==/colls/pL0MAJ0Plo0=/docs/pL0MAJ0Plo0CAAAAAAAAAA==/",
    "_etag": "\"000037fc-0000-0700-0000-626a44110000\"",
    "_attachments": "attachments/",
    "_ts": 1651131409
}

Чтобы создать таблицу и применить сопоставление таблиц, выполните следующие действия.

  1. В пользовательском веб-интерфейсе Azure Data Explorer в меню навигации слева выберите Запрос, а затем выберите базу данных, в которой нужно создать таблицу.

  2. Выполните следующую команду, чтобы создать таблицу с именем TestTable.

    .create table TestTable(Id:string, Name:string, _ts:long, _timestamp:datetime)
    
  3. Выполните следующую команду, чтобы создать сопоставление таблицы.

    Команда сопоставляет пользовательские свойства из документа JSON Cosmos DB со столбцами в таблице TestTable следующим образом:

    Свойство Cosmos DB Столбец таблицы Преобразование
    идентификатор Идентификатор Нет
    name Название Нет
    _Ts _ts Нет
    _Ts _Timestamp Используется DateTimeFromUnixSeconds для преобразования_ts (unix seconds) в _timestamp (datetime))

    Примечание

    Рекомендуется использовать следующие столбцы меток времени:

    • _ts. Используйте этот столбец для согласования данных с Cosmos DB.
    • _timestamp. Используйте этот столбец для выполнения эффективных фильтров времени в запросах Kusto. Дополнительные сведения см. в разделе Рекомендации по запросам.
    .create table TestTable ingestion json mapping "DocumentMapping"
    ```
    [
        {"column":"Id","path":"$.id"},
        {"column":"Name","path":"$.name"},
        {"column":"_ts","path":"$._ts"},
        {"column":"_timestamp","path":"$._ts", "transform":"DateTimeFromUnixSeconds"}
    ]
    ```
    

Преобразование и сопоставление данных с помощью политик обновления

Если для вашего сценария требуется не простое сопоставление полей, можно использовать политики обновления для преобразования и сопоставления данных, полученных из канала изменений.

Политики обновления — это способ преобразования данных по мере их приема в таблицу. Они написаны на язык запросов Kusto и выполняются в конвейере приема. Их можно использовать для преобразования данных из приема канала изменений Cosmos DB, например в следующих сценариях:

  • Документы содержат массивы, которые было бы проще запрашивать, если они преобразуются в несколько строк с помощью mv-expand оператора .
  • Вы хотите отфильтровать документы. Например, можно отфильтровать документы по типу where с помощью оператора .
  • У вас есть сложная логика, которую нельзя представить в сопоставлении таблиц.

Сведения о создании политик обновлений и управлении ими см. в статье Общие сведения о политике обновления.

Шаг 2. Создание подключения к данным Cosmos DB

Для создания соединителя данных можно использовать следующие методы:

  1. В портал Azure перейдите на страницу обзора кластера и перейдите на вкладку Начало работы.

  2. На плитке Прием данных выберите Создать подключение к> даннымCosmos DB.

    Снимок экрана: вкладка

  3. В области Создание подключения к данным Cosmos DB заполните форму сведениями из таблицы:

    Снимок экрана: область подключения к данным с полями формы со значениями.

    Поле Описание
    Имя базы данных Выберите базу данных azure Data Explorer, в которую вы хотите принимать данные.
    Имя подключения к данным Укажите имя подключения к данным.
    Подписка Выберите подписку, содержащую учетную запись NoSQL Cosmos DB.
    Учетная запись Cosmos DB Выберите учетную запись Cosmos DB, из которой вы хотите принимать данные.
    База данных SQL Выберите базу данных Cosmos DB, из которой вы хотите принимать данные.
    Контейнер SQL Выберите контейнер Cosmos DB, из которого требуется принимать данные.
    Имя таблицы Укажите имя таблицы Data Explorer Azure, для которой требуется принимать данные.
    Имя сопоставления При необходимости укажите имя сопоставления , используемое для подключения к данным.
  4. При необходимости в разделе Дополнительные параметры выполните следующие действия.

    1. Укажите дату начала получения события. Это время, с которого соединитель начнет принимать данные. Если не указать время, соединитель начнет принимать данные с момента создания подключения к данным. Рекомендуемый формат даты — это стандарт ISO 8601 UTC, указанный следующим образом: yyyy-MM-ddTHH:mm:ss.fffffffZ.

    2. Выберите Назначаемое пользователем , а затем выберите удостоверение. По умолчанию управляемое удостоверение, назначаемое системой , используется подключением. При необходимости можно использовать удостоверение, назначаемое пользователем .

      Снимок экрана: область подключения к данным с параметрами

  5. Выберите Создать , чтобы установить подключение к данным.

Шаг 3. Проверка подключения к данным

  1. В контейнер Cosmos DB вставьте следующий документ:

    {
        "name":"Cousteau"
    }
    
  2. В пользовательском веб-интерфейсе azure Data Explorer выполните следующий запрос:

    TestTable
    

    Набор результатов должен выглядеть следующим образом:

    Снимок экрана: область результатов, показывающая полученный документ.

Примечание

Azure Data Explorer имеет политику агрегирования (пакетной обработки) для приема данных в очереди, предназначенную для оптимизации процесса приема. Политика пакетной обработки по умолчанию настраивается для запечатывания пакета, когда выполняется одно из следующих условий пакета: максимальное время задержки 5 минут, общий размер одного ГБ или 1000 BLOB-объектов. В связи с этим может возникнуть задержка. Дополнительные сведения см. на странице о политике пакетной обработки. Чтобы уменьшить задержку, настройте таблицу для поддержки потоковой передачи. См. статью Политика потоковой передачи.

Рекомендации

К каналу изменений Cosmos DB относятся следующие рекомендации.

  • Канал изменений не предоставляет события удаления .

    Канал изменений Cosmos DB содержит только новые и обновленные документы. Если вам нужно знать об удаленных документах, можно настроить канал с помощью мягкого маркера, чтобы пометить документ Cosmos DB как удаленный. Свойство добавляется для событий обновления, которые указывают, удален ли документ. Затем можно использовать where оператор в запросах, чтобы отфильтровать их.

    Например, если вы сопоставите удаленное свойство со столбцом таблицы IsDeleted, вы можете отфильтровать удаленные документы с помощью следующего запроса:

    TestTable
    | where not(IsDeleted)
    
  • Канал изменений предоставляет только последнее обновление документа.

    Чтобы понять последствия второго вопроса, изучите следующий сценарий:

    Контейнер Cosmos DB содержит документы A и B. Изменения в свойстве foo показаны в следующей таблице:

    Идентификатор документа Свойство foo Событие Метка времени документа (_ts)
    А Красный Создание 10
    B Синий Создание 20
    A Оранжевый Update 30
    А Розовый Обновление 40
    B Фиолетовый Update 50
    A Кармине Update 50
    B NeonBlue Update 70

    API канала изменений опрашивает соединитель данных через регулярные интервалы, обычно каждые несколько секунд. Каждый опрос содержит изменения, произошедшие в контейнере между вызовами, но только последнюю версию изменений для каждого документа.

    Чтобы проиллюстрировать проблему, рассмотрим последовательность вызовов API с метками времени 15, 35, 55 и 75 , как показано в следующей таблице:

    Метка времени вызова API Идентификатор документа Свойство foo Метка времени документа (_ts)
    15 А Красный 10
    35 B Синий 20
    35 A Оранжевый 30
    55 B Фиолетовый 50
    55 A Кармине 60
    75 B NeonBlue 70

    Сравнивая результаты API со списком изменений, внесенных в документ Cosmos DB, вы заметите, что они не совпадают. Событие обновления для документа A, выделенное в таблице изменений по метке времени 40, не отображается в результатах вызова API.

    Чтобы понять, почему событие не отображается, мы рассмотрим изменения в документе A между вызовами API по меткам времени 35 и 55. Между этими двумя вызовами документ A дважды изменялся следующим образом:

    Идентификатор документа Свойство foo Событие Метка времени документа (_ts)
    A Розовый Update 40
    A Кармине Update 50

    При вызове API с меткой времени 55 API канала изменений возвращает последнюю версию документа. В этом случае последней версией документа A является обновление на метке времени 50, которая представляет собой обновление свойства foo с Pink на Carmine.

    Из-за этого сценария соединитель данных может пропустить некоторые промежуточные изменения документа. Например, некоторые события могут быть пропущены, если служба подключения к данным не работает в течение нескольких минут или если частота изменений документов выше, чем частота опроса API. Однако фиксируется последнее состояние каждого документа.

  • Удаление и повторное создание контейнера Cosmos DB не поддерживается

    Azure Data Explorer отслеживает канал изменений, создавая контрольную точку своей "позиции" в канале. Это делается с помощью маркера продолжения на каждом физическом разделе контейнера. При удалении или повторном создании контейнера эти маркеры продолжения становятся недействительными и не сбрасываются; необходимо удалить и заново создать подключение к данным.

Оценка затрат

Насколько использование подключения к данным Cosmos DB влияет на использование единиц запросов (ЕЗ) контейнера Cosmos DB?

Соединитель вызывает API канала изменений Cosmos DB для каждой физической секции контейнера до одного раза в секунду. С этими вызовами связаны следующие затраты:

Стоимость Описание
Фиксированные затраты Фиксированные затраты — около 2 ЕЗ на физическую секцию каждую секунду.
Переменные затраты Переменные затраты составляет около 2 % от ЕЗ, используемых для записи документов, хотя они могут отличаться в зависимости от вашего сценария. Например, при записи 100 документов в контейнер Cosmos DB стоимость их написания составляет 1000 ЕЗ. Соответствующие затраты на использование соединителя для чтения этих документов составляют около 2 % от стоимости их записи, приблизительно 20 ЕЗ.