Einführung in DatasetsIntroduction to Datasets

Die Datasets-API bietet die Vorteile von RDDS (starke Typisierung, Fähigkeit zur Verwendung leistungsfähiger Lambda-Funktionen) mit den Vorteilen der optimierten Ausführungs-Engine von Spark SQL.The Datasets API provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. Sie können ein JVM-DataSet-Objekt definieren und anschließend mithilfe von funktionalen Transformationen ( map , flatMap , filter usw.) bearbeiten, ähnlich wie bei einer RDD.You can define a Dataset JVM objects and then manipulate them using functional transformations (map, flatMap, filter, and so on) similar to an RDD. Der Vorteil besteht darin, dass diese Transformationen im Gegensatz zu RDDS nun auf eine strukturierte und stark typisierte verteilte Auflistung angewendet werden, mit der Spark die Ausführungs-Engine von Spark SQL für die Optimierung nutzen kann.The benefits is that, unlike RDDs, these transformations are now applied on a structured and strongly typed distributed collection that allows Spark to leverage Spark SQL’s execution engine for optimization.

Erstellen eines DatasetsCreate a Dataset

Um eine Sequenz in ein DataSet zu konvertieren, müssen Sie .toDS() für die Sequenz aufzurufen.To convert a sequence to a Dataset, call .toDS() on the sequence.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

Wenn Sie eine Sequenz von Case-Klassen haben, wird beim Aufrufen von .toDS() ein DataSet mit allen erforderlichen Feldern bereitstellt.If you have a sequence of case classes, calling .toDS() provides a Dataset with all the necessary fields.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Erstellen eines Datasets aus einer RDDCreate a Dataset from an RDD

Um einen rdd in ein DataSet zu konvertieren, wird aufgerufen rdd.toDS() .To convert an RDD into a Dataset, call rdd.toDS().

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Erstellen eines Datasets aus einem dataframeCreate a Dataset from a DataFrame

Sie können aufzurufen df.as[SomeCaseClass] , um den dataframe in ein DataSet zu konvertieren.You can call df.as[SomeCaseClass] to convert the DataFrame to a Dataset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

Sie können auch Tupel behandeln, während Sie einen dataframe ohne Verwendung eines in ein DataSet umwandelt case class .You can also deal with tuples while converting a DataFrame to Dataset without using a case class.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Arbeiten mit DatasetsWork with Datasets

Wort Anzahl BeispielWord Count Example

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Verknüpfen von DatasetsJoin Datasets

Im folgenden Beispiel wird Folgendes veranschaulicht:The following example demonstrates the following:

  • Union Multiple DatasetsUnion multiple datasets
  • Durchführung eines inneren Joins in einer Bedingungs Gruppe durch eine bestimmte SpalteDoing an inner join on a condition Group by a specific column
  • Durchführung einer benutzerdefinierten Aggregation (Durchschnitt) für das gruppierte DataSet.Doing a custom aggregation (average) on the grouped dataset.

In den Beispielen wird nur die Datasets-API verwendet, um alle verfügbaren Vorgänge zu veranschaulichen.The examples uses only Datasets API to demonstrate all the operations available. In der Realität wäre die Verwendung von dataframes zum Durchsetzen von Aggregationen einfacher und schneller als eine benutzerdefinierte Aggregation mit mapGroups .In reality, using DataFrames for doing aggregation would be simpler and faster than doing custom aggregation with mapGroups. Der nächste Abschnitt behandelt die Details zum Umrechnen von Datasets in dataframes und zum Verwenden der dataframes-API zum durch nehmen von Aggregationen.The next section covers the details of converting Datasets to DataFrames and using DataFrames API for doing aggregations.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

Konvertieren eines Datasets in einen DataFrameConvert a Dataset to a DataFrame

In den obigen zwei Beispielen wurde die Verwendung von reinen Datasets-APIs behandelt.The above 2 examples dealt with using pure Datasets APIs. Sie können auch problemlos von Datasets zu dataframes wechseln und die dataframes-APIs nutzen.You can also easily move from Datasets to DataFrames and leverage the DataFrames APIs. Das folgende Beispiel zeigt das Wort count-Beispiel, in dem sowohl Datasets als auch dataframes-APIs verwendet werden.The following example shows the word count example that uses both Datasets and DataFrames APIs.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()