Datasets の概要Introduction to Datasets

データセット API は、Spark SQL の最適化された実行エンジンの利点と共に、RDDs (厳密な型指定、強力なラムダ関数を使用する機能) の利点を提供します。The Datasets API provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. RDD と同様に、データセット JVM オブジェクトを定義し、関数型変換 (mapflatMapfilter など) を使用して操作することができます。You can define a Dataset JVM objects and then manipulate them using functional transformations (map, flatMap, filter, and so on) similar to an RDD. その利点は、RDDs とは異なり、これらの変換は、_厳密に型指定_された構造化コレクションに適用されるようになりました。これにより、Spark は spark SQL の実行エンジンを活用して最適化を行うことができます。The benefits is that, unlike RDDs, these transformations are now applied on a structured and strongly typed distributed collection that allows Spark to leverage Spark SQL’s execution engine for optimization.

データセットを作成するCreate a Dataset

シーケンスをデータセットに変換するには、シーケンスの .toDS() を呼び出します。To convert a sequence to a Dataset, call .toDS() on the sequence.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

ケースクラスのシーケンスがある場合、.toDS() を呼び出すと、必要なすべてのフィールドを含むデータセットが提供されます。If you have a sequence of case classes, calling .toDS() provides a Dataset with all the necessary fields.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

RDD からデータセットを作成するCreate a Dataset from an RDD

RDD をデータセットに変換するには、rdd.toDS() を呼び出します。To convert an RDD into a Dataset, call rdd.toDS().

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

データフレームからデータセットを作成するCreate a Dataset from a DataFrame

@No__t_0 を呼び出して、データフレームをデータセットに変換できます。You can call df.as[SomeCaseClass] to convert the DataFrame to a Dataset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

また、case class を使用せずにデータフレームを Dataset に変換するときに、組を扱うこともできます。You can also deal with tuples while converting a DataFrame to Dataset without using a case class.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

データセットの操作Work with Datasets

ワードカウントの例Word Count Example

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

データセットの結合Join Datasets

次の例を次に示します。The following example demonstrates the following:

  • 複数のデータセットの結合Union multiple datasets
  • 特定の列による条件グループでの内部結合の実行Doing an inner join on a condition Group by a specific column
  • グループ化されたデータセットに対してカスタム集計 (平均) を実行します。Doing a custom aggregation (average) on the grouped dataset.

この例では、データセット API のみを使用して、使用可能なすべての操作を示しています。The examples uses only Datasets API to demonstrate all the operations available. 実際には、集計の実行に DataFrames 使用する方が、mapGroups を使用したカスタム集計よりも簡単で高速になります。In reality, using DataFrames for doing aggregation would be simpler and faster than doing custom aggregation with mapGroups. 次のセクションでは、データセットを dataframes 変換する方法と、集計を実行するために DataFrames API を使用する方法の詳細について説明します。The next section covers the details of converting Datasets to DataFrames and using DataFrames API for doing aggregations.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

データセットをデータフレームに変換するConvert a Dataset to a DataFrame

上の2つの例は、純粋なデータセット Api を使用して処理されています。The above 2 examples dealt with using pure Datasets APIs. また、データセットから dataframes 簡単に移動し、dataframes Api を活用することもできます。You can also easily move from Datasets to DataFrames and leverage the DataFrames APIs. 次の例は、データセットと DataFrames Api の両方を使用するワードカウントの例を示しています。The below example shows the word count example that uses both Datasets and DataFrames APIs.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()