Inleiding tot gegevenssets

De API voor gegevenssets biedt de voordelen van RDD's (sterk typen, de mogelijkheid om krachtige lambda-functies te gebruiken) met de voordelen van de geoptimaliseerde uitvoeringsen engine van Spark SQL. U kunt een JVM-gegevenssetobjecten definiƫren en deze vervolgens bewerken met behulp van functionele transformaties ( , , , bijvoorbeeld ) die vergelijkbaar zijn met map flatMap een filter RDD. De voordelen zijn dat, in tegenstelling tot RDD's, deze transformaties nu worden toegepast op een gestructureerde en sterk getypeerd gedistribueerde verzameling waarmee Spark gebruik kan maken van de uitvoeringsen engine van Spark SQL voor optimalisatie.

Een gegevensset maken

Als u een reeks wilt converteren naar een gegevensset, roept u .toDS() aan voor de reeks.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

Als u een reeks caseklassen hebt, biedt het aanroepen .toDS() van een gegevensset alle benodigde velden.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Een gegevensset maken op basis van een RDD

Als u een RDD wilt converteren naar een gegevensset, roept u rdd.toDS() aan.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Een gegevensset maken op basis van een DataFrame

U kunt df.as[SomeCaseClass] aanroepen om het DataFrame te converteren naar een gegevensset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

U kunt ook tuples verwerken tijdens het converteren van een DataFrame naar een gegevensset zonder een case class te gebruiken.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Werken met gegevenssets

Voorbeeld van aantal woorden

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Gegevenssets lid maken

In het volgende voorbeeld wordt het volgende gedemonstreerd:

  • Meerdere gegevenssets samen te brengen
  • Een inner join op een voorwaarde Groep op een specifieke kolom
  • Een aangepaste aggregatie (gemiddeld) op de gegroepeerde gegevensset.

In de voorbeelden wordt alleen de API voor gegevenssets gebruikt om alle beschikbare bewerkingen te demonstreren. In werkelijkheid is het gebruik van DataFrames voor aggregatie eenvoudiger en sneller dan aangepaste aggregatie met mapGroups . De volgende sectie bevat informatie over het converteren van gegevenssets naar DataFrames en het gebruik van de DataFrames-API voor het doen van aggregaties.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

Een gegevensset converteren naar een DataFrame

In de bovenstaande twee voorbeelden werd het gebruik van pure gegevensset-API's behandeld. U kunt ook eenvoudig van gegevenssets naar DataFrames gaan en gebruikmaken van de DataFrames-API's. In het volgende voorbeeld ziet u het voorbeeld van het aantal woorden dat gebruikmaakt van gegevenssets en DataFrames-API's.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()