Velocizzare l'analisi di Big Data in tempo reale con il connettore Spark per Azure Cosmos DBAccelerate real-time big-data analytics with the Spark to Azure Cosmos DB connector

Il connettore Spark per Azure Cosmos DB consente ad Azure Cosmos DB di fungere da origine di input o sink di output per i processi Apache Spark.The Spark to Azure Cosmos DB connector enables Azure Cosmos DB to act as an input source or output sink for Apache Spark jobs. Connettendo Spark ad Azure Cosmos DB, è possibile risolvere più velocemente problemi di data science in rapida evoluzione usando Azure Cosmos DB per salvare in modo permanente i dati ed eseguire query su di essi in modo rapido.Connecting Spark to Azure Cosmos DB accelerates your ability to solve fast-moving data science problems where you can use Azure Cosmos DB to quickly persist and query data. Il connettore Spark per Azure Cosmos DB usa in modo efficiente gli indici gestiti nativi di Azure Cosmos DB.The Spark to Azure Cosmos DB connector efficiently utilizes the native Azure Cosmos DB managed indexes. Gli indici consentono colonne aggiornabili in fase di analisi e propagazione del filtraggio in base al predicato per i dati distribuiti a livello globale in rapida evoluzione, che spaziano da scenari IoT (Internet delle cose) a scenari di data science e analisi.The indexes enable updateable columns when you perform analytics and push-down predicate filtering against fast-changing globally distributed data, which range from Internet of Things (IoT) to data science and analytics scenarios.

Per usare Spark GraphX e le API Graph Gremlin di Azure Cosmos DB, vedere Azure Cosmos DB: eseguire analisi dei grafi con Spark e Apache TinkerPop Gremlin.For working with Spark GraphX and the Gremlin graph APIs of Azure Cosmos DB, see Perform graph analytics using Spark and Apache TinkerPop Gremlin.

ScaricareDownload

Per iniziare, scaricare il connettore Spark per Azure Cosmos DB dal repository azure-cosmosdb-spark in GitHub.To get started, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark repository on GitHub.

Componenti del connettoreConnector components

Sono i seguenti:The connector utilizes the following components:

Versioni supportate ufficialmente:Officially supported versions:

ComponenteComponent VersioneVersion
Apache SparkApache Spark 2.0+2.0+
ScalaScala 2.112.11
Azure DocumentDB Java SDKAzure DocumentDB Java SDK 1.10.01.10.0

Questo articolo consente di eseguire alcuni semplici esempi con Python (tramite pyDocumentDB) e le interfacce di Scala.This article helps you run some simple samples by using Python (via pyDocumentDB) and the Scala interfaces.

Per la connessione di Apache Spark e Azure Cosmos DB sono disponibili due approcci:There are two approaches to connect Apache Spark and Azure Cosmos DB:

Implementazione di pyDocumentDBpyDocumentDB implementation

La versione corrente di pyDocumentDB SDK consente di connettere Spark ad Azure Cosmos DB, come illustrato nel diagramma seguente:The current pyDocumentDB SDK enables you to connect Spark to Azure Cosmos DB as shown in the following diagram:

Flusso di dati da Spark ad Azure Cosmos DB tramite pyDocumentDB

Flusso di dati dell'implementazione di pyDocumentDBData flow of the pyDocumentDB implementation

Il flusso di dati è il seguente:The data flow is as follows:

  1. Il nodo master Spark si connette al nodo del gateway Azure Cosmos DB tramite pyDocumentDB.The Spark master node connects to the Azure Cosmos DB gateway node via pyDocumentDB. Un utente specifica solo le connessioni di Spark e Azure Cosmos DB.A user specifies only the Spark and Azure Cosmos DB connections. Le connessioni ai rispettivi nodi master e del gateway sono trasparenti per l'utente.Connections to the respective master and gateway nodes are transparent to the user.
  2. Il nodo del gateway esegue la query su Azure Cosmos DB, dove la query viene successivamente eseguita sulle partizioni della raccolta nei nodi dati.The gateway node makes the query against Azure Cosmos DB where the query subsequently runs against the collection's partitions in the data nodes. La risposta a queste query viene inviata di nuovo al nodo del gateway e il set di risultati viene restituito al nodo master Spark.The response for those queries is sent back to the gateway node, and that result set is returned to the Spark master node.
  3. Le query successive, ad esempio su un frame di dati Spark, vengono inviate ai nodi di lavoro Spark per l'elaborazione.Subsequent queries (for example, against a Spark DataFrame) are sent to the Spark worker nodes for processing.

