Introduktion till datauppsättningar

API: erna för data uppsättningar ger fördelarna med RDD (stark inmatning, möjlighet att använda kraftfulla lambda-funktioner) med fördelarna med Spark SQL: s optimerade körnings motor. Du kan definiera en data uppsättnings JVM objekt och sedan ändra dem med hjälp av funktionella omvandlingar ( map ,, flatMap filter och så vidare) som liknar en RDD. Fördelarna är att dessa transformeringar nu används på en strukturerad och starkt distribuerad samling som gör det möjligt för Spark att utnyttja Spark sqls körnings motor för optimering, till skillnad från RDD.

Skapa en data uppsättning

Om du vill konvertera en sekvens till en data uppsättning, anrop .toDS() du till sekvensen.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

Om du har en sekvens av Case-klasser .toDS() ger anrop en data uppsättning med alla nödvändiga fält.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Skapa en data uppsättning från en RDD

Om du vill konvertera en RDD till en data uppsättning anropar du rdd.toDS() .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Skapa en data uppsättning från en DataFrame

Du kan anropa df.as[SomeCaseClass] för att konvertera DataFrame till en data uppsättning.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

Du kan också hantera tupler när du konverterar en DataFrame till data uppsättningen utan att använda en case class .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Arbeta med datauppsättningar

Exempel på ord räknare

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Anslut data uppsättningar

Följande exempel visar följande:

  • Union flera data uppsättningar
  • Göra en inre koppling på en villkors grupp efter en angiven kolumn
  • Utföra en anpassad agg regering (Average) på den grupperade data uppsättningen.

I exemplen används endast data uppsättnings-API för att visa alla tillgängliga åtgärder. I verkligheten skulle användningen av DataFrames för att utföra agg regering vara enklare och snabbare än att göra anpassad agg regering med mapGroups . Nästa avsnitt beskriver hur du konverterar data uppsättningar till DataFrames och använder DataFrames-API: et för att utföra agg regeringar.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

Konvertera en datauppsättning till en DataFrame

De två exemplen ovan hanterar med hjälp av rena data uppsättnings-API: er. Du kan också enkelt flytta från data uppsättningar till DataFrames och utnyttja DataFrames-API: erna. I följande exempel visas exemplet ord räkning som använder både data uppsättningar och DataFrames-API: er.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()