مقدمة لمجموعات البيانات

توفر واجهة برمجة تطبيقات Datasets فوائد RDDs (الكتابة القوية ، والقدرة على استخدام وظائف lambda القوية) مع فوائد محرك التنفيذ الأمثل من Spark SQL. يمكنك تعريف كائنات JVM Dataset ثم معالجتها باستخدام تحويلات وظيفية mapflatMap و و filter وهكذا) مشابهة ل RDD. الفوائد هي أنه ، على عكس RDDs ، يتم تطبيق هذه التحولات الآن على مجموعة موزعة منظمة ومطبوعة بقوة تسمح ل Spark بالاستفادة من محرك تنفيذ Spark SQL للتحسين.

إنشاء مجموعة بيانات

لتحويل تسلسل إلى مجموعة بيانات، اتصل .toDS() بالتسلسل.

val dataset = Seq(1, 2, 3).toDS()
dataset.show()

إذا كان لديك تسلسل فئات الحالة، يوفر الاستدعاء .toDS() مجموعة بيانات مع كافة الحقول الضرورية.

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()

إنشاء مجموعة بيانات من RDD

لتحويل RDD إلى مجموعة بيانات، اتصل rdd.toDS() .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()

إنشاء مجموعة بيانات من إطار بيانات

يمكنك استدعاء df.as[SomeCaseClass] لتحويل DataFrame إلى Dataset.

case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()

يمكنك أيضا التعامل مع tuples أثناء تحويل DataFrame إلى Dataset دون استخدام case class .

val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()

العمل مع مجموعات البيانات

مثال عدد الكلمات

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
                                 .filter(_ != "")
                                 .groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()

الانضمام إلى مجموعات البيانات

يوضح المثال التالي ما يلي:

  • توحيد مجموعات بيانات متعددة
  • القيام صلة داخلية على شرط المجموعة بواسطة عمود معين
  • إجراء تجميع مخصص (متوسط) على مجموعة البيانات المجمعة.

تستخدم الأمثلة فقط Datasets API لعرض كافة العمليات المتوفرة. في الواقع، استخدام DataFrames للقيام التجميع سيكون أبسط وأسرع من القيام التجميع المخصص مع mapGroups . يغطي القسم التالي تفاصيل تحويل Datasets إلى DataFrames واستخدام واجهة برمجة تطبيقات DataFrames لإجراء التجميعات.

case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
  val (total, count) = iterator.foldLeft(0.0, 0.0) {
      case ((total, count), x) => (total + x.salary, count + 1)
  }
  ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, $"departmentId" === $"id", "inner")
                                          .map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
                                          .filter(record => record.age > 25)
                                          .groupBy($"departmentId", $"departmentName")
                                          .avg()

averageSalaryDataset.show()

تحويل مجموعة بيانات إلى إطار بيانات

تناول المثالان أعلاه استخدام واجهات برمجة التطبيقات لمجموعات البيانات النقية. يمكنك أيضا الانتقال بسهولة من Datasets إلى DataFrames والاستفادة من واجهات برمجة التطبيقات DataFrames. يوضح المثال التالي مثال عدد الكلمات الذي يستخدم كل من Datasets و DataFrames APIs.

import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
              .flatMap(_.split(" "))               // Split on whitespace
              .filter(_ != "")                     // Filter empty words
              .map(_.toLowerCase())
              .toDF()                              // Convert to DataFrame to perform aggregation / sorting
              .groupBy($"value")                   // Count number of occurrences of each word
              .agg(count("*") as "numOccurances")
              .orderBy($"numOccurances" desc)      // Show most common words first
result.show()