教程:使用 Apache Spark 数据帧加载和转换数据

本教程介绍如何在 Azure Databricks 中使用 Apache Spark Python (PySpark) 数据帧 API、Apache Spark Scala 数据帧 API 和 SparkR SparkDataFrame API 加载和转换数据。

本教程结束时,你可了解数据帧是什么并熟悉以下任务:

Python

另请参阅 Apache Spark PySpark API 参考

Scala

另请参阅 Apache Spark Scala API 参考

R

另请参阅 Apache SparkR API 参考

什么是数据帧?

数据帧是一种有标签的二维数据结构,其中的列可能会有不同的类型。 可将数据帧视为电子表格、SQL 表或序列对象的字典。 Apache Spark 数据帧提供了一组丰富的函数(选择列、筛选、联接、聚合),让你可以有效地解决常见的数据分析问题。

Apache Spark 数据帧是基于弹性分布式数据集 (RDD) 的抽象。 Spark 数据帧和 Spark SQL 使用统一的规划和优化引擎,使你能够在 Azure Databricks 上的所有受支持的语言(Python、SQL、Scala 和 R)中获得几乎相同的性能。

要求

若要完成以下教程,必须满足以下要求:

  • 若要使用本教程中的示例,必须已为工作区启用 Unity 目录

  • 本教程中的示例使用 Unity Catalog 来存储示例数据。 若要使用这些示例,请创建一个卷,并使用该卷的目录、架构和卷名称来设置示例使用的卷路径。

  • 必须在 Unity 目录中具有以下权限:

    • READ VOLUMEWRITE VOLUMEALL PRIVILEGES 表示本教程使用的卷。
    • USE SCHEMAALL PRIVILEGES 表示本教程使用的架构。
    • USE CATALOGALL PRIVILEGES 表示本教程使用的目录。

    若要设置这些权限,请联系 Databricks 管理员或参阅 Unity Catalog 特权和安全对象

步骤 1:定义变量并加载 CSV 文件

此步骤定义要在本教程中使用的变量,然后将包含婴儿姓名数据的 CSV 文件从 health.data.ny.gov 加载到 Unity Catalog 卷。

  1. 单击 新建图标 图标打开新笔记本。 若要了解如何在 Azure Databricks 笔记本中导航,请参阅 Databricks 笔记本界面和控件

  2. 将以下代码复制并粘贴到新的空笔记本单元格中: 将 <catalog-name><schema-name><volume-name> 替换为 Unity 目录卷的目录、架构和卷名称。 请将 <table_name> 替换为你选择的表名称。 本教程稍后会将婴儿姓名数据加载到此表中。

  3. Shift+Enter 以运行单元格并创建新的空白单元格。

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. 将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用 Databricks dbutuils 命令将 rows.csv 文件从 health.data.ny.gov 复制到 Unity Catalog 卷。

  5. Shift+Enter 以运行单元格,然后移动到下一个单元格。

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

步骤 2:创建数据帧

此步骤使用测试数据创建名为 df1 的数据帧,然后显示其内容。

  1. 将以下代码复制并粘贴到新的空笔记本单元格中: 此代码使用测试数据创建数据帧,然后显示数据帧的内容和架构。

  2. Shift+Enter 以运行单元格,然后移动到下一个单元格。

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

步骤 3:将数据从 CSV 文件加载到数据帧

此步骤从之前加载到 Unity Catalog 卷的 CSV 文件中创建名为 df_csv 的数据帧。 请参阅 spark.read.csv

  1. 将以下代码复制并粘贴到新的空笔记本单元格中: 此代码将婴儿姓名数据从 CSV 文件加载到数据帧 df_csv,然后显示该数据帧的内容。

  2. Shift+Enter 以运行单元格,然后移动到下一个单元格。

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

可以从许多受支持的文件格式加载数据。

步骤 4:查看数据帧并与之交互

使用以下方法查看婴儿姓名数据帧并与之交互。

了解如何显示 Apache Spark 数据帧的架构。 Apache Spark 使用术语“架构”来指代数据帧中列的名称和数据类型。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 .printSchema() 方法显示数据帧的架构,以便查看两个数据帧的架构 - 准备合并这两个数据帧。

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

注意

Azure Databricks 也使用术语“架构”来描述注册到目录的表集合。

重命名数据帧中的列

