Présentation des jeux de données

l’API datasets offre les avantages de rdd (typage fort, possibilité d’utiliser des fonctions lambda puissantes) avec les avantages du moteur d’exécution optimisé d’Spark SQL. Vous pouvez définir un DataSet JVM objets, puis les manipuler à l’aide de transformations fonctionnelles ( map ,, flatMap , etc filter .) similaires à un RDD. l’avantage est que, contrairement à rdd, ces transformations sont désormais appliquées sur une collection distribuée structurée et fortement typée qui permet à spark de tirer parti du moteur d’exécution de spark SQL pour l’optimisation.

Créer un DataSet

Pour convertir une séquence en DataSet, appelez .toDS() sur la séquence.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

Si vous avez une séquence de classes de cas, .toDS() l’appel de fournit un DataSet avec tous les champs nécessaires.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Créer un DataSet à partir d’un RDD

Pour convertir un RDD en DataSet, appelez rdd.toDS() .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Créer un DataSet à partir d’un tableau

Vous pouvez appeler df.as[SomeCaseClass] pour convertir le tableau en un jeu de données.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

Vous pouvez également gérer les tuples lors de la conversion d’un tableau en DataSet sans utiliser de case class .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Utiliser des jeu de données

Exemple de comptage de mots

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Joindre des jeux de données

L’exemple suivant illustre ce qui suit :

  • Réunir plusieurs jeux de données
  • Réalisation d’une jointure interne sur un groupe de conditions par une colonne spécifique
  • Procédez à une agrégation personnalisée (moyenne) sur le DataSet groupé.

Les exemples utilisent uniquement l’API datasets pour illustrer toutes les opérations disponibles. En réalité, l’utilisation de trames pour l’agrégation est plus simple et plus rapide que l’agrégation personnalisée avec mapGroups . La section suivante décrit en détail la conversion de jeux de données en trames et l’utilisation de l’API trames pour effectuer des agrégations.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

Convertir un jeu de données en DataFrame

Les deux exemples ci-dessus traitent de l’utilisation des API de jeux de données purs. Vous pouvez également facilement passer des jeux de données à trames et tirer parti des API trames. L’exemple suivant montre l’exemple de comptage de mots qui utilise à la fois des DataSets et des API trames.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()