Copiare dati da o in MongoDB usando Azure Data Factory o Synapse Analytics

SI APPLICA A: Azure Data Factory Azure Synapse Analytics

Suggerimento

Provare Data Factory in Microsoft Fabric, una soluzione di analisi completa per le aziende. Microsoft Fabric copre tutti gli elementi, dallo spostamento dei dati all'analisi scientifica dei dati, all'analisi in tempo reale, alla business intelligence e alla creazione di report. Scopri come avviare gratuitamente una nuova versione di valutazione .

Questo articolo illustra come usare l'attività di copia nelle pipeline di Synapse Analytics di Azure Data Factory per copiare dati da e in un database MongoDB. Si basa sull'articolo di panoramica dell'attività di copia che presenta una panoramica generale sull'attività di copia.

Importante

Il nuovo connettore MongoDB offre un supporto mongoDB nativo migliorato. Se si usa il connettore MongoDB legacy nella soluzione, supportato così come è solo per la compatibilità con le versioni precedenti, vedere l'articolo connettore MongoDB (legacy).

Funzionalità supportate

Questo connettore MongoDB è supportato per le funzionalità seguenti:

Funzionalità supportate IR
Attività di copia (origine/sink) ① ②

(1) Runtime di integrazione di Azure (2) Runtime di integrazione self-hosted

Per un elenco degli archivi dati supportati come origini/sink, vedere la tabella Archivi dati supportati.

In particolare, questo connettore MongoDB supporta versioni fino alla 4.2.

Prerequisiti

Se l'archivio dati si trova all'interno di una rete locale, una rete virtuale di Azure o un cloud privato virtuale di Amazon, è necessario configurare un runtime di integrazione self-hosted per connettersi.

Se l'archivio dati è un servizio dati cloud gestito, è possibile usare Azure Integration Runtime. Se l'accesso è limitato agli indirizzi IP approvati nelle regole del firewall, è possibile aggiungere indirizzi IP del runtime di integrazione di Azure all'elenco elementi consentiti.

È anche possibile usare la funzionalità di runtime di integrazione della rete virtuale gestita in Azure Data Factory per accedere alla rete locale senza installare e configurare un runtime di integrazione self-hosted.

Per altre informazioni sui meccanismi di sicurezza di rete e sulle opzioni supportate da Data Factory, vedere strategie di accesso ai dati.

Introduzione

Per eseguire l'attività di copia con una pipeline, è possibile usare uno degli strumenti o SDK seguenti:

Creare un servizio collegato a MongoDB usando l'interfaccia utente

Usare la procedura seguente per creare un servizio collegato a MongoDB nell'interfaccia utente di portale di Azure.

  1. Passare alla scheda Gestisci nell'area di lavoro di Azure Data Factory o Synapse e selezionare Servizi collegati, quindi fare clic su Nuovo:

  2. Cercare MongoDB e selezionare il connettore MongoDB.

    Select the MongoDB connector.

  3. Configurare i dettagli del servizio, testare la connessione e creare il nuovo servizio collegato.

    Configure a linked service to MongoDB.

Dettagli di configurazione di Connessione or

Le sezioni seguenti riportano informazioni dettagliate sulle proprietà che vengono usate per definire entità di data factory specifiche per il connettore MongoDB.

Proprietà del servizio collegato

Per il servizio collegato di MongoDB sono supportate le proprietà seguenti:

Proprietà Descrizione Richiesto
Tipo La proprietà type deve essere impostata su: MongoDbV2
connectionString Specificare la stringa di connessione di MongoDB, ad esempio mongodb://[username:password@]host[:port][/[database][?options]]. Per altri dettagli, vedere la sezione sulla stringa di connessione nel manuale di MongoDB.

È anche possibile inserire un stringa di connessione in Azure Key Vault. Per altri dettagli, vedere Archiviare le credenziali in Azure Key Vault .
database Nome del database a cui si vuole accedere.
connectVia Il runtime di integrazione da usare per la connessione all'archivio dati. Per altre informazioni, vedere la sezione Prerequisiti. Se non specificato, viene usato il runtime di integrazione di Azure predefinito. No

