SparkR 1.6 overview

Note

For information about the latest SparkR library, see Azure Databricks for R developers.

SparkR is an R package that provides a lightweight frontend for using Apache Spark from R. Starting with Spark 1.5.1, SparkR provides a distributed DataFrame implementation that supports operations like selection, filtering, and aggregation (similar to R data frames and dplyr), but on large datasets. SparkR also supports distributed machine learning using MLlib.

Creating SparkR DataFrames

Applications can create DataFrames from a local R data frame, from data sources, or using Spark SQL queries.

The simplest way to create a DataFrame is to convert a local R data frame into a SparkR DataFrame. Specifically, we can use createDataFrame and pass in a local R data frame to create a SparkR DataFrame. As an example, the following cell creates a DataFrame using the faithful dataset from R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

From data sources using Spark SQL

The general method for creating DataFrames from data sources is read.df. This method takes in the SQLContext, the path of the file to load, and the type of data source. SparkR supports reading JSON and Parquet files natively, and through Spark Packages you can find data source connectors for popular file formats like CSV and Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

SparkR automatically infers the schema from the JSON file.

printSchema(people)
display(people)

Using data source connectors with Spark Packages

As an example, we will use the Spark CSV package to load a CSV file. You can find a list of Spark Packages by Databricks here.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

The data sources API can also be used to save DataFrames to multiple file formats. For example, we can save the DataFrame from the previous example to a Parquet file using write.df.

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet
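
Because SparkR also reads Parquet natively, the file written above can be loaded back with read.df. A minimal sketch (the variable name peopleParquet is illustrative):

# Read the Parquet output back into a SparkR DataFrame
peopleParquet <- read.df(sqlContext, "dbfs:/tmp/people.parquet", source = "parquet")
head(peopleParquet)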

From Spark SQL queries

You can also create SparkR DataFrames using Spark SQL queries.

# Register the earlier df as a temp table
registerTempTable(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)
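
Any SQL supported by Spark can be used in the same way; for example, an aggregating query also returns a SparkR DataFrame. A minimal sketch (the variable name avgAge is illustrative):

# Run an aggregating SQL query over the registered temp table
avgAge <- sql(sqlContext, "SELECT avg(age) AS avg_age FROM peopleTemp")
head(avgAge)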

DataFrame operations

SparkR DataFrames support a number of functions for structured data processing. Here we include some basic examples; a complete list can be found in the API docs.

Selecting rows and columns

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Grouping and aggregation

SparkR DataFrames support a number of commonly used functions to aggregate data after grouping. For example, we can count the number of times each waiting time appears in the faithful dataset.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
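
Other aggregation functions can be combined with groupBy in the same way. A minimal sketch using agg and avg (the column name avg_eruptions is illustrative):

# Compute the average eruption duration for each waiting time
head(agg(groupBy(df, df$waiting), avg_eruptions = avg(df$eruptions)))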

Column operations

SparkR provides a number of functions that can be applied directly to columns for data processing and aggregation. The following example shows the use of basic arithmetic functions.

# Convert waiting time from minutes to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
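
Built-in column functions can be used in the same way as arithmetic operators. A minimal sketch using the round column function, assuming it is available in your SparkR version:

# Round the eruption durations to the nearest minute
head(select(df, round(df$eruptions)))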

Machine learning

As of Spark 1.5, SparkR allows the fitting of generalized linear models over SparkR DataFrames using the glm() function. Under the hood, SparkR uses MLlib to train a model of the specified family. We support a subset of the available R formula operators for model fitting, including '~', '.', '+', and '-'.

Under the hood, SparkR automatically performs one-hot encoding of categorical features so that it does not need to be done manually. Beyond String and Double type features, it is also possible to fit over MLlib Vector features, for compatibility with other MLlib components.

The following example shows how to build a Gaussian GLM model using SparkR. To run linear regression, set family to "gaussian". To run logistic regression, set family to "binomial".

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)
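
For comparison, a logistic regression can be fit by setting family to "binomial". The label must have two levels, so this sketch (with illustrative variable names) first filters iris down to two species:

# Keep only two species so that the label is binary
binomialDF <- filter(df, df$Species != "setosa")
# Fit a logistic regression model over the filtered data
logitModel <- glm(Species ~ Sepal_Length + Sepal_Width, data = binomialDF, family = "binomial")
summary(logitModel)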

Converting local R data frames to SparkR DataFrames

You can use createDataFrame to convert local R data frames to SparkR DataFrames.

# Create a local R data frame (example data)
localDF <- data.frame(name = c("John", "Smith", "Sarah"), age = c(19, 23, 18))
# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)