GraphFrames 使用者指南 - Scala

本文示範 GraphFrames 使用者指南中的範例。

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._

建立 GraphFrame

您可以從頂點和邊緣 DataFrame 建立 GraphFrame。

  • 頂點資料框架:頂點 DataFrame 應該包含名為 id 的特殊資料行,指定圖表中每個頂點的唯一識別碼。
  • Edge DataFrame:邊緣 DataFrame 應該包含兩個特殊資料行: src (邊緣) 的來源頂點識別碼,以及 dst 邊緣) 的 (目的地頂點識別碼。

這兩個 DataFrame 都可以有任意的其他資料行。 這些資料行可以代表頂點和邊緣屬性。

建立頂點和邊緣

// Vertex DataFrame
val v = spark.createDataFrame(List(
  ("a", "Alice", 34),
  ("b", "Bob", 36),
  ("c", "Charlie", 30),
  ("d", "David", 29),
  ("e", "Esther", 32),
  ("f", "Fanny", 36),
  ("g", "Gabby", 60)
)).toDF("id", "name", "age")
// Edge DataFrame
val e = spark.createDataFrame(List(
  ("a", "b", "friend"),
  ("b", "c", "follow"),
  ("c", "b", "follow"),
  ("f", "c", "follow"),
  ("e", "f", "follow"),
  ("e", "d", "friend"),
  ("d", "a", "friend"),
  ("a", "e", "friend")
)).toDF("src", "dst", "relationship")

讓我們從這些頂點和這些邊緣建立圖形:

val g = GraphFrame(v, e)
// This example graph also comes with the GraphFrames package.
// val g = examples.Graphs.friends

基本圖形和資料框架查詢

GraphFrame 提供簡單的圖形查詢,例如節點度。

此外,由於 GraphFrame 會將圖形表示為頂點和邊緣 DataFrame 配對,因此可以輕鬆地直接在頂點和邊緣 DataFrame 上進行功能強大的查詢。 這些 DataFrame 可用為 GraphFrame 中的頂點和邊緣欄位。

display(g.vertices)
display(g.edges)

頂點的傳入程度:

display(g.inDegrees)

頂點的傳出度:

display(g.outDegrees)

頂點的程度:

display(g.degrees)

您可以直接在頂點 DataFrame 上執行查詢。 例如,我們可以在圖表中找到最晚人員的年齡:

val youngest = g.vertices.groupBy().min("age")
display(youngest)

同樣地,您可以在邊緣 DataFrame 上執行查詢。 例如,讓我們計算圖表中的「追蹤」關聯性數目:

val numFollows = g.edges.filter("relationship = 'follow'").count()

尋找 Motif

使用 Motif 建立涉及邊緣和頂點的複雜關聯性。 下列儲存格會尋找頂點配對,其中兩個頂點之間有兩個方向的邊緣。 結果是 DataFrame,其中資料行名稱為 Motif 索引鍵。

如需 API 的詳細資訊,請參閱 GraphFrame 使用者指南

// Search for pairs of vertices with edges in both directions between them.
val motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
display(motifs)

由於結果是 DataFrame,因此您可以在 Motif 之上建置更複雜的查詢。 讓我們找出一個人早于 30 的所有相互關聯性:

val filtered = motifs.filter("b.age > 30")
display(filtered)

具狀態查詢

大部分的 Motif 查詢都是無狀態且容易表達的,如上述範例所示。 下一個範例示範更複雜的查詢,這些查詢會沿著圖案中的路徑執行狀態。 結合 GraphFrame Motif 尋找結果與篩選準則來表示這些查詢,其中篩選準則會使用序列作業來建構一系列的 DataFrame 資料行。

例如,假設您想要識別 4 個頂點的鏈結,其中某些屬性是由函式序列所定義。 也就是說,在 4 個頂點 a->b->c->d 的鏈結之間,識別符合此複雜篩選的鏈結子集:

  • 初始化路徑上的狀態。
  • 根據頂點 a 更新狀態。
  • 根據頂點 b 更新狀態。
  • 等。適用于 c 和 d。
  • 如果最終狀態符合某些條件,則篩選準則會接受鏈結。

下列程式碼片段示範此程式,其中我們識別了 4 個頂點的鏈結,讓至少 3 個邊緣的 2 個是「friend」關聯性。 在此範例中,狀態是目前「friend」 邊緣計數;一般而言,它可以是任何 DataFrame 資料行。

// Find chains of 4 vertices.
val chain4 = g.find("(a)-[ab]->(b); (b)-[bc]->(c); (c)-[cd]->(d)")

// Query on sequence, with state (cnt)
//  (a) Define method for updating state given the next element of the motif.
def sumFriends(cnt: Column, relationship: Column): Column = {
  when(relationship === "friend", cnt + 1).otherwise(cnt)
}
//  (b) Use sequence operation to apply method to sequence of elements in motif.
//      In this case, the elements are the 3 edges.
val condition = Seq("ab", "bc", "cd").
  foldLeft(lit(0))((cnt, e) => sumFriends(cnt, col(e)("relationship")))
//  (c) Apply filter to DataFrame.
val chainWith2Friends2 = chain4.where(condition >= 2)
display(chainWith2Friends2)

子圖

GraphFrames 提供 API,可藉由篩選邊緣和頂點來建置子圖形。 這些篩選準則可以組合在一起。 例如,下列子圖只包含朋友和超過 30 年的人。

// Select subgraph of users older than 30, and edges of type "friend"
val g2 = g
  .filterEdges("relationship = 'friend'")
  .filterVertices("age > 30")
  .dropIsolatedVertices()

複雜的三元篩選

下列範例示範如何根據在邊緣及其 「src」 和 「dst」 頂點上運作的三倍篩選來選取子圖。 藉由使用更複雜的 Motif,擴充此範例以超越三倍。很簡單。

// Select subgraph based on edges "e" of type "follow"
// pointing from a younger user "a" to an older user "b".
val paths = g.find("(a)-[e]->(b)")
  .filter("e.relationship = 'follow'")
  .filter("a.age < b.age")
// "paths" contains vertex info. Extract the edges.
val e2 = paths.select("e.src", "e.dst", "e.relationship")
// In Spark 1.5+, the user may simplify this call:
//  val e2 = paths.select("e.*")

// Construct the subgraph
val g2 = GraphFrame(g.vertices, e2)
display(g2.vertices)
display(g2.edges)

標準圖表演算法

本節說明 GraphFrames 內建的標準圖表演算法。

BFS) 的廣度優先搜尋 (

從 「Esther」 搜尋年齡 < 為 32 的使用者。

val paths: DataFrame = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32").run()
display(paths)

搜尋也可以限制邊緣篩選準則和路徑長度上限。

val filteredPaths = g.bfs.fromExpr("name = 'Esther'").toExpr("age < 32")
  .edgeFilter("relationship != 'friend'")
  .maxPathLength(3)
  .run()
display(filteredPaths)

連線的元件

計算每個頂點的連線元件成員資格,並傳回每個頂點指派元件識別碼的圖形。

val result = g.connectedComponents.run() // doesn't work on Spark 1.4
display(result)

強連線的元件

計算每個頂點的強連線元件 (SCC) ,並傳回指派給包含該頂點之 SCC 的每個頂點的圖表。

val result = g.stronglyConnectedComponents.maxIter(10).run()
display(result.orderBy("component"))

標籤傳播

執行靜態標籤傳播演算法來偵測網路中的社群。

網路中的每個節點一開始都會指派給自己的社群。 在每個超級步驟中,節點都會將其社群關聯傳送到所有鄰近,並將其狀態更新為傳入訊息的模式社群關聯。

LPA 是圖表的標準社群偵測演算法。 雖然 (1 個) 聚合並不保證,但 (2 個) 最後可能會有一些簡單的解決方案, (所有節點都會識別成單一社群) 。

val result = g.labelPropagation.maxIter(5).run()
display(result.orderBy("label"))

PageRank

根據連線識別圖形中的重要頂點。

// Run PageRank until convergence to tolerance "tol".
val results = g.pageRank.resetProbability(0.15).tol(0.01).run()
display(results.vertices)
display(results.edges)
// Run PageRank for a fixed number of iterations.
val results2 = g.pageRank.resetProbability(0.15).maxIter(10).run()
display(results2.vertices)
// Run PageRank personalized for vertex "a"
val results3 = g.pageRank.resetProbability(0.15).maxIter(10).sourceId("a").run()
display(results3.vertices)

最短路徑

計算指定之一組地標頂點的最短路徑,其中地標是由頂點識別碼指定。

val paths = g.shortestPaths.landmarks(Seq("a", "d")).run()
display(paths)

三角形計數

計算通過每個頂點的三角形數目。

import org.graphframes.examples
val g: GraphFrame = examples.Graphs.friends  // get example graph

val results = g.triangleCount.run()
results.select("id", "count").show()