Копирование данных из базы данных MongoDB или в нее с помощью Фабрики данных Azure или Synapse Analytics

Область применения:Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

В этой статье описывается, как с помощью действия копирования в конвейерах Фабрики данных Azure и Synapse Analytics копировать данные из базы данных MongoDB или в нее. Это продолжение статьи об обзоре действия копирования, в которой представлены общие сведения о действии копирования.

Важно!

Новый соединитель MongoDB обеспечивает улучшенную собственную поддержку MongoDB. При использовании предыдущего соединителя MongoDB в решении, которое поддерживается как есть только для обеспечения обратной совместимости, см. статью Соединитель MongoDB (для прежних версий).

Поддерживаемые возможности

Соединитель MongoDB поддерживается для следующих возможностей:

Поддерживаемые возможности IR
Действие копирования (источник/приемник) ① ②

① Среда выполнения интеграции Azure ② Локальная среда выполнения интеграции

Список хранилищ данных, которые поддерживаются в качестве источников/приемников, см. в таблице Поддерживаемые хранилища данных.

В частности, этот соединитель MongoDB поддерживает версии до 4.2 включительно.

Необходимые компоненты

Если хранилище данных размещено в локальной сети, виртуальной сети Azure или виртуальном частном облаке Amazon, для подключения к нему нужно настроить локальную среду выполнения интеграции.

Если же хранилище данных представляет собой управляемую облачную службу данных, можно использовать Azure Integration Runtime. Если доступ предоставляется только по IP-адресам, утвержденным в правилах брандмауэра, вы можете добавить IP-адреса Azure Integration Runtime в список разрешений.

Вы также можете использовать функцию среды выполнения интеграции в управляемой виртуальной сети в Фабрике данных Azure для доступа к локальной сети без установки и настройки локальной среды выполнения интеграции.

Дополнительные сведения о вариантах и механизмах обеспечения сетевой безопасности, поддерживаемых Фабрикой данных, см. в статье Стратегии получения доступа к данным.

Начало работы

Чтобы выполнить действие копирования с конвейером, можно воспользоваться одним из приведенных ниже средств или пакетов SDK:

Создание связанной службы для MongoDB с помощью пользовательского интерфейса

Выполните приведенные ниже действия, чтобы создать связанную службу для MongoDB с помощью пользовательского интерфейса на портале Azure.

  1. Перейдите на вкладку "Управление" в рабочей области Фабрики данных Azure или Synapse и выберите "Связанные службы", после чего нажмите "Создать":

  2. Выполните поиск по ключевому слову “MongoDB” и выберите соединитель MongoDB.

    Select the MongoDB connector.

  3. Настройте сведения о службе, проверьте подключение и создайте связанную службу.

    Configure a linked service to MongoDB.

Сведения о конфигурации соединителя

Следующие разделы содержат сведения о свойствах JSON, которые используются для определения сущностей фабрики данных, относящихся к соединителю MongoDB.

Свойства связанной службы

Для связанной службы MongoDB поддерживаются следующие свойства:

Свойство Описание: Обязательное поле
type Для свойства type необходимо задать значение MongoDbV2 Да
connectionString Укажите строку подключения MongoDB, например mongodb://[username:password@]host[:port][/[database][?options]]. Дополнительные сведения см. в руководстве MongoDB по строке подключения.

Строку подключения можно хранить в Azure Key Vault. Дополнительные сведения см. в разделе Хранение учетных данных в Azure Key Vault.
Да
database Имя базы данных, к которой нужно получить доступ. Да
connectVia Среда выполнения интеграции, используемая для подключения к хранилищу данных. Дополнительные сведения см. в разделе Предварительные условия. Если не указано другое, по умолчанию используется интегрированная среда выполнения Azure. No

Пример:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Свойства набора данных

Полный список разделов и свойств, используемых для определения наборов данных, приведен в статье Наборы данных и связанные службы в фабрике данных Azure. Набор данных MongoDB поддерживает следующие свойства.

Свойство Описание: Обязательное поле
type Свойство type набора данных должно иметь значение MongoDbV2Collection Да
collectionName Имя коллекции в базе данных MongoDB Да

Пример:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

Свойства действия копирования

Полный список разделов и свойств, используемых для определения действий, см. в статье Конвейеры и действия в фабрике данных Azure. Этот раздел содержит список свойств, поддерживаемых приемником и источником MongoDB.

