Introduction to Datasets

The Datasets API provides the benefits of RDDs (strong typing, the ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. You can define a Dataset of JVM objects and then manipulate them using functional transformations (map, flatMap, filter, and so on), similar to an RDD. The benefit is that, unlike with RDDs, these transformations are applied to a structured and strongly typed distributed collection, which allows Spark to leverage the Spark SQL execution engine for optimization.
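
For example, here is a minimal sketch of typed, functional transformations on a Dataset (assuming a notebook environment such as Databricks, where sc, spark, and spark.implicits._ are already in scope):

// Typed transformations expressed as lambdas; Spark SQL's engine still optimizes the plan.
val numbers = Seq(1, 2, 3, 4, 5).toDS()
val evenDoubled = numbers.filter(_ % 2 == 0).map(_ * 2)
evenDoubled.show()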

Create a Dataset

To convert a sequence to a Dataset, call .toDS() on the sequence.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

If you have a sequence of case classes, calling .toDS() provides a Dataset with all the necessary fields.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Create a Dataset from an RDD

To convert an RDD into a Dataset, call rdd.toDS().

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()
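
An RDD of case class instances converts the same way and yields a Dataset of that type. A small sketch, reusing the Person case class defined above (and assuming spark.implicits._ is in scope):

// Converting an RDD of case class instances produces a strongly typed Dataset[Person].
val personRDD = sc.parallelize(Seq(Person("Max", 33), Person("Adam", 32)))
val personFromRDD = personRDD.toDS()
personFromRDD.show()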

Create a Dataset from a DataFrame

You can call df.as[SomeCaseClass] to convert the DataFrame to a Dataset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

You can also work with tuples when converting a DataFrame to a Dataset, without using a case class.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()
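
The resulting Dataset is typed as (Int, String), so typed transformations can access the tuple fields directly. A brief sketch:

// Access the tuple fields with _1 and _2 in typed transformations.
val names = dataset.map(_._2.toUpperCase)
names.show()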

Work with Datasets

Word Count Example

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Join Datasets

The following example demonstrates how to:

  • Union multiple datasets
  • Do an inner join on a condition
  • Group by a specific column
  • Do a custom aggregation (average) on the grouped dataset

These examples use only the Dataset API to demonstrate the available operations. In practice, using DataFrames for aggregation is simpler and faster than doing custom aggregation with mapGroups. The next section covers the details of converting Datasets to DataFrames and using the DataFrames API for aggregations.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

// Computes the average salary for one (departmentId, departmentName) group.
def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft((0.0, 0.0)) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total / count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()
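
As noted above, the averageSalary helper is only needed if you perform the aggregation yourself instead of letting the DataFrame API do it. A sketch of that variant, using groupByKey and mapGroups on the same joined data (assuming spark.implicits._ is in scope):

// Alternative: custom aggregation with mapGroups, using the averageSalary helper defined above.
val joinedDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                   .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                   .filter(record => record.age > 25)

val customAverageSalaryDataset = joinedDataset.groupByKey(record => (record.departmentId, record.departmentName))
                                              .mapGroups((key, records) => averageSalary(key, records))

customAverageSalaryDataset.show()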

Convert a Dataset to a DataFrame

The two examples above used pure Dataset APIs. You can also easily move from Datasets to DataFrames and take advantage of the DataFrames APIs. The following example shows the word count example using both the Dataset and DataFrame APIs.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurrences")
              .orderBy($"numOccurrences".desc)     // Show most common words first
result.show()
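
If you need typed access again after the DataFrame aggregation, you can convert the result back to a Dataset, mirroring the tuple conversion shown earlier. A short sketch assuming the schema produced above (value: String, numOccurrences: Long):

// Convert the aggregated DataFrame back to a typed Dataset of (word, count) pairs.
val typedResult = result.as[(String, Long)]
typedResult.show()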