Velocizzare l'analisi di Big Data in tempo reale con il connettore Spark per Azure Cosmos DB

Il connettore Spark per Azure Cosmos DB consente a Cosmos DB di fungere da origine di input o sink di output per i processi Apache Spark. Connettendo Spark ad Azure Cosmos DB, è possibile risolvere più velocemente problemi di data science in rapida evoluzione usando Cosmos DB per salvare in modo permanente i dati e sottoporli a query in tempi rapidi. Il connettore Spark per Azure Cosmos DB usa in modo efficiente gli indici gestiti nativi di Cosmos DB. Gli indici consentono colonne aggiornabili in fase di analisi e propagazione del filtraggio in base al predicato per i dati distribuiti a livello globale in rapida evoluzione, che spaziano da scenari IoT (Internet delle cose) a scenari di data science e analisi.

Per usare Spark GraphX e le API Graph Gremlin di Azure Cosmos DB, vedere Azure Cosmos DB: eseguire analisi dei grafi con Spark e Apache TinkerPop Gremlin.

Scaricare

Per iniziare, scaricare il connettore Spark per Azure Cosmos DB (anteprima) dal repository azure-documentdb-spark in GitHub.

Componenti del connettore

Sono i seguenti:

  • Azure Cosmos DB consente ai clienti di effettuare il provisioning e ridimensionare in modo elastico la velocità effettiva e le risorse di archiviazione in un numero qualsiasi di aree geografiche. Il servizio offre:

  • Apache Spark è un potente motore di elaborazione open source incentrato su velocità, semplicità d'uso e analisi avanzata.

  • Apache Spark in Azure HDInsight per poter distribuire Apache Spark nel cloud per distribuzioni cruciali usando Azure HDInsight.

Versioni supportate ufficialmente:

Componente Versione
Apache Spark 2.0+
Scala 2.11
Azure DocumentDB Java SDK 1.10.0

Questo articolo consente di eseguire alcuni semplici esempi con Python (tramite pyDocumentDB) e le interfacce di Scala.

Per la connessione di Apache Spark e Azure Cosmos DB sono disponibili due approcci:

Implementazione di pyDocumentDB

La versione corrente di pyDocumentDB SDK consente di connettere Spark a Cosmos DB come illustrato nel diagramma seguente:

Flusso di dati da Spark a Cosmos DB tramite pyDocumentDB

Flusso di dati dell'implementazione di pyDocumentDB

Il flusso di dati è il seguente:

  1. Il nodo master Spark si connette al nodo del gateway Cosmos DB tramite pyDocumentDB. Un utente specifica solo le connessioni di Spark e Cosmos DB. Le connessioni ai rispettivi nodi master e del gateway sono trasparenti per l'utente.
  2. Il nodo del gateway esegue la query su Cosmos DB, dove la query viene successivamente eseguita sulle partizioni della raccolta nei nodi dati. La risposta a queste query viene inviata di nuovo al nodo del gateway e il set di risultati viene restituito al nodo master Spark.
  3. Le query successive, ad esempio su un frame di dati Spark, vengono inviate ai nodi di lavoro Spark per l'elaborazione.

La comunicazione tra Spark e Cosmos DB è limitata al nodo master Spark e ai nodi del gateway Cosmos DB. Le query vengono eseguite alla velocità consentita dal livello di trasporto tra i due nodi.

Installare pyDocumentDB

È possibile installare pyDocumentDB sul nodo del driver usando pip, ad esempio:

pip install pyDocumentDB

Connettere Spark a Cosmos DB tramite pyDocumentDB

La semplicità del trasporto di comunicazione rende relativamente semplice l'esecuzione di una query da Spark a Cosmos DB con pyDocumentDB.

Il frammento di codice seguente mostra come usare pyDocumentDB in un contesto Spark.

# Import Necessary Libraries
import pydocumentdb
from pydocumentdb import document_client
from pydocumentdb import documents
import datetime