了解如何重命名数据帧中的列。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于重命名 df1_csv 数据帧中的列,以匹配 df1 数据帧中的相应列。 此代码使用 Apache Spark withColumnRenamed() 方法。

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

合并数据帧

了解如何创建一个新的数据帧,用于将某个数据帧的行添加到另一个数据帧。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark union() 方法将第一个数据帧 df 的内容与数据帧 df_csv 合并,后者包含从 CSV 文件加载的婴儿姓名数据。

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

筛选数据帧中的行

使用 Apache Spark .filter().where() 方法筛选行,发现数据集中最受欢迎的婴儿姓名。 使用筛选来选择要在数据帧中返回或修改的行子集。 性能或语法没有差别,如以下示例中所示。

使用 .filter() 方法

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark .filter() 方法显示数据帧中计数超过 50 的行。

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

使用 .where() 方法

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark .where() 方法显示数据帧中计数超过 50 的行。

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

从数据帧中选择列并按频率排序

使用 select() 方法了解婴儿姓名的使用频率,以指定要返回的数据帧中的列。 使用 Apache Spark orderbydesc 函数对结果进行排序。

Apache Spark 的 pyspark.sql 模块为 SQL 函数提供支持。 在这些函数中,本教程中使用的函数包括 Apache Spark orderBy()desc()expr() 函数。 可以根据需要将它们导入会话来使用这些函数。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入 desc() 函数,然后使用 Apache Spark select() 方法以及 Apache Spark orderBy()desc() 函数按降序显示最常用的姓名及其计数。

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

创建子集数据帧

了解如何从现有数据帧创建子集数据帧。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark filter 方法创建新的数据帧,以按年份、计数和性别限制数据。 它使用 Apache Spark select() 方法来限制列。 它还使用 Apache Spark orderBy()desc() 函数按计数对新的数据帧进行排序。

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

步骤 5:保存数据帧

了解如何保存数据帧。 可以将数据帧保存到表,或者将数据帧写入一个或多个文件。

将数据帧保存到表

默认情况下,Azure Databricks 对所有表使用 Delta Lake 格式。 若要保存数据帧,必须拥有目录和架构上的 CREATE 表权限。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用在本教程开始时定义的变量将数据帧的内容保存到表中。

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

大多数 Apache Spark 应用程序都以分布式方式处理大型数据集。 Apache Spark 会写出文件目录,而不是单个文件。 Delta Lake 会拆分 Parquet 文件夹和文件。 许多数据系统都可以读取这些目录的文件。 Azure Databricks 建议为大多数应用程序使用表而不是文件路径。

将数据帧保存到 JSON 文件

将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于将数据帧保存到 JSON 文件的目录中。

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

从 JSON 文件读取数据帧

了解如何使用 Apache Spark spark.read.format() 方法将 JSON 数据从目录读取到数据帧中。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码显示在上一示例中保存的 JSON 文件。

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

其他任务:在 PySpark、Scala 和 R 中运行 SQL 查询

Apache Spark 数据帧提供以下选项,用于将 SQL 与 PySpark、Scala 和 R 合并在一起。可以在为本教程创建的同一笔记本中运行以下代码。

将列指定为 SQL 查询

了解如何使用 Apache Spark selectExpr() 方法。 这是 select() 方法的变体,它用于接受 SQL 表达式并返回更新的数据帧。 此方法允许使用 SQL 表达式,例如 upper

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark selectExpr() 方法和 SQL upper 表达式将字符串列转换为大写(并重命名列)。

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

使用 expr() 对列使用 SQL 语法

了解如何导入并使用 Apache Spark expr() 函数,以在指定列的任何位置使用 SQL 语法。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码用于导入 expr() 函数,然后使用 Apache Spark expr() 函数和 SQL lower 表达式将字符串列转换为小写(并重命名列)。

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

使用 spark.sql() 函数运行任意 SQL 查询

了解如何使用 Apache Spark spark.sql() 函数运行任意 SQL 查询。

将以下代码复制并粘贴到空的笔记本单元格中。 此代码使用 Apache Spark spark.sql() 函数通过 SQL 语法来查询 SQL 表。

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

数据帧教程笔记本

以下笔记本包含本教程中的示例查询。

Python

数据帧教程笔记本

获取笔记本

Scala

数据帧教程笔记本

获取笔记本

R

数据帧教程笔记本

获取笔记本

其他资源