使用SparkR
SparkR 是一種 R 套件,提供輕量前端以使用來自 R 的 Apache Spark。SparkR 提供分散式數據框架實作,支持選取、篩選、匯總等作業。SparkR 也支援使用 MLlib 的分散式機器學習。
透過 Spark 批次作業定義或互動式 Microsoft Fabric 筆記本使用 SparkR。
R 支援僅適用於 Spark3.1 或更新版本。 不支援Spark 2.4中的 R。
必要條件
取得 Microsoft Fabric 訂用 帳戶。 或者,註冊免費的 Microsoft Fabric 試用版。
登入 Microsoft Fabric。
使用首頁左側的體驗切換器,切換至 Synapse 資料科學 體驗。
開啟或建立筆記本。 若要瞭解如何,請參閱 如何使用 Microsoft Fabric 筆記本。
將語言選項設定為 SparkR (R) 以變更主要語言。
將筆記本附加至 Lakehouse。 在左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。
讀取和寫入 SparkR DataFrame
從本機 R data.frame 讀取 SparkR DataFrame
建立 DataFrame 最簡單的方式是將本機 R data.frame 轉換成 Spark DataFrame。
# load SparkR pacakge
library(SparkR)
# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)
# displays the content of the DataFrame
display(df)
從 Lakehouse 讀取和寫入 SparkR DataFrame
數據可以儲存在叢集節點的本機文件系統上。 從 Lakehouse read.df
讀取與寫入 SparkR DataFrame 的一般方法是 和 write.df
。 這些方法會採用檔案要載入的路徑,以及數據源的類型。 SparkR 支援原生讀取 CSV、JSON、文字和 Parquet 檔案。
若要讀取和寫入 Lakehouse,請先將其新增至您的工作階段。 在筆記本左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。
注意
若要使用 Spark 套件存取 Lakehouse 檔案,例如 read.df
或 write.df
,請使用其 ADFS 路徑 或 Spark 的相對路徑。 在 Lakehouse 總管中,以滑鼠右鍵按下您想要存取的檔案或資料夾,並從內容功能表複製 Spark 的 ADFS 路徑或相對路徑。
# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")
# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")
# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")
# displays the content of the DataFrame
display(faithfulDF_pq)
Microsoft Fabric 已 tidyverse
預安裝。 您可以在熟悉的 R 套件中存取 Lakehouse 檔案,例如使用 readr::read_csv()
和 readr::write_csv()
讀取和寫入 Lakehouse 檔案。
注意
若要使用 R 套件存取 Lakehouse 檔案,您必須使用 檔案 API 路徑。 在 Lakehouse 總管中,以滑鼠右鍵按下您想要存取的檔案或資料夾,並從內容功能表複製其 檔案 API 路徑 。
# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)
# display the content of the R data.frame
head(faithfulDF_API)
您也可以使用 SparkSQL 查詢,在 Lakehouse 上讀取 SparkR 數據框架。
# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")
head(waiting)
透過 RODBC 讀取和寫入 SQL 資料表
使用 RODBC 透過 ODBC 介面連線到 SQL 資料庫。 例如,您可以連線到 Synapse 專用 SQL 集區,如下列範例程式代碼所示。 將您自己的連線詳細資料取代為 <database>
、 <uid>
、 <password>
和 <table>
。
# load RODBC package
library(RODBC)
# config connection string
DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"
ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)
# connect to driver
channel <-odbcDriverConnect(ConnectionString)
# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)
# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)
DataFrame 作業
SparkR DataFrame 支援許多函式來執行結構化數據處理。 以下是一些基本範例。 您可以在 SparkR API 檔案中找到完整的清單。
選取數據列和數據行
# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))
群組和匯總
SparkR 數據框架支援許多常用函式,以在分組之後匯總數據。 例如,我們可以計算忠實數據集中等候時間的直方圖,如下所示
# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
資料行作業
SparkR 提供許多函式,可直接套用至數據行以進行數據處理和匯總。 下列範例顯示基本算術函數的使用。
# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
套用使用者定義函式
SparkR 支援數種使用者定義函式:
在具有 dapply
或的大型數據集上執行函式 dapplyCollect
dapply
將函式套用至 的每個分割區 SparkDataFrame
。 要套用至 的每個分割區的 SparkDataFrame
函式,而且應該只有一個參數,data.frame 對應至每個數據分割的函式將會傳遞至該參數。 函式的輸出應該是 data.frame
。 架構會指定產生的 SparkDataFrame
數據列格式。 它必須符合 傳回值的數據類型 。
# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
structField("waiting_secs", "double"))
# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
dapplyCollect
就像 dapply 一樣,將函式套用至 的每個 SparkDataFrame
分割區,並將結果收集回來。 函式的輸出應該是 data.frame
。 但是,這次不需要傳遞架構。 請注意, dapplyCollect
如果函式的輸出無法在所有分割區上執行,則無法提取至驅動程式並放入驅動程式記憶體中,則可能會失敗。
# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
df,
function(x) {
x <- cbind(x, "waiting_secs" = x$waiting * 60)
})
head(ldf, 3)
在具有 或的輸入數據行分組 gapply
大型數據集上執行函式 gapplyCollect
gapply
將函式套用至 的每個群組 SparkDataFrame
。 函式會套用至 的每個群組, SparkDataFrame
而且應該只有兩個參數:將索引鍵和 R data.frame
分組至該索引鍵。 群組是從 column SparkDataFrames
(s) 中選擇的。 函式的輸出應該是 data.frame
。 架構會指定所產生 SparkDataFrame
的數據列格式。 它必須代表來自 Spark 資料類型的 R 函式輸出架構。 傳回 data.frame
的數據行名稱是由用戶設定。
# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
},
schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
gapplyCollect
如同 gapply
,會將函式套用至 的每個群組 SparkDataFrame
,並將結果收集回 R data.frame
。 函式的輸出應該是 data.frame
。 但是,不需要傳遞架構。 請注意, gapplyCollect
如果函式的輸出無法在所有分割區上執行,則無法提取至驅動程式並放入驅動程式記憶體中,則可能會失敗。
# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
df,
"waiting",
function(key, x) {
y <- data.frame(key, max(x$eruptions))
colnames(y) <- c("waiting", "max_eruption")
y
})
head(result[order(result$max_eruption, decreasing = TRUE), ])
執行以spark.lapply散發的本機 R 函式
spark.lapply
類似於 lapply
在原生 R 中,在 spark.lapply
元素清單上執行函式,並使用 Spark 散發計算。 以類似 doParallel
或 lapply
清單元素的方式套用函式。 所有計算的結果都應該符合單一計算機。 如果情況並非如此,他們可以執行類似 df <- createDataFrame(list)
的事情,然後使用 dapply
。
# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)
# print the summary of each model
print(model.summaries)
從 SparkR 執行 SQL 查詢
SparkR DataFrame 也可以註冊為暫存檢視,可讓您對其數據執行 SQL 查詢。 sql 函式可讓應用程式以程式設計方式執行 SQL 查詢,並以 SparkR DataFrame 傳回結果。
# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")
# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")
head(waiting)
機器學習
SparkR 會公開大部分的 MLLib 演算法。 在幕後,SparkR 會使用 MLlib 來定型模型。
下列範例示範如何使用 SparkR 建置 Gaussian GLM 模型。 若要執行線性回歸,請將系列設定為 "gaussian"
。 若要執行羅吉斯回歸,請將系列設定為 "binomial"
。 使用SparkML GLM
SparkR時,會自動執行類別特徵的單熱編碼,因此不需要手動完成。 除了 String 和 Double 類型功能之外,也可以配合 MLlib 向量功能,以與其他 MLlib 元件相容。
若要深入了解支援哪些機器學習演算法,您可以流覽 SparkR 和 MLlib 的檔。
# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)
# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")
# model coefficients are returned in a similar format to R's native glm().
summary(model)
相關內容
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應