# Configuring the connection policy (allowing for endpoint discovery)
connectionPolicy = documents.ConnectionPolicy()
connectionPolicy.EnableEndpointDiscovery
connectionPolicy.PreferredLocations = ["Central US", "East US 2", "Southeast Asia", "Western Europe","Canada Central"]


# Set keys to connect to Cosmos DB
masterKey = 'le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA=='
host = 'https://doctorwho.documents.azure.com:443/'
client = document_client.DocumentClient(host, {'masterKey': masterKey}, connectionPolicy)

Come indicato nel frammento di codice:

  • Cosmos DB Python SDK (pyDocumentDB) contiene tutti i parametri di connessione necessari. Il parametro relativo alle posizioni preferite determina ad esempio la replica di lettura e l'ordine di priorità.
  • Importare le librerie necessarie e configurare masterKey e host per creare il client Cosmos DB (pydocumentdb.document_client).

Eseguire query Spark tramite pyDocumentDB

Gli esempi seguenti usano l'istanza di Cosmos DB creata nel frammento precedente con le chiavi di sola lettura specificate. Il frammento di codice seguente si connette alla raccolta airports.codes nell'account DoctorWho specificato in precedenza ed esegue una query per estrarre le città con aeroporto nello stato di Washington.

# Configure Database and Collections
databaseId = 'airports'
collectionId = 'codes'

# Configurations the Cosmos DB client will use to connect to the database and collection
dbLink = 'dbs/' + databaseId
collLink = dbLink + '/colls/' + collectionId

# Set query parameter
querystr = "SELECT c.City FROM c WHERE c.State='WA'"

# Query documents
query = client.QueryDocuments(collLink, querystr, options=None, partition_key=None)

# Query for partitioned collections
# query = client.QueryDocuments(collLink, query, options= { 'enableCrossPartitionQuery': True }, partition_key=None)

# Push into list `elements`
elements = list(query)

Dopo l'esecuzione della query tramite query, il risultato è un oggetto query_iterable.QueryIterable che viene convertito in un elenco Python. Un elenco Python può essere convertito facilmente in un frame di dati Spark usando il codice seguente:

# Create `df` Spark DataFrame from `elements` Python list
df = spark.createDataFrame(elements)

Perché usare pyDocumentDB per connettere Spark a Cosmos DB?

La connessione di Spark a Cosmos DB mediante pyDocumentDB viene usata in genere negli scenari in cui:

  • Si vuole usare Python.
  • Si restituisce un set di risultati relativamente piccolo da Cosmos DB a Spark. Si noti che il set di dati sottostante in Cosmos DB può essere piuttosto grande. Si stanno applicando filtri, ovvero eseguendo filtri in base a predicati, nell'origine Cosmos DB.

Connettore Spark per Cosmos DB

Il connettore Spark per Cosmos DB usa Azure DocumentDB Java SDK e sposta i dati tra i nodi di lavoro Spark e Cosmos DB come illustrato nel diagramma seguente:

Flusso di dati nel connettore Spark per Cosmos DB

Il flusso di dati è il seguente:

  1. Il nodo master Spark si connette al nodo del gateway Cosmos DB per ottenere la mappa delle partizioni. Un utente specifica solo le connessioni di Spark e Cosmos DB. Le connessioni ai rispettivi nodi master e del gateway sono trasparenti per l'utente.
  2. Queste informazioni vengono restituite al nodo master Spark. A questo punto sarà possibile analizzare la query per determinare le partizioni e le relative posizioni in Cosmos DB a cui è necessario accedere.
  3. Queste informazioni vengono trasmesse ai nodi di lavoro Spark.
  4. I nodi di lavoro Spark si connettono direttamente alle partizioni Cosmos DB per estrarre i dati e restituiscono i dati alle partizioni Spark nei nodi di lavoro Spark.