Esempio:

{
    "name": "MongoDBLinkedService",
    "properties": {
        "type": "MongoDbV2",
        "typeProperties": {
            "connectionString": "mongodb://[username:password@]host[:port][/[database][?options]]",
            "database": "myDatabase"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Proprietà del set di dati

Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione dei set di dati, vedere Set di dati e servizi collegati. Per il set di dati MongoDB sono supportate le proprietà seguenti:

Proprietà Descrizione Richiesto
Tipo La proprietà type del set di dati deve essere impostata su: MongoDbV2Collection
collectionName Nome della raccolta nel database MongoDB.

Esempio:

{
    "name": "MongoDbDataset",
    "properties": {
        "type": "MongoDbV2Collection",
        "typeProperties": {
            "collectionName": "<Collection name>"
        },
        "schema": [],
        "linkedServiceName": {
            "referenceName": "<MongoDB linked service name>",
            "type": "LinkedServiceReference"
        }
    }
}

Proprietà dell'attività di copia

Per un elenco completo delle sezioni e delle proprietà disponibili per la definizione delle attività, vedere l'articolo sulle pipeline. Questa sezione fornisce un elenco delle proprietà supportate dall'origine e dal sink MongoDB.

MongoDB come origine

Nella sezione origine dell'attività di copia sono supportate le proprietà seguenti:

Proprietà Descrizione Richiesto
Tipo La proprietà type dell'origine dell'attività di copia deve essere impostata su: MongoDbV2Source
filter Specifica il filtro di selezione usando gli operatori di query. Per restituire tutti i documenti in una raccolta, omettere questo parametro o passare un documento vuoto ({}). No
cursorMethods.project Specifica i campi da restituire nei documenti per la proiezione. Per restituire tutti i campi nei documenti corrispondenti, omettere questo parametro. No
cursorMethods.sort Specifica l'ordine in cui la query restituisce i documenti corrispondenti. Fare riferimento a cursor.sort(). No
cursorMethods.limit Specifica il numero massimo di documenti restituiti dal server. Fare riferimento a cursor.limit(). No
cursorMethods.skip Specifica il numero di documenti da ignorare e la posizione da cui MongoDB inizia a restituire i risultati. Fare riferimento a cursor.skip(). No
batchSize Specifica il numero di documenti da restituire in ogni batch di risposta dall'istanza di MongoDB. Nella maggior parte dei casi, la modifica della dimensione del batch non influisce sull'utente o sull'applicazione. Azure Cosmos DB limita ogni batch non può superare le dimensioni di 40 MB, ovvero la somma del numero batchSize delle dimensioni dei documenti, quindi ridurre questo valore se le dimensioni del documento sono elevate. No
(il valore predefinito è 100)

Suggerimento

Il servizio supporta l'utilizzo del documento BSON in modalità Strict. Assicurarsi che la query di filtro sia in modalità strict anziché in modalità shell. Per altre informazioni consultare il manuale di MongoDB.

Esempio:

"activities":[
    {
        "name": "CopyFromMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<MongoDB input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "MongoDbV2Source",
                "filter": "{datetimeData: {$gte: ISODate(\"2018-12-11T00:00:00.000Z\"),$lt: ISODate(\"2018-12-12T00:00:00.000Z\")}, _id: ObjectId(\"5acd7c3d0000000000000000\") }",
                "cursorMethods": {
                    "project": "{ _id : 1, name : 1, age: 1, datetimeData: 1 }",
                    "sort": "{ age : 1 }",
                    "skip": 3,
                    "limit": 3
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

MongoDB come sink

Nella sezione sink dell'attività di copia sono supportate le proprietà seguenti:

Proprietà Descrizione Richiesto
Tipo La proprietà type del sink dell'attività di copia deve essere impostata su MongoDbV2Sink.
writeBehavior Descrive come scrivere dati in MongoDB. Valori consentiti: insert e upsert.

Il comportamento di upsert consiste nel sostituire il documento se esiste già un documento con lo stesso _id . In caso contrario, inserire il documento.

Nota: il servizio genera automaticamente un oggetto _id per un documento se _id non è specificato nel documento originale o nel mapping di colonne. È quindi necessario assicurarsi che il documento contenga un ID in modo che upsert funzioni come previsto.
No
(il valore predefinito è insert)
writeBatchSize La proprietà writeBatchSize controlla le dimensioni dei documenti da scrivere in ogni batch. È possibile provare ad aumentare il valore di writeBatchSize per migliorare le prestazioni e a ridurre il valore se le dimensioni dei documenti sono troppo grandi. No
(il valore predefinito è 10.000)
writeBatchTimeout Tempo di attesa per il completamento dell'operazione di inserimento batch prima del timeout. Il valore consentito è timespan. No
(il valore predefinito è 00:30:00 - 30 minuti)

Suggerimento

Per importare documenti JSON come sono, fare riferimento alla sezione Importare o esportare documenti JSON. Per copiare da dati in forma tabulare, fare riferimento a Mapping dello schema.

Esempio

"activities":[
    {
        "name": "CopyToMongoDB",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Document DB output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "MongoDbV2Sink",
                "writeBehavior": "upsert"
            }
        }
    }
]

Importare ed esportare documenti JSON

È possibile usare questo connettore MongoDB per:

  • Copiare i documenti tra due raccolte MongoDB così come sono.
  • Importare documenti JSON da varie origini a MongoDB, tra cui Da Azure Cosmos DB, Archiviazione BLOB di Azure, Azure Data Lake Store e altri archivi basati su file supportati.
  • Esportare documenti JSON da una raccolta MongoDB in diversi archivi basati su file.

Per ottenere una copia priva di schema, ignorare la sezione "structure" (chiamata anche schema) nel set di dati e il mapping dello schema nell'attività di copia.

Mapping dello schema

Per copiare dati da MongoDB a sink tabulare o invertito, fare riferimento al mapping dello schema.

Aggiornare il servizio collegato MongoDB

Ecco i passaggi che consentono di aggiornare il servizio collegato e le query correlate:

  1. Creare un nuovo servizio collegato MongoDB e configurarlo facendo riferimento alle proprietà del servizio collegato.

  2. Se si usano query SQL nelle pipeline che fanno riferimento al servizio collegato MongoDB precedente, sostituirle con le query MongoDB equivalenti. Vedere la tabella seguente per gli esempi di sostituzione:

    SQL query Query MongoDB equivalente
    SELECT * FROM users db.users.find({})
    SELECT username, age FROM users db.users.find({}, {username: 1, age: 1})
    SELECT username AS User, age AS Age, statusNumber AS Status, CASE WHEN Status = 0 THEN "Pending" CASE WHEN Status = 1 THEN "Finished" ELSE "Unknown" END AS statusEnum LastUpdatedTime + interval '2' hour AS NewLastUpdatedTime FROM users db.users.aggregate([{ $project: { _id: 0, User: "$username", Age: "$age", Status: "$statusNumber", statusEnum: { $switch: { branches: [ { case: { $eq: ["$Status", 0] }, then: "Pending" }, { case: { $eq: ["$Status", 1] }, then: "Finished" } ], default: "Unknown" } }, NewLastUpdatedTime: { $add: ["$LastUpdatedTime", 2 * 60 * 60 * 1000] } } }])
    SELECT employees.name, departments.name AS department_name FROM employees LEFT JOIN departments ON employees.department_id = departments.id; db.employees.aggregate([ { $lookup: { from: "departments", localField: "department_id", foreignField: "_id", as: "department" } }, { $unwind: "$department" }, { $project: { _id: 0, name: 1, department_name: "$department.name" } } ])

Per un elenco degli archivi dati supportati come origini e sink dall'attività di copia, vedere archivi dati supportati.