La comunicazione tra Spark e Azure Cosmos DB è limitata al nodo master Spark e ai nodi del gateway Azure Cosmos DB.Communication between Spark and Azure Cosmos DB is limited to the Spark master node and Azure Cosmos DB gateway nodes. Le query vengono eseguite alla velocità consentita dal livello di trasporto tra i due nodi.The queries go as fast as the transport layer between these two nodes allows.

Installare pyDocumentDBInstall pyDocumentDB

È possibile installare pyDocumentDB sul nodo del driver usando pip, ad esempio:You can install pyDocumentDB on your driver node by using pip, for example:

pip install pyDocumentDB

Connettere Spark ad Azure Cosmos DB tramite pyDocumentDBConnect Spark to Azure Cosmos DB via pyDocumentDB

La semplicità del trasporto di comunicazione rende relativamente semplice l'esecuzione di una query da Spark ad Azure Cosmos DB con pyDocumentDB.The simplicity of the communication transport makes execution of a query from Spark to Azure Cosmos DB by using pyDocumentDB relatively simple.

Il frammento di codice seguente mostra come usare pyDocumentDB in un contesto Spark.The following code snippet shows how to use pyDocumentDB in a Spark context.

# Import Necessary Libraries
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
import datetime

# Configuring the connection policy (allowing for endpoint discovery)
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = ["Central US", "East US 2", "Southeast Asia", "Western Europe","Canada Central"]


# Set keys to connect to Azure Cosmos DB
masterKey = 'le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA=='
host = 'https://doctorwho.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': masterKey}, connectionPolicy)

Come indicato nel frammento di codice:As noted in the code snippet:

  • Azure Cosmos DB Python SDK (pyDocumentDB) contiene tutti i parametri di connessione necessari.The Azure Cosmos DB Python SDK (pyDocumentDB) contains the all the necessary connection parameters. Il parametro relativo alle posizioni preferite determina ad esempio la replica di lettura e l'ordine di priorità.For example, the preferred locations parameter chooses the read replica and priority order.
  • Importare le librerie necessarie e configurare masterKey e host per creare il client Azure Cosmos DB (pydocumentdb.document_client).Import the necessary libraries and configure your masterKey and host to create the Azure Cosmos DB client (pydocumentdb.document_client).

Eseguire query Spark tramite pyDocumentDBExecute Spark Queries via pyDocumentDB

Gli esempi seguenti usano l'istanza di Azure Cosmos DB creata nel frammento precedente con le chiavi di sola lettura specificate.The following examples use the Azure Cosmos DB instance that was created in the previous snippet by using the specified read-only keys. Il frammento di codice seguente si connette alla raccolta airports.codes nell'account DoctorWho specificato in precedenza ed esegue una query per estrarre le città con aeroporto nello stato di Washington.The following code snippet connects to the airports.codes collection in the DoctorWho account as specified earlier and runs a query to extract the airport cities in Washington state.

# Configure Database and Collections
databaseId = 'airports'
collectionId = 'codes'

# Configurations the Azure Cosmos DB client will use to connect to the database and collection
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

# Set query parameter
querystr = "SELECT c.City FROM c WHERE c.State='WA'"

# Query documents
query = client.QueryDocuments(collLink, querystr, options=None, partition_key=None)

# Query for partitioned collections
# query = client.QueryDocuments(collLink, query, options= { 'enableCrossPartitionQuery': True }, partition_key=None)

# Push into list `elements`
elements = list(query)

Dopo l'esecuzione della query tramite query, il risultato è un oggetto query_iterable.QueryIterable che viene convertito in un elenco Python.After the query has been executed via query, the result is a query_iterable.QueryIterable that is converted to a Python list. Un elenco Python può essere convertito facilmente in un frame di dati Spark usando il codice seguente:A Python list can be easily converted to a Spark DataFrame by using the following code:

# Create `df` Spark DataFrame from `elements` Python list
df = spark.createDataFrame(elements)

Perché usare pyDocumentDB per connettere Spark ad Azure Cosmos DB?Why use the pyDocumentDB to connect Spark to Azure Cosmos DB?

