Introduktion till datauppsättningar

Api:et för datauppsättningar ger fördelarna med RDD:er (stark skrivning, möjlighet att använda kraftfulla lambda-funktioner) med fördelarna med Spark SQL:s optimerade körningsmotor. Du kan definiera ett JVM-datauppsättningsobjekt och sedan ändra dem med hjälp av funktionella transformationer ( , , och så vidare) som map liknar en flatMapfilter RDD. Fördelarna är att dessa transformeringar nu, till skillnad från RDD:er, nu tillämpas på en strukturerad och starkt typad distribuerad samling som gör att Spark kan utnyttja Spark SQL:s körningsmotor för optimering.

Skapa en datauppsättning

Om du vill konvertera en sekvens till en datauppsättning .toDS() anropar du för sekvensen.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

Om du har en sekvens med fallklasser ger .toDS() anropet en datauppsättning med alla nödvändiga fält.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Skapa en datauppsättning från en RDD

Om du vill konvertera en RDD till en datauppsättning anropar du rdd.toDS() .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Skapa en datauppsättning från en DataFrame

Du kan anropa df.as[SomeCaseClass] för att konvertera DataFrame till en datauppsättning.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

Du kan också hantera tupplar när du konverterar en DataFrame till datauppsättning utan att använda case class en .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Arbeta med datauppsättningar

Exempel på antal ord

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Koppla datauppsättningar

Följande exempel visar följande:

  • Union multiple datasets
  • Göra en inre koppling på ett villkor Gruppera efter en specifik kolumn
  • Gör en anpassad aggregering (genomsnitt) på den grupperade datauppsättningen.

I exemplen används endast API för datauppsättningar för att demonstrera alla tillgängliga åtgärder. I verkligheten skulle det vara enklare och snabbare att använda DataFrames för aggregering än att göra anpassad aggregering med mapGroups . I nästa avsnitt går vi in på hur du konverterar datauppsättningar till DataFrames och hur du använder DataFrames-API:et för att göra aggregeringar.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

Konvertera en datauppsättning till en DataFrame

De två exemplen ovan hanterade användning av rena datauppsättningar-API:er. Du kan också enkelt flytta från datauppsättningar till DataFrames och utnyttja DataFrames-API:erna. I följande exempel visas ordantalsexempel som använder både datauppsättningar och DataFrames-API:er.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()