Az adathalmazok bemutatása

A Datasets API az RDD-k előnyeit (erős gépelés, hatékony lambda függvények használata) a Spark SQL optimalizált végrehajtómotorjának előnyeivel biztosítja. Meghatározhat egy Adatkészlet JVM-objektumát, majd az RDD-hez hasonló funkcionális átalakításokkal map (, flatMap , , filter stb.) kezelheti őket. Ennek az az előnye, hogy az RDD-ktől eltérően ezek az átalakítások már egy strukturált és erősen típusos elosztott gyűjteményen vannak alkalmazva, amely lehetővé teszi, hogy a Spark SQL a Spark végrehajtómotorját optimalizálja.

Adatkészlet létrehozása

Ha egy sorozatot adatkészletké kell konvertálni, hívja meg a .toDS() következőt a sorozaton: .

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

Ha esetosztályok sorozatával rendelkezik, a hívása egy adatkészletet biztosít az .toDS() összes szükséges mezővel.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

Adatkészlet létrehozása RDD-ről

Az RDD-k adatkészletké való átalakításához hívja meg a(a)t. rdd.toDS()

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

Adatkészlet létrehozása DataFrame-ből

A df.as[SomeCaseClass] hívása a DataFrame adatkészletké való átalakításához szükséges.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

A dataframe-eket adatkészletekké alakítva a használata nélkül is kezelni tudja a case class adatkészleteket.

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

Adathalmazok használata

Példa szószámra

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

Adatkészletek illesztés

Az alábbi példa a következőket mutatja be:

  • Több adatkészlet uniója
  • Belső illesztés egy feltételen Csoportosítás adott oszlop alapján
  • Egyéni összesítés (átlag) a csoportosított adatkészleten.

A példák csak a Datasets API-t használják az összes elérhető művelet szemléltetéséhez. A valóságban a DataFrame-eket egyszerűbben és gyorsabban lehet összesítőként használni, mint az egyéni mapGroups összesítéssel. A következő szakasz azt részletezi, hogyan konvertálhatók adatkészletek DataFrame-ekké, és hogyan használhatók a DataFrames API-k összesítésre.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

Adathalmaz konvertálása adatkeretté

A fenti két példa a tiszta adatkészletEK API-k használatával foglalkozott. A DataFrames API-k kihasználása és az adatkeretek api-k egyszerűen át is lépkednek a DataFrame-ről DataFrame-ekre. Az alábbi példa a szószámlálási példát mutatja be, amely a Datasets és a DataFrames API-kat is használja.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()