La connessione di Spark ad Azure Cosmos DB mediante pyDocumentDB viene usata in genere negli scenari in cui:Connecting Spark to Azure Cosmos DB by using pyDocumentDB is typically for scenarios where:

  • Si vuole usare Python.You want to use Python.
  • Si restituisce un set di risultati relativamente piccolo da Azure Cosmos DB a Spark.You are returning a relatively small result set from Azure Cosmos DB to Spark. Si noti che il set di dati sottostante in Azure Cosmos DB può essere piuttosto grande.Note that the underlying dataset in Azure Cosmos DB can be quite large. Si stanno applicando filtri, ovvero eseguendo filtri in base a predicati, sull'origine Azure Cosmos DB.You are applying filters, that is, running predicate filters, against your Azure Cosmos DB source.

Connettore Spark per Azure Cosmos DBSpark to Azure Cosmos DB connector

Il connettore Spark per Azure Cosmos DB usa Azure DocumentDB Java SDK e sposta i dati tra i nodi di lavoro Spark e Azure Cosmos DB, come illustrato nel diagramma seguente:The Spark to Azure Cosmos DB connector utilizes the Azure DocumentDB Java SDK and moves data between the Spark worker nodes and Azure Cosmos DB as shown in the following diagram:

Flusso di dati nel connettore Spark per Azure Cosmos DB

Il flusso di dati è il seguente:The data flow is as follows:

  1. Il nodo master Spark si connette al nodo del gateway Azure Cosmos DB per ottenere la mappa delle partizioni.The Spark master node connects to the Azure Cosmos DB gateway node to obtain the partition map. Un utente specifica solo le connessioni di Spark e Azure Cosmos DB.A user specifies only the Spark and Azure Cosmos DB connections. Le connessioni ai rispettivi nodi master e del gateway sono trasparenti per l'utente.Connections to the respective master and gateway nodes are transparent to the user.
  2. Queste informazioni vengono restituite al nodo master Spark.This information is provided back to the Spark master node. A questo punto sarà possibile analizzare la query per determinare le partizioni e le relative posizioni in Azure Cosmos DB a cui è necessario accedere.At this point, you should be able to parse the query to determine the partitions and their locations in Azure Cosmos DB that you need to access.
  3. Queste informazioni vengono trasmesse ai nodi di lavoro Spark.This information is transmitted to the Spark worker nodes.
  4. I nodi di lavoro Spark si connettono direttamente alle partizioni Azure Cosmos DB per estrarre i dati e restituiscono i dati alle partizioni Spark nei nodi di lavoro Spark.The Spark worker nodes connect to the Azure Cosmos DB partitions directly to extract the data and return the data to the Spark partitions in the Spark worker nodes.

La comunicazione tra Spark e Azure Cosmos DB è notevolmente più veloce perché lo spostamento dei dati avviene tra i nodi di lavoro Spark e i nodi dati (partizioni) Azure Cosmos DB.Communication between Spark and Azure Cosmos DB is significantly faster because the data movement is between the Spark worker nodes and the Azure Cosmos DB data nodes (partitions).

Creare il connettore Spark per Azure Cosmos DBBuild the Spark to Azure Cosmos DB connector

Attualmente il progetto del connettore usa Maven.Currently, the connector project uses maven. Per creare il connettore senza le dipendenze, è possibile eseguire:To build the connector without dependencies, you can run:

mvn clean package

È anche possibile scaricare le versioni più recenti del file JAR dalla cartella releases.You can also download the latest versions of the JAR from the releases folder.

Includere il file JAR di Spark per Azure Cosmos DBInclude the Azure Cosmos DB Spark JAR

Prima dell'esecuzione del codice, è necessario includere il file JAR di Spark per Azure Cosmos.Before you execute any code, you need to include the Azure Cosmos DB Spark JAR. Se si usa spark-shell, è possibile includere il file JAR usando l'opzione --jars.If you are using the spark-shell, then you can include the JAR by using the --jars option.

spark-shell --master $master --jars /$location/azure-cosmosdb-spark-0.0.3-jar-with-dependencies.jar

Se si vuole eseguire il file JAR senza dipendenze, usare il codice seguente:If you want to execute the JAR without dependencies, use the following code:

spark-shell --master $master --jars /$location/azure-cosmosdb-spark-0.0.3.jar,/$location/azure-documentdb-1.10.0.jar

Se si usa un servizio notebook, come Azure HDInsight Jupyter, è possibile usare i comandi spark magic:If you are using a notebook service such as Azure HDInsight Jupyter notebook service, you can use the spark magic commands:

