Copia de datos de MongoDB con Azure Data Factory o Synapse Analytics

SE APLICA A: Azure Data Factory Azure Synapse Analytics

Sugerencia

Pruebe Data Factory en Microsoft Fabric, una solución de análisis todo en uno para empresas. Microsoft Fabric abarca todo, desde el movimiento de datos hasta la ciencia de datos, el análisis en tiempo real, la inteligencia empresarial y los informes. Obtenga información sobre cómo iniciar una nueva evaluación gratuita.

En este artículo se describe el uso de la actividad de copia en una canalización de Azure Data Factory o Synapse Analytics para copiar datos con una base de datos de MongoDB como origen y destino. El documento se basa en el artículo de introducción a la actividad de copia que describe información general de la actividad de copia.

Importante

El nuevo conector de MongoDB proporciona compatibilidad nativa mejorada con MongoDB. Si usa la versión heredada del conector de MongoDB en la solución, que se admite tal cual solo en el caso de la compatibilidad con versiones anteriores, consulte el artículo Conector de MongoDB (heredado).

Funcionalidades admitidas

Este conector de MongoDB es compatible con las capacidades siguientes:

Funcionalidades admitidas IR
Actividad de copia (origen/receptor) 7,7

① Azure Integration Runtime ② Entorno de ejecución de integración autohospedado

Para obtener una lista de los almacenes de datos que se admiten como orígenes y receptores, consulte la tabla de almacenes de datos admitidos.

En concreto, este conector de MongoDB admite hasta la versión 4.2.

Prerrequisitos

Si el almacén de datos se encuentra en una red local, una red virtual de Azure o una nube privada virtual de Amazon, debe configurar un entorno de ejecución de integración autohospedado para conectarse a él.

Si el almacén de datos es un servicio de datos en la nube administrado, puede usar Azure Integration Runtime. Si el acceso está restringido a las direcciones IP que están aprobadas en las reglas de firewall, puede agregar direcciones IP de Azure Integration Runtime a la lista de permitidos.

También puede usar la característica del entorno de ejecución de integración de red virtual administrada de Azure Data Factory para acceder a la red local sin instalar ni configurar un entorno de ejecución de integración autohospedado.

Consulte Estrategias de acceso a datos para más información sobre los mecanismos de seguridad de red y las opciones que admite Data Factory.

Introducción

Para realizar la actividad de copia con una canalización, puede usar una de los siguientes herramientas o SDK:

Creación de un servicio vinculado en MongoDB mediante la interfaz de usuario

Siga estos pasos para crear un servicio vinculado en MongoDB en la interfaz de usuario de Azure Portal.

  1. Vaya a la pestaña Administrar del área de trabajo de Azure Data Factory o Synapse y seleccione Servicios vinculados; luego haga clic en Nuevo:

  2. Busque MongoDB y seleccione el conector de MongoDB.

    Select the MongoDB connector.

  3. Configure los detalles del servicio, pruebe la conexión y cree el nuevo servicio vinculado.

    Configure a linked service to MongoDB.

Detalles de configuración del conector

Las secciones siguientes proporcionan detalles sobre las propiedades que se usan para definir entidades de Data Factory específicas del conector MongoDB.

Propiedades del servicio vinculado

Las siguientes propiedades son compatibles con el servicio vinculado de MongoDB:

Propiedad Descripción Obligatorio
type La propiedad type debe establecerse en: MongoDbV2
connectionString Especifique la cadena de conexión de MongoDB, por ejemplo mongodb://[username:password@]host[:port][/[database][?options]]. Consulte el manual de MongoDB sobre cadenas de conexión para más información.

También puede poner una cadena de conexión en Azure Key Vault. Consulte el artículo Almacenamiento de credenciales en Azure Key Vault para obtener información detallada.
database Nombre de la base de datos a la que desea acceder.
connectVia El entorno Integration Runtime que se usará para conectarse al almacén de datos. Obtenga más información en la sección Requisitos previos. Si no se especifica, se usará Azure Integration Runtime. No

Ejemplo:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Propiedades del conjunto de datos

Para ver una lista completa de las secciones y propiedades disponibles para definir conjuntos de datos, consulte Conjuntos de datos y servicios vinculados. Las siguientes propiedades son compatibles con el conjunto de datos de MongoDB:

Propiedad Descripción Obligatorio
type La propiedad type del conjunto de datos debe establecerse en: MongoDbV2Collection
collectionName Nombre de la colección en la base de datos de MongoDB.

Ejemplo:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

Propiedades de la actividad de copia

Si desea ver una lista completa de las secciones y propiedades disponibles para definir actividades, consulte el artículo sobre canalizaciones. En esta sección se proporciona una lista de las propiedades que admite el origen y receptor de MongoDB.