La comunicazione tra Spark e Cosmos DB è notevolmente più veloce perché lo spostamento dei dati avviene tra i nodi di lavoro Spark e i nodi dati (partizioni) Cosmos DB.

Creare il connettore Spark per Cosmos DB

Attualmente il progetto del connettore usa Maven. Per creare il connettore senza le dipendenze, è possibile eseguire:

mvn clean package

È anche possibile scaricare le versioni più recenti del file JAR dalla cartella releases.

Includere il file JAR di Spark per Azure Cosmos DB

Prima dell'esecuzione del codice, è necessario includere il file JAR di Spark per Azure Cosmos. Se si usa spark-shell, è possibile includere il file JAR usando l'opzione --jars.

spark-shell --master $master --jars /$location/azure-cosmosdb-spark-0.0.3-jar-with-dependencies.jar

Se si vuole eseguire il file JAR senza dipendenze, usare il codice seguente:

spark-shell --master $master --jars /$location/azure-cosmosdb-spark-0.0.3.jar,/$location/azure-documentdb-1.10.0.jar

Se si usa un servizio notebook, come Azure HDInsight Jupyter, è possibile usare i comandi spark magic:

%%configure
{ "jars": ["wasb:///example/jars/azure-documentdb-1.10.0.jar","wasb:///example/jars/azure-cosmosdb-spark-0.0.3.jar"],
  "conf": {
    "spark.jars.excludes": "org.scala-lang:scala-reflect"
   }
}

Il comando jars consente di includere i due file JAR necessari per azure-cosmosdb-spark (il file stesso e Azure DocumentDB Java SDK) ed escludere scala-reflect in modo da non interferire con le chiamate Livy (notebook Jupyter > Livy > Spark).

Connettere Spark a Cosmos DB con il connettore

Nonostante il trasporto di comunicazione sia un po' più complesso, l'esecuzione di una query da Spark a Cosmos DB con il connettore è notevolmente più veloce.

Il frammento di codice seguente illustra come usare il connettore in un contesto Spark.

// Import Necessary Libraries
import org.joda.time._
import org.joda.time.format._
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Configure connection to your collection
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA==",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US2;",
"Collection" -> "flights_pcoll",
"SamplingRatio" -> "1.0"))

// Create collection connection
val coll = spark.sqlContext.read.cosmosDB(readConfig2)
coll.createOrReplaceTempView("c")

Come indicato nel frammento di codice:

  • azure-cosmosdb-spark contiene tutti i parametri di connessione necessari, che includono le posizioni preferite. È ad esempio possibile scegliere la replica di lettura e l'ordine di priorità.
  • Importare le librerie necessarie e configurare masterKey e host per creare il client Cosmos DB.

Eseguire query Spark tramite il connettore