MongoDB в качестве источника

В разделе source действия копирования поддерживаются следующие свойства:

Свойство Описание: Обязательное поле
type Свойство type источника действия копирования должно иметь значение MongoDbV2Source Да
Фильтр Задает фильтр выбора с помощью операторов запросов. Чтобы получить все документы в коллекции, не указывайте этот параметр или передайте пустой документ ({}). No
cursorMethods.project Определяет, какие поля в документах для проекции необходимо получить. Чтобы получить все поля в соответствующих документах, не указывайте этот параметр. No
cursorMethods.sort Определяет, в каком порядке запрос будет возвращать соответствующие документы. См. cursor.sort(). No
cursorMethods.limit Определяет максимальное количество документов, возвращаемых сервером. См. cursor.limit(). No
cursorMethods.skip Определяет, сколько документов нужно пропустить, прежде чем MongoDB начнет выдавать результаты. См. cursor.skip(). No
batchSize Определяет, сколько документов должно быть выдано в каждом пакете ответа от экземпляра MongoDB. В большинстве случаев изменение размера пакета не влияет на пользователя или приложение. Azure Cosmos DB ограничивает каждый пакет не может превышать 40 МБ размера, что является суммой размера пакетаSize размера документов, поэтому уменьшите это значение, если размер документа велик. No
(значение по умолчанию — 100)

Совет

Эта служба поддерживает использование документа BSON в строгом режиме. Убедитесь в том, что запрос фильтра находится в строгом режиме, а не в режиме оболочки. Более подробное описание см. в руководстве MongoDB.

Пример:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

MongoDB в качестве приемника

В разделе sink действия копирования поддерживаются следующие свойства:

Свойство Описание: Обязательное поле
type Свойство type приемника действия копирования должно иметь значение MongoDbV2Sink. Да
writeBehavior Описание процесса записи данных в MongoDB. Допустимые значения: insert и upsert.

Поведение upsert — замена документа, если документ с таким значением _id уже существует. В противном случае выполняется вставка документа.

Примечание. Служба автоматически создает _id для документа, если _id не указан в исходном документе или при сопоставлении столбцов. Это означает, что для правильной работы upsert у документа должен быть идентификатор.
No
(По умолчанию используется insert.)
writeBatchSize Свойство writeBatchSize определяет размер документов для записи в каждом пакете. Вы можете увеличить значение writeBatchSize для повышения производительности или уменьшить значение, если документ большого размера. No
(Значение по умолчанию — 10 000.)
writeBatchTimeout Время ожидания до выполнения операции пакетной вставки, пока не истечет срок ее действия. Разрешенным значением является временной диапазон. No
(по умолчанию используется 00:30:00 — 30 минут)

Совет

Чтобы импортировать документы JSON без изменений, см. раздел Импорт или экспорт документов JSON; чтобы копировать из данных в табличной форме, см. раздел Сопоставление схем.

Пример

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

Импорт и экспорт документов JSON

С помощью этого соединителя MongoDB можно легко:

  • копировать документы между коллекциями MongoDB "как есть".
  • Импортировать документы JSON из различных источников в MongoDB, включая Azure Cosmos DB, хранилище BLOB-объектов Azure, Azure Data Lake Store или другие поддерживаемые файловые хранилища.
  • экспортировать документы JSON из коллекции MongoDB в различные файловые хранилища.

Для такого независимого от схемы копирования пропустите раздел "структура" (также называемый схема) в наборе данных и сопоставление схемы в действии копирования.

Сопоставление схемы

Сведения о том, как скопировать данные из MongoDB в табличный приемник или наоборот, см. в разделе Сопоставление схем.

Обновление связанной службы MongoDB

Ниже приведены действия, которые помогут вам обновить связанную службу и связанные запросы:

  1. Создайте связанную службу MongoDB и настройте ее, ссылаясь на свойства связанной службы.

  2. Если в конвейерах используются запросы SQL, ссылающиеся на старую связанную службу MongoDB, замените их эквивалентными запросами MongoDB. См. следующую таблицу для примеров замены:

    SQL-запрос Эквивалентный запрос MongoDB
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

Список хранилищ данных, которые поддерживаются в качестве источников и приемников для действия Copy, приведен в таблице Поддерживаемые хранилища данных и форматы.