Introduction to Datasets

The Datasets API provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. You can define a Dataset of JVM objects and then manipulate them using functional transformations (map, flatMap, filter, and so on), similar to an RDD. The benefit is that, unlike RDDs, these transformations are applied to a structured and strongly typed distributed collection, which allows Spark to leverage Spark SQL's execution engine for optimization.
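
For example, here is a minimal sketch of typed transformations on a Dataset, assuming a Databricks notebook or Spark shell where spark.implicits._ is already in scope (the Sale case class is purely illustrative):

case class Sale(item: String, amount: Double)   // Hypothetical case class for illustration

val salesDS = Seq(Sale("book", 12.0), Sale("pen", 2.5), Sale("book", 20.0)).toDS()

// filter and map operate on typed Sale objects; Spark SQL's engine still optimizes the physical plan
val bigTicketItems = salesDS.filter(_.amount > 10.0).map(_.item)
bigTicketItems.show()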

Create a Dataset

To convert a sequence to a Dataset, call .toDS() on the sequence.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

If you have a sequence of case classes, calling .toDS() provides a Dataset with all the necessary fields.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Create a Dataset from an RDD

To convert an RDD into a Dataset, call rdd.toDS().

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Create a Dataset from a DataFrame

You can call df.as[SomeCaseClass] to convert the DataFrame to a Dataset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

You can also deal with tuples when converting a DataFrame to a Dataset, without using a case class.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Work with Datasets

Word Count Example

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))   // Lowercase each line and split it into words
                                 .filter(_ != "")                     // Drop empty strings left over from the split
                                 .groupBy("value")                    // Group by the word itself (the "value" column)
val countsDataset = groupedDataset.count()                            // Count the occurrences of each word
countsDataset.show()

Join Datasets

The following example demonstrates the following:

  • Union multiple datasets
  • Doing an inner join on a condition
  • Group by a specific column
  • Doing a custom aggregation (average) on the grouped dataset

These examples use only the Dataset API to demonstrate all the available operations. In practice, using DataFrames for aggregation is simpler and faster than doing a custom aggregation with mapGroups. The next section covers the details of converting Datasets to DataFrames and using the DataFrame API for aggregations.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft((0.0, 0.0)) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()
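
Note that the averageSalary helper defined above is not actually used in the chain; the chain relies on the built-in avg() aggregation instead. As a sketch only (not part of the original example), the helper could drive the custom aggregation mentioned earlier via the typed groupByKey and mapGroups operations:

val joinedDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                   .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                   .filter(record => record.age > 25)

// Typed grouping: groupByKey keys each Record by (departmentId, departmentName),
// and mapGroups applies averageSalary to each group, producing a Dataset[ResultSet]
val customAverageSalaryDataset = joinedDataset.groupByKey(record => (record.departmentId, record.departmentName))
                                              .mapGroups((key, records) => averageSalary(key, records))
customAverageSalaryDataset.show()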

Convert a Dataset to a DataFrame

The above two examples use pure Dataset APIs. You can also easily move from Datasets to DataFrames and leverage the DataFrame APIs. The following example shows the word count example using both the Dataset and DataFrame APIs.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances".desc)      // Show most common words first
result.show()
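
If you need typed operations again after the aggregation, the resulting DataFrame can be converted back into a Dataset with .as[...]. A small sketch, assuming the value column holds the word as a String and the count column is a Long:

// Convert the aggregated DataFrame back into a typed Dataset of (word, count) pairs
val typedResult = result.as[(String, Long)]
typedResult.show()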