MongoDB como origen

Se admiten las siguientes propiedades en la sección source de la actividad de copia:

Propiedad Descripción Obligatorio
type La propiedad type del origen de la actividad de copia debe establecerse en: MongoDbV2Source
filter Especifica el filtro de selección mediante operadores de consulta. Para que se devuelvan todos los documentos de una colección, omita este parámetro o pase un documento vacío ({}). No
cursorMethods.project Especifica los campos a devolver en los documentos para la proyección. Para devolver todos los campos en los documentos coincidentes, omita este parámetro. No
cursorMethods.sort Especifica el orden en que la consulta devuelve los documentos coincidentes. Consulte cursor.sort(). No
cursorMethods.limit Especifica el número máximo de documentos que devuelve el servidor. Consulte cursor.limit(). No
cursorMethods.skip Especifica el número de documentos que se omitirán y desde donde empieza MongoDB a devolver resultados. Consulte cursor.skip(). No
batchSize Especifica el número de documentos a devolver en cada lote de la respuesta de la instancia de MongoDB. En la mayoría de los casos, modificar el tamaño del lote no afectará al usuario ni a la aplicación. Azure Cosmos DB limita el tamaño de cada lote a 40 MB como máximo, que es la suma de los números de batchSize del tamaño de los documentos, por lo que debe reducir este valor si el tamaño del documento es mayor. No
(el valor predeterminado es 100)

Sugerencia

El servicio admite el consumo de documentos BSON en modo strict. Asegúrese de que la consulta de filtro está en modo strict en lugar de en modo Shell. Para obtener una descripción más detallada, vea el manual de MongoDB.

Ejemplo:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

MongoDB como receptor

La sección sink de la actividad de copia admite las siguientes propiedades:

Propiedad Descripción Obligatorio
type La propiedad type del receptor de la actividad de copia se debe establecer en MongoDbV2Sink.
writeBehavior Describe cómo escribir datos en MongoDB. Valores permitidos: insert y upsert.

El comportamiento de upsert consiste en reemplazar el documento si ya existe un documento con el mismo _id; en caso contrario, inserta el documento.

Nota: El servicio genera automáticamente un _id para un documento si no se especifica un _id en el documento original o mediante la asignación de columnas. Esto significa que debe asegurarse de que, para que upsert funcione según lo esperado, el documento tenga un identificador.
No
(el valor predeterminado es insert)
writeBatchSize La propiedad writeBatchSize controla el tamaño de los documentos que se escribirán en cada lote. Puede intentar aumentar el valor de writeBatchSize para mejorar el rendimiento y reducir el valor si el documento tiene un tamaño grande. No
(el valor predeterminado es 10 000)
writeBatchTimeout Tiempo que se concede a la operación de inserción por lotes para que finalice antes de que se agote el tiempo de espera. El valor permitido es TimeSpan. No
(El valor predeterminado es 00:30:00 [30 minutos]).

Sugerencia

Para importar documentos JSON tal cual, vea la sección Importar o exportar documentos JSON; para copiar de datos con formato tabular, vea Asignación de esquemas.

Ejemplo

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

Importación y exportación de documentos JSON

Puede usar este conector de MongoDB para hacer fácilmente lo siguiente:

  • Copiar documentos entre dos colecciones de MongoDB tal cual.
  • Importar documentos JSON desde varios orígenes a MongoDB, incluidos Azure Cosmos DB, Azure Blob Storage, Azure Data Lake Store y otros almacenes basados en archivos compatibles.
  • Exportar documentos JSON de una colección de MongoDB a varios almacenes basados en archivos.

Para lograr esa copia independiente del esquema, omita la sección “structure” (estructura, también denominada schema) en el conjunto de datos y la asignación de esquemas en la actividad de copia.

Asignación de esquemas

Para copiar datos desde MongoDB en un receptor tabular o invertido, consulte la asignación de esquemas.

Actualización del servicio vinculado de MongoDB

Estos son los pasos que le ayudarán a actualizar el servicio vinculado y las consultas relacionadas:

  1. Cree un nuevo servicio vinculado de MongoDB y configúrelo haciendo referencia a las propiedades del servicio vinculado.

  2. Si usa consultas SQL en las canalizaciones que hacen referencia al servicio vinculado de MongoDB anterior, reemplácelas por las consultas de MongoDB equivalentes. Consulte la tabla siguiente para ver los ejemplos de reemplazo:

    Consulta SQL Consulta equivalente de MongoDB
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

Para obtener una lista de almacenes de datos que la actividad de copia admite como orígenes y receptores, vea Almacenes de datos que se admiten.