L'esempio seguente usa l'istanza di Cosmos DB creata nel frammento precedente con le chiavi di sola lettura specificate. Il frammento di codice seguente consente la connessione alla raccolta DepartureDelays.flights_pcoll (nell'account DoctorWho specificato in precedenza) ed esegue una query per estrarre le informazioni sui ritardi dei voli in partenza da Seattle.

// Queries
var query = "SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'"
val df = spark.sql(query)

// Run DF query (count)
df.count()

// Run DF query (show)
df.show()

Vantaggi dell'implementazione del connettore Spark per Cosmos DB

La connessione di Spark a Cosmos DB con il connettore viene usata in genere negli scenari in cui:

  • Si vuole usare Scala e aggiornarlo in modo da includere un wrapper Python come indicato in Issue 3: Add Python wrapper and examples (Problema 3: Aggiungere il wrapper Python ed esempi).
  • La quantità di dati da trasferire tra Apache Spark e Cosmos DB è elevata.

Per informazioni sulle differenze a livello di prestazioni delle query, vedere la wiki sulle esecuzioni di test di query.

Esempio di aggregazione distribuita

Questa sezione fornisce alcuni esempi di come è possibile eseguire analisi e aggregazioni distribuite combinando Apache Spark e Azure Cosmos DB. Azure Cosmos DB supporta già le aggregazioni, come illustrato nel post di blog sulle aggregazioni su scala globale con Azure Cosmos DB. Ecco come passare a un livello superiore con Apache Spark.

Si noti che queste aggregazioni fanno riferimento al notebook del connettore Spark per Cosmos DB.

Eseguire la connessione ai dati di esempio sui voli

Queste aggregazioni di esempio accedono ad alcuni dati sulle prestazioni dei voli archiviati nel database Cosmos DB DoctorWho. Per connettersi al database, è necessario usare il frammento di codice seguente:

// Import Spark to Cosmos DB connector
import com.microsoft.azure.cosmosdb.spark.schema._
import com.microsoft.azure.cosmosdb.spark._
import com.microsoft.azure.cosmosdb.spark.config.Config

// Connect to Cosmos DB Database
val readConfig2 = Config(Map("Endpoint" -> "https://doctorwho.documents.azure.com:443/",
"Masterkey" -> "le1n99i1w5l7uvokJs3RT5ZAH8dc3ql7lx2CG0h0kK4lVWPkQnwpRLyAN0nwS1z4Cyd1lJgvGUfMWR3v8vkXKA==",
"Database" -> "DepartureDelays",
"preferredRegions" -> "Central US;East US 2;",
"Collection" -> "flights_pcoll",
"SamplingRatio" -> "1.0"))

// Create collection connection
val coll = spark.sqlContext.read.cosmosDB(readConfig2)
coll.createOrReplaceTempView("c")

Con questo frammento si eseguirà anche una query di base che trasferisce il set di dati filtrato da Cosmos DB a Spark, che potrà eseguire aggregazioni distribuite. In questo caso la richiesta è relativa ai voli in partenza da Seattle (SEA).

// Run, get row count, and time query
val originSEA = spark.sql("SELECT c.date, c.delay, c.distance, c.origin, c.destination FROM c WHERE c.origin = 'SEA'")
originSEA.createOrReplaceTempView("originSEA")

I risultati seguenti sono stati generati eseguendo le query dal servizio notebook Jupyter. Si noti che tutti i frammenti di codice sono generici e non specifici di un servizio.

Esecuzione di query LIMIT e COUNT

Come avviene in genere in SQL/Spark SQL, è possibile iniziare con una query LIMIT:

Query LIMIT Spark

La query successiva è una semplice e rapida query COUNT:

Query COUNT Spark

Query GROUP BY

In questo set successivo è possibile eseguire facilmente query GROUP BY sul database Cosmos DB:

select destination, sum(delay) as TotalDelays
from originSEA
group by destination
order by sum(delay) desc limit 10

Grafico della query GROUP BY Spark

Query DISTINCT, ORDER BY

Di seguito una query DISTINCT, ORDER BY:

Grafico della query GROUP BY Spark

Continuare l'analisi dei dati relativi ai voli

Per continuare l'analisi di questi dati, è possibile usare le query di esempio seguenti:

Prime 5 destinazioni (città) con ritardi in partenza da Seattle

select destination, sum(delay)
from originSEA
where delay < 0
group by destination
order by sum(delay) limit 5

Grafico dei ritardi massimi Spark

Calcolare i ritardi medi per città di destinazione dei voli in partenza da Seattle

select destination, percentile_approx(delay, 0.5) as median_delay
from originSEA
where delay < 0
group by destination
order by percentile_approx(delay, 0.5)

Grafico dei ritardi medi Spark

Passaggi successivi

Se ancora non lo si è fatto, scaricare il connettore Spark per Cosmos DB dal repository GitHub azure-cosmosdb-spark ed esplorare le risorse aggiuntive nel repository:

È possibile anche esaminare la guida ad Apache Spark SQL, DataFrame e set di dati e l'articolo su Apache Spark in Azure HDInsight.