Uživatelská příručka pro GraphFrames – Scala
Tento článek ukazuje příklady z uživatelské příručky pro GraphFrames.
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
Vytváření graphframes
GraphFrames můžete vytvářet z vrcholů a hraničních datových rámců.
- Datový rámec vrcholu: Datový rámec vrcholu by měl obsahovat speciální sloupec s názvem
id
, který určuje jedinečná ID každého vrcholu v grafu. - Datový rámec Edge: Datový rámec edge by měl obsahovat dva speciální sloupce:
src
(ID zdrojového vrcholu hrany) adst
(ID cílového vrcholu hrany).
Oba datové rámce můžou mít libovolné další sloupce. Tyto sloupce můžou představovat atributy vrcholu a hrany.
Vytvoření vrcholů a hran
// Vertex DataFrame
val v = spark.createDataFrame(List(
("a", "Alice", 34),
("b", "Bob", 36),
("c", "Charlie", 30),
("d", "David", 29),
("e", "Esther", 32),
("f", "Fanny", 36),
("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
("a", "b", "friend"),
("b", "c", "follow"),
("c", "b", "follow"),
("f", "c", "follow"),
("e", "f", "follow"),
("e", "d", "friend"),
("d", "a", "friend"),
("a", "e", "friend")
)).toDF("src", "dst", "relationship")
Pojďme vytvořit graf z těchto vrcholů a těchto hran:
val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends
Základní dotazy na graf a datový rámec
GraphFrames poskytují jednoduché dotazy na grafy, jako je například stupeň uzlu.
Vzhledem k tomu, že graphFrames představují grafy jako dvojice datových rámců vrcholů a okrajů, je také snadné vytvářet výkonné dotazy přímo na vrchol a hraniční datové rámce. Tyto datové rámce jsou k dispozici jako pole vrcholů a hran v graphFrame.
display(g.vertices)
display(g.edges)
Příchozí stupeň vrcholů:
display(g.inDegrees)
Výstupní stupeň vrcholů:
display(g.outDegrees)
Stupeň vrcholů:
display(g.degrees)
Dotazy můžete spouštět přímo v datovém rámci vrcholů. V grafu můžeme například najít věk nejmladší osoby:
val youngest = g.vertices.groupBy().min("age")
display(youngest)
Podobně můžete spouštět dotazy na okrajích datového rámce. Pojďme například spočítat počet relací "sledovat" v grafu:
val numFollows = g.edges.filter("relationship = 'follow'").count()
Hledání motivů
Vytvářejte složitější vztahy zahrnující hrany a vrcholy pomocí motivů. Následující buňka najde dvojice vrcholů s hranami v obou směrech mezi nimi. Výsledkem je datový rámec, ve kterém názvy sloupců jsou klíče motivu.
Další podrobnosti o rozhraní API najdete v uživatelské příručce pro GraphFrame .
// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)
Vzhledem k tomu, že výsledkem je datový rámec, můžete na motivu vytvořit složitější dotazy. Pojďme najít všechny vzájemné vztahy, ve kterých je jedna osoba starší než 30 let:
val filtered = motifs.filter("b.age > 30")
display(filtered)
Stavové dotazy
Většina dotazů motivů je bezstavová a snadno se vyslovuje, jako v příkladech výše. Další příklady ukazují složitější dotazy, které nesou stav podél cesty v motivu. Vyjádřete tyto dotazy kombinací hledání motivu GraphFrame s filtry na výsledek, kde filtry používají sekvenční operace k vytvoření řady sloupců datového rámce.
Předpokládejme například, že chcete identifikovat řetězec 4 vrcholů s nějakou vlastností definovanou posloupností funkcí. To znamená, že mezi řetězy se 4 vrcholy a->b->c->d
identifikujte podmnožinu řetězců odpovídajících tomuto komplexnímu filtru:
- Inicializace stavu na cestě
- Aktualizujte stav na základě vrcholu a.
- Aktualizujte stav na základě vrcholu b.
- Atd. pro c a d.
- Pokud konečný stav odpovídá nějaké podmínce, filtr řetěz přijme.
Následující fragmenty kódu demonstrují tento proces, kde identifikujeme řetězce se 4 vrcholy tak, aby alespoň 2 ze 3 hran byly "přátelské" relace. V tomto příkladu je stav aktuální počet "přátelských" hran; obecně může jít o libovolný sloupec datového rámce.
// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")
// Query on sequence, with state (cnt)
// (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
when(relationship === "friend", cnt + 1).otherwise(cnt)
}
// (b) Use sequence operation to apply method to sequence of elements in motif.
// In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
// (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)
Podgrafy
GraphFrames poskytuje rozhraní API pro vytváření podgrafů filtrováním okrajů a vrcholů. Tyto filtry se můžou skládat dohromady. Například následující podgraf obsahuje jenom lidi, kteří jsou přátelé a kteří jsou starší než 30 let.
// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
.filterEdges("relationship = 'friend'")
.filterVertices("age > 30")
.dropIsolatedVertices()
Komplexní trojité filtry
Následující příklad ukazuje, jak vybrat podgraf založený na trojitých filtrech, které pracují na okraji a jeho vrcholech "src" a "dst". Rozšíření tohoto příkladu tak, aby přesahoval triplety pomocí složitějších motivů, je jednoduché.
// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
.filter("e.relationship = 'follow'")
.filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
// val e2 = paths.select("e.*")
// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)
Standardní grafové algoritmy
Tato část popisuje standardní grafové algoritmy integrované do GraphFrames.
Hledání v první šířce (BFS)
Vyhledejte uživatele ve věku 32 let < v "Esther".
val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)
Vyhledávání může také omezit filtry okrajů a maximální délku cesty.
val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
.edgeFilter("relationship != 'friend'")
.maxPathLength(3)
.run()
display(filteredPaths)
Připojené komponenty
Vypočítá členství připojených komponent jednotlivých vrcholů a vrátí graf s přiřazeným ID komponenty.
val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)
Silně propojené komponenty
Vypočítat silně propojenou komponentu (SCC) každého vrcholu a vrátit graf s každým vrcholem přiřazeným k SCC obsahující tento vrchol.
val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))
Šíření popisků
Spusťte statický algoritmus šíření popisků pro detekci komunit v sítích.
Každý uzel v síti je zpočátku přiřazen k vlastní komunitě. V každém superkroku odesílají uzly svoji komunitu všem sousedům a aktualizují svůj stav na režim komunitního přidružení příchozích zpráv.
LPA je standardní algoritmus detekce komunity pro grafy. Je to levné výpočetně, i když (1) konvergence není zaručena a (2) jeden může skončit s triviálními řešeními (všechny uzly se identifikují do jedné komunity).
val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))
Pagerank
Identifikace důležitých vrcholů v grafu na základě připojení
// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)
Nejkratší cesty
Vypočítá nejkratší cesty k dané sadě vrcholů orientačních bodů, kde orientační body určují ID vrcholu.
val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)
Počítání trojúhelníků
Vypočítá počet trojúhelníků procházejících jednotlivými vrcholy.
import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends // get example graph
val results = g.triangleCount.run()
results.select("id", "count").show()