%%configure
{ "jars": ["wasb:///example/jars/azure-documentdb-1.10.0.jar","wasb:///example/jars/azure-cosmosdb-spark-0.0.3.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

Il comando jars consente di includere i due file JAR necessari per azure-cosmosdb-spark (il file stesso e Azure DocumentDB Java SDK) ed escludere scala-reflect in modo da non interferire con le chiamate Livy (notebook Jupyter > Livy > Spark).The jars command enables you to include the two JARs that are needed for azure-cosmosdb-spark (itself and the Azure DocumentDB Java SDK) and exclude scala-reflect so that it does not interfere with the Livy calls (Jupyter notebook > Livy > Spark).

Connettere Spark ad Azure Cosmos DB con il connettoreConnect Spark to Azure Cosmos DB using the connector

Anche se il trasporto di comunicazione è un po' più complesso, l'esecuzione di una query da Spark ad Azure Cosmos DB con il connettore è notevolmente più veloce.Although the communication transport is a little more complicated, executing a query from Spark to Azure Cosmos DB by using the connector is significantly faster.

Il frammento di codice seguente illustra come usare il connettore in un contesto Spark.The following code snippet shows how to use the connector in a Spark context.

// Import Necessary Libraries
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Configure connection to your collection
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA==",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US2;",
"Collection" -> "flights_pcoll",
"SamplingRatio" -> "1.0"))

// Create collection connection
val coll = spark.sqlContext.read.cosmosDB(readConfig2)
coll.createOrReplaceTempView("c")

Come indicato nel frammento di codice:As noted in the code snippet:

  • azure-cosmosdb-spark contiene tutti i parametri di connessione necessari, che includono le posizioni preferite.azure-cosmosdb-spark contains the all the necessary connection parameters, which include the preferred locations. È ad esempio possibile scegliere la replica di lettura e l'ordine di priorità.For example, you can choose the read replica and priority order.
  • Importare semplicemente le librerie necessarie e configurare masterKey e host per creare il client Azure Cosmos DB.Just import the necessary libraries and configure your masterKey and host to create the Azure Cosmos DB client.

Eseguire query Spark tramite il connettoreExecute Spark queries via the connector

L'esempio seguente usa l'istanza di Azure Cosmos DB creata nel frammento precedente con le chiavi di sola lettura specificate.The following example uses the Azure Cosmos DB instance that was created in the previous snippet by using the specified read-only keys. Il frammento di codice seguente consente la connessione alla raccolta DepartureDelays.flights_pcoll (nell'account DoctorWho specificato in precedenza) ed esegue una query per estrarre le informazioni sui ritardi dei voli in partenza da Seattle.The following code snippet connects to the DepartureDelays.flights_pcoll collection (in the DoctorWho account as specified earlier) and runs a query to extract the flight delay information of flights that are departing from Seattle.

// Queries
var query = "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
val df = spark.sql(query)

// Run DF query (count)
df.count()

// Run DF query (show)
df.show()

Vantaggi dell'implementazione del connettore Spark per Azure Cosmos DBWhy use the Spark to Azure Cosmos DB connector implementation?

La connessione di Spark ad Azure Cosmos DB con il connettore viene usata in genere negli scenari in cui:Connecting Spark to Azure Cosmos DB by using the connector is typically for scenarios where:

  • Si vuole usare Scala e aggiornarlo in modo da includere un wrapper Python come indicato in Issue 3: Add Python wrapper and examples (Problema 3: Aggiungere il wrapper Python ed esempi).You want to use Scala and update it to include a Python wrapper as noted in Issue 3: Add Python wrapper and examples.
  • La quantità di dati da trasferire tra Apache Spark e Azure Cosmos DB è elevata.You have a large amount of data to transfer between Apache Spark and Azure Cosmos DB.

Per informazioni sulle differenze a livello di prestazioni delle query, vedere la wiki sulle esecuzioni di test di query.To give you an idea of the query performance difference, see the Query Test Runs wiki.

Esempio di aggregazione distribuitaDistributed aggregation example

Questa sezione fornisce alcuni esempi di come è possibile eseguire analisi e aggregazioni distribuite combinando Apache Spark e Azure Cosmos DB.This section provides some examples of how you can do distributed aggregations and analytics by using Apache Spark and Azure Cosmos DB together. Azure Cosmos DB supporta già le aggregazioni, come illustrato nel post di blog sulle aggregazioni su scala globale con Azure Cosmos DB.Azure Cosmos DB already supports aggregations, which is discussed in the Planet scale aggregates with Azure Cosmos DB blog. Ecco come passare a un livello superiore con Apache Spark.Here is how you can take it to the next level with Apache Spark.

Si noti che queste aggregazioni fanno riferimento al notebook del connettore Spark per Azure Cosmos DB.Note that these aggregations are in reference to the Spark to Azure Cosmos DB Connector notebook.

Eseguire la connessione ai dati di esempio sui voliConnect to flights sample data

Queste aggregazioni di esempio accedono ad alcuni dati sulle prestazioni dei voli archiviati nel database Azure Cosmos DB DoctorWho.These aggregation examples access some flight performance data that's stored in our DoctorWho Azure Cosmos DB database. Per connettersi al database, è necessario usare il frammento di codice seguente:To connect to it, you need to utilize the following code snippet:

// Import Spark to Azure Cosmos DB connector
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Connect to Azure Cosmos DB Database
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA==",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US 2;",
"Collection" -> "flights_pcoll",
"SamplingRatio" -> "1.0"))

