GraphFrames felhasználói útmutató – Scala

Ez a cikk példákat mutat be a GraphFrames felhasználói útmutatójából.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

GraphFrame-ek létrehozása

GraphFrame-eket csúcspontokból és perem adatkeretekből hozhat létre.

  • Csúcsérték adatkerete: A csúcsok adatkeretének tartalmaznia kell egy speciális oszlopot, id amely egyedi azonosítókat ad meg a gráf egyes csúcspontjaihoz.
  • Edge DataFrame: Az edge DataFrame-nek két speciális oszlopot kell tartalmaznia: src (az él forrás csúcsazonosítója) és dst (az él cél csúcspontazonosítója).

Mindkét Adatkeret tetszőleges más oszlopokkal rendelkezhet. Ezek az oszlopok csúcs- és élattribútumokat jelölhetnek.

Csúcspontok és élek létrehozása

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

Hozzunk létre egy gráfot ezekből a csúcspontokból és a következő élekből:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

Alapszintű gráf- és DataFrame-lekérdezések

A GraphFrame-ek egyszerű gráf lekérdezéseket biztosítanak, például csomópontfokot.

Emellett mivel a GraphFrame-ek csúcs- és él-adatkeretek párjaként jelölik a gráfokat, könnyű hatékony lekérdezéseket létrehozni közvetlenül a csúcsponton és az él adatkeretein. Ezek a DataFrame-ek csúcsok és élek mezőként érhetők el a GraphFrame-ben.

display(g.vertices)
display(g.edges)

A csúcspontok bejövő foka:

display(g.inDegrees)

A csúcspontok kimenő foka:

display(g.outDegrees)

A csúcsok foka:

display(g.degrees)

A lekérdezéseket közvetlenül a DataFrame csúcsán futtathatja. A gráfban például megtalálhatjuk a legfiatalabb személy életkorát:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

Hasonlóképpen lekérdezéseket is futtathat a DataFrame peremhálózatán. Számoljuk meg például a gráf "követő" kapcsolatainak számát:

val numFollows = g.edges.filter("relationship = 'follow'").count()

Motif megállapítás

Összetettebb kapcsolatokat hozhat létre élekkel és csúcsokkal, motívumok használatával. Az alábbi cella megkeresi a két irányban széllel rendelkező csúcsok párjait. Az eredmény egy DataFrame, amelyben az oszlopnevek motívumkulcsok.

Az API-val kapcsolatos további részletekért tekintse meg a GraphFrame felhasználói útmutatót .

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

Mivel az eredmény egy DataFrame, összetettebb lekérdezéseket hozhat létre a motívum fölé. Nézzük meg azokat a kölcsönös kapcsolatokat, amelyekben egy személy 30 évnél idősebb:

val filtered = motifs.filter("b.age > 30")
display(filtered)

Állapotalapú lekérdezések

A legtöbb motívumlekérdezés állapot nélküli és egyszerűen kifejezhet, ahogy a fenti példákban is látható. A következő példák összetettebb lekérdezéseket mutatnak be, amelyek állapotot hordoznak a motívum egy útvonalán. Fejezze ki ezeket a lekérdezéseket a GraphFrame-motívumkeresés és az eredmény szűrőinek kombinálásával, ahol a szűrők sorozatműveleteket használnak DataFrame-oszlopok sorozatának létrehozásához.

Tegyük fel például, hogy egy 4 csúcsból álló láncot szeretne azonosítani, amelynek egyes tulajdonságait függvények sorozata határozza meg. Azaz a 4 csúcslánc a->b->c->dközött azonosítsa a láncok ezen összetett szűrőnek megfelelő részhalmazát:

  • Állapot inicializálása az elérési úton.
  • Állapot frissítése az a csúcspont alapján.
  • Állapot frissítése a b csúcspont alapján.
  • Stb. c és d esetén.
  • Ha a végső állapot megfelel valamilyen feltételnek, akkor a szűrő elfogadja a láncot.

Az alábbi kódrészletek ezt a folyamatot mutatják be, ahol 4 csúcsláncot azonosítunk, így a 3 él közül legalább 2 "barát" kapcsolat. Ebben a példában az állapot a "barát" élek aktuális száma; általában bármilyen DataFrame oszlop lehet.

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

Algráfok

A GraphFrames API-kat biztosít az algráfok készítéséhez az élekre és csúcsokra való szűréssel. Ezek a szűrők összeállhatnak. Az alábbi algráf például csak olyan személyeket tartalmaz, akik barátok, és akik 30 évnél idősebbek.

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

Összetett hármas szűrők

Az alábbi példa bemutatja, hogyan választhat ki egy olyan algráfot, amely egy élen működő hármas szűrőkön és annak "src" és "dst" csúcsán alapul. A példa kiterjesztése a hármasok fölé összetettebb motívumok használatával egyszerű.

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

Standard gráf algoritmusok

Ez a szakasz a GraphFramesbe beépített szabványos gráf algoritmusokat ismerteti.

Szélességi első keresés (BFS)

Keressen az "Esther" kifejezésre a 32 éves felhasználók < számára.

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

A keresés korlátozhatja az élszűrőket és a maximális elérési utak hosszát is.

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

Csatlakoztatott összetevők

Számítsa ki az egyes csúcspontok csatlakoztatott összetevő-tagságát, és adjon vissza egy gráfot az egyes csúcspontokhoz hozzárendelt összetevő-azonosítóval.

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

Erősen csatlakoztatott összetevők

Számítsa ki az egyes csúcspontok erősen csatlakoztatott összetevőjét (SCC), és adjon vissza egy gráfot az adott csúcsot tartalmazó SCC-hez rendelt minden csúcsponttal.

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

Címkepropagálás

Futtassa a statikus címkepropagálási algoritmust a hálózatokban lévő közösségek észleléséhez.

A hálózat minden csomópontja eredetileg a saját közösségéhez van rendelve. Minden szupersztepnél a csomópontok elküldik a közösséghez való viszonyukat az összes szomszédnak, és frissítik az állapotukat a bejövő üzenetek módos közösségi kapcsolatára.

Az LPA egy szabványos közösségi észlelési algoritmus a gráfokhoz. Ez olcsó számítás, bár (1) a konvergencia nem garantált, és (2) lehet a végén triviális megoldások (minden csomópont azonosul egy közösség).

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

Pagerank

Fontos csúcspontok azonosítása egy gráfban kapcsolatok alapján.

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

Legrövidebb elérési utak

Kiszámítja az adott pontpont csúcsainak legrövidebb útvonalait, ahol a nevezetességek a csúcsazonosító alapján vannak megadva.

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

Háromszögek számlálása

Kiszámítja az egyes csúcspontokon áthaladó háromszögek számát.

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()