Présentation des jeux de donnéesIntroduction to Datasets

L’API datasets offre les avantages de RDD (typage fort, possibilité d’utiliser des fonctions lambda puissantes) avec les avantages du moteur d’exécution optimisé de Spark SQL.The Datasets API provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. Vous pouvez définir un DataSet JVM objets, puis les manipuler à l’aide de transformations fonctionnelles ( map ,, flatMap , etc filter .) similaires à un RDD.You can define a Dataset JVM objects and then manipulate them using functional transformations (map, flatMap, filter, and so on) similar to an RDD. L’avantage est que, contrairement à RDD, ces transformations sont désormais appliquées sur une collection distribuée structurée et fortement typée qui permet à Spark de tirer parti du moteur d’exécution Spark SQL pour l’optimisation.The benefits is that, unlike RDDs, these transformations are now applied on a structured and strongly typed distributed collection that allows Spark to leverage Spark SQL’s execution engine for optimization.

Créer un DataSetCreate a Dataset

Pour convertir une séquence en DataSet, appelez .toDS() sur la séquence.To convert a sequence to a Dataset, call .toDS() on the sequence.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

Si vous avez une séquence de classes de cas, .toDS() l’appel de fournit un DataSet avec tous les champs nécessaires.If you have a sequence of case classes, calling .toDS() provides a Dataset with all the necessary fields.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Créer un DataSet à partir d’un RDDCreate a Dataset from an RDD

Pour convertir un RDD en DataSet, appelez rdd.toDS() .To convert an RDD into a Dataset, call rdd.toDS().

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Créer un DataSet à partir d’un tableauCreate a Dataset from a DataFrame

Vous pouvez appeler df.as[SomeCaseClass] pour convertir le tableau en un jeu de données.You can call df.as[SomeCaseClass] to convert the DataFrame to a Dataset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

Vous pouvez également gérer les tuples lors de la conversion d’un tableau en DataSet sans utiliser de case class .You can also deal with tuples while converting a DataFrame to Dataset without using a case class.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Utiliser des jeu de donnéesWork with Datasets

Exemple de comptage de motsWord Count Example

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Joindre des jeux de donnéesJoin Datasets

L’exemple suivant illustre ce qui suit :The following example demonstrates the following:

  • Réunir plusieurs jeux de donnéesUnion multiple datasets
  • Réalisation d’une jointure interne sur un groupe de conditions par une colonne spécifiqueDoing an inner join on a condition Group by a specific column
  • Procédez à une agrégation personnalisée (moyenne) sur le DataSet groupé.Doing a custom aggregation (average) on the grouped dataset.

Les exemples utilisent uniquement l’API datasets pour illustrer toutes les opérations disponibles.The examples uses only Datasets API to demonstrate all the operations available. En réalité, l’utilisation de trames pour l’agrégation est plus simple et plus rapide que l’agrégation personnalisée avec mapGroups .In reality, using DataFrames for doing aggregation would be simpler and faster than doing custom aggregation with mapGroups. La section suivante décrit en détail la conversion de jeux de données en trames et l’utilisation de l’API trames pour effectuer des agrégations.The next section covers the details of converting Datasets to DataFrames and using DataFrames API for doing aggregations.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

Convertir un jeu de données en DataFrameConvert a Dataset to a DataFrame

Les deux exemples ci-dessus traitent de l’utilisation des API de jeux de données purs.The above 2 examples dealt with using pure Datasets APIs. Vous pouvez également facilement passer des jeux de données à trames et tirer parti des API trames.You can also easily move from Datasets to DataFrames and leverage the DataFrames APIs. L’exemple suivant montre l’exemple de comptage de mots qui utilise à la fois des DataSets et des API trames.The following example shows the word count example that uses both Datasets and DataFrames APIs.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()