// Create collection connection
val coll = spark.sqlContext.read.cosmosDB(readConfig2)
coll.createOrReplaceTempView("c")

Con questo frammento si eseguirà anche una query di base che trasferisce il set di dati filtrato da Azure Cosmos DB a Spark, che potrà eseguire aggregazioni distribuite.With this snippet, we are also going to run a base query that transfers the filtered set of data from Azure Cosmos DB to Spark where the latter can perform distributed aggregates. In questo caso la richiesta è relativa ai voli in partenza da Seattle (SEA).In this case, we are asking for flights that depart from Seattle (SEA).

// Run, get row count, and time query
val originSEA = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'")
originSEA.createOrReplaceTempView("originSEA")

I risultati seguenti sono stati generati eseguendo le query dal servizio notebook Jupyter.The following results were generated by running the queries from the Jupyter notebook service. Si noti che tutti i frammenti di codice sono generici e non specifici di un servizio.Note that all the code snippets are generic and not specific to any service.

Esecuzione di query LIMIT e COUNTRunning LIMIT and COUNT queries

Come avviene in genere in SQL/Spark SQL, è possibile iniziare con una query LIMIT:Just like you're used to in SQL/Spark SQL, let's start off with a LIMIT query:

Query LIMIT Spark

La query successiva è una semplice e rapida query COUNT:The next query is a simple and fast COUNT query:

Query COUNT Spark

Query GROUP BYGROUP BY query

In questo set successivo è possibile eseguire facilmente query GROUP BY sul database Azure Cosmos DB:In this next set, we can easily run GROUP BY queries against our Azure Cosmos DB database:

select destination, sum(delay) as TotalDelays
from originSEA
group by destination
order by sum(delay) desc limit 10

Grafico della query GROUP BY Spark

Query DISTINCT, ORDER BYDISTINCT, ORDER BY query

Di seguito una query DISTINCT, ORDER BY:And here is a DISTINCT, ORDER BY query:

Grafico della query GROUP BY Spark

Continuare l'analisi dei dati relativi ai voliContinue the flight data analysis

Per continuare l'analisi di questi dati, è possibile usare le query di esempio seguenti:You can use the following example queries to continue analysis of the flight data:

Prime 5 destinazioni (città) con ritardi in partenza da SeattleTop 5 delayed destinations (cities) departing from Seattle

select destination, sum(delay)
from originSEA
where delay < 0
group by destination
order by sum(delay) limit 5

Grafico dei ritardi massimi Spark

Calcolare i ritardi medi per città di destinazione dei voli in partenza da SeattleCalculate median delays by destination cities departing from Seattle

select destination, percentile_approx(delay, 0.5) as median_delay
from originSEA
where delay < 0
group by destination
order by percentile_approx(delay, 0.5)

Grafico dei ritardi medi Spark

Passaggi successiviNext steps

Se ancora non lo si è fatto, scaricare il connettore Spark per Azure Cosmos DB dal repository GitHub azure-cosmosdb-spark ed esplorare le risorse aggiuntive nel repository:If you haven't already, download the Spark to Azure Cosmos DB connector from the azure-cosmosdb-spark GitHub repository and explore the additional resources in the repo:

È possibile anche esaminare la guida ad Apache Spark SQL, DataFrame e set di dati e l'articolo su Apache Spark in Azure HDInsight.You might also want to review the Apache Spark SQL, DataFrames, and Datasets Guide and the Apache Spark on Azure HDInsight article.