# 資料集簡介Introduction to Datasets

## 建立資料集Create a Dataset

``````val dataset = Seq(1, 2, 3).toDS()
dataset.show()
``````

``````case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()
``````

### 從 RDD 建立資料集Create a Dataset from an RDD

``````val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks")))
val integerDS = rdd.toDS()
integerDS.show()
``````

### 從資料框架建立資料集Create a Dataset from a DataFrame

``````case class Company(name: String, foundingYear: Int, numEmployees: Int)
val inputSeq = Seq(Company("ABC", 1998, 310), Company("XYZ", 1983, 904), Company("NOP", 2005, 83))
val df = sc.parallelize(inputSeq).toDF()

val companyDS = df.as[Company]
companyDS.show()
``````

``````val rdd = sc.parallelize(Seq((1, "Spark"), (2, "Databricks"), (3, "Notebook")))
val df = rdd.toDF("Id", "Name")

val dataset = df.as[(Int, String)]
dataset.show()
``````

## 使用資料集Work with Datasets

### 字數統計範例Word Count Example

``````val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val groupedDataset = wordsDataset.flatMap(_.toLowerCase.split(" "))
.filter(_ != "")
.groupBy("value")
val countsDataset = groupedDataset.count()
countsDataset.show()
``````

### 聯結資料集Join Datasets

• 聯集多個資料集Union multiple datasets
• 在條件群組上依特定資料行執行內部聯結Doing an inner join on a condition Group by a specific column
• 在群組資料集上執行自訂匯總 (平均) 。Doing a custom aggregation (average) on the grouped dataset.

``````case class Employee(name: String, age: Int, departmentId: Int, salary: Double)
case class Department(id: Int, name: String)

case class Record(name: String, age: Int, salary: Double, departmentId: Int, departmentName: String)
case class ResultSet(departmentId: Int, departmentName: String, avgSalary: Double)

val employeeDataSet1 = sc.parallelize(Seq(Employee("Max", 22, 1, 100000.0), Employee("Adam", 33, 2, 93000.0), Employee("Eve", 35, 2, 89999.0), Employee("Muller", 39, 3, 120000.0))).toDS()
val employeeDataSet2 = sc.parallelize(Seq(Employee("John", 26, 1, 990000.0), Employee("Joe", 38, 3, 115000.0))).toDS()
val departmentDataSet = sc.parallelize(Seq(Department(1, "Engineering"), Department(2, "Marketing"), Department(3, "Sales"))).toDS()

val employeeDataset = employeeDataSet1.union(employeeDataSet2)

def averageSalary(key: (Int, String), iterator: Iterator[Record]): ResultSet = {
val (total, count) = iterator.foldLeft(0.0, 0.0) {
case ((total, count), x) => (total + x.salary, count + 1)
}
ResultSet(key._1, key._2, total/count)
}

val averageSalaryDataset = employeeDataset.joinWith(departmentDataSet, \$"departmentId" === \$"id", "inner")
.map(record => Record(record._1.name, record._1.age, record._1.salary, record._1.departmentId, record._2.name))
.filter(record => record.age > 25)
.groupBy(\$"departmentId", \$"departmentName")
.avg()

averageSalaryDataset.show()
``````

## 將資料集轉換成資料框架Convert a Dataset to a DataFrame

``````import org.apache.spark.sql.functions._

val wordsDataset = sc.parallelize(Seq("Spark I am your father", "May the spark be with you", "Spark I am your father")).toDS()
val result = wordsDataset
.flatMap(_.split(" "))               // Split on whitespace
.filter(_ != "")                     // Filter empty words
.map(_.toLowerCase())
.toDF()                              // Convert to DataFrame to perform aggregation / sorting
.groupBy(\$"value")                   // Count number of occurrences of each word
.agg(count("*") as "numOccurances")
.orderBy(\$"numOccurances" desc)      // Show most common words first
result.show()
``````