共用方式為


使用SparkR

SparkR 是一種 R 套件,提供輕量前端以使用來自 R 的 Apache Spark。SparkR 提供分散式數據框架實作,支持選取、篩選、匯總等作業。SparkR 也支援使用 MLlib 的分散式機器學習。

透過 Spark 批次作業定義或互動式 Microsoft Fabric 筆記本使用 SparkR。

R 支援僅適用於 Spark3.1 或更新版本。 不支援Spark 2.4中的 R。

必要條件

  • 開啟或建立筆記本。 若要瞭解如何,請參閱 如何使用 Microsoft Fabric 筆記本

  • 將語言選項設定為 SparkR (R) 以變更主要語言。

  • 將筆記本附加至 Lakehouse。 在左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。

讀取和寫入 SparkR DataFrame

從本機 R data.frame 讀取 SparkR DataFrame

建立 DataFrame 最簡單的方式是將本機 R data.frame 轉換成 Spark DataFrame。

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

從 Lakehouse 讀取和寫入 SparkR DataFrame

數據可以儲存在叢集節點的本機文件系統上。 從 Lakehouse read.df 讀取與寫入 SparkR DataFrame 的一般方法是 和 write.df。 這些方法會採用檔案要載入的路徑,以及數據源的類型。 SparkR 支援原生讀取 CSV、JSON、文字和 Parquet 檔案。

若要讀取和寫入 Lakehouse,請先將其新增至您的工作階段。 在筆記本左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。

注意

若要使用 Spark 套件存取 Lakehouse 檔案,例如 read.dfwrite.df,請使用其 ADFS 路徑Spark 的相對路徑。 在 Lakehouse 總管中,以滑鼠右鍵按下您想要存取的檔案或資料夾,並從內容功能表複製 Spark 的 ADFS 路徑相對路徑。

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric 已 tidyverse 預安裝。 您可以在熟悉的 R 套件中存取 Lakehouse 檔案,例如使用 readr::read_csv()readr::write_csv()讀取和寫入 Lakehouse 檔案。

注意

若要使用 R 套件存取 Lakehouse 檔案,您必須使用 檔案 API 路徑。 在 Lakehouse 總管中,以滑鼠右鍵按下您想要存取的檔案或資料夾,並從內容功能表複製其 檔案 API 路徑

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

您也可以使用 SparkSQL 查詢,在 Lakehouse 上讀取 SparkR 數據框架。

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

透過 RODBC 讀取和寫入 SQL 資料表

使用 RODBC 透過 ODBC 介面連線到 SQL 資料庫。 例如,您可以連線到 Synapse 專用 SQL 集區,如下列範例程式代碼所示。 將您自己的連線詳細資料取代為 <database><uid><password><table>

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

DataFrame 作業

SparkR DataFrame 支援許多函式來執行結構化數據處理。 以下是一些基本範例。 您可以在 SparkR API 檔案中找到完整的清單。

選取數據列和數據行

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

群組和匯總

SparkR 數據框架支援許多常用函式,以在分組之後匯總數據。 例如,我們可以計算忠實數據集中等候時間的直方圖,如下所示

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

資料行作業

SparkR 提供許多函式,可直接套用至數據行以進行數據處理和匯總。 下列範例顯示基本算術函數的使用。

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

套用使用者定義函式

SparkR 支援數種使用者定義函式:

在具有 dapply 或的大型數據集上執行函式 dapplyCollect

dapply

將函式套用至 的每個分割區 SparkDataFrame。 要套用至 的每個分割區的 SparkDataFrame 函式,而且應該只有一個參數,data.frame 對應至每個數據分割的函式將會傳遞至該參數。 函式的輸出應該是 data.frame。 架構會指定產生的 SparkDataFrame數據列格式。 它必須符合 傳回值的數據類型

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

就像 dapply 一樣,將函式套用至 的每個 SparkDataFrame 分割區,並將結果收集回來。 函式的輸出應該是 data.frame。 但是,這次不需要傳遞架構。 請注意, dapplyCollect 如果函式的輸出無法在所有分割區上執行,則無法提取至驅動程式並放入驅動程式記憶體中,則可能會失敗。

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

在具有 或的輸入數據行分組 gapply 大型數據集上執行函式 gapplyCollect

gapply

將函式套用至 的每個群組 SparkDataFrame。 函式會套用至 的每個群組, SparkDataFrame 而且應該只有兩個參數:將索引鍵和 R data.frame 分組至該索引鍵。 群組是從 column SparkDataFrames (s) 中選擇的。 函式的輸出應該是 data.frame。 架構會指定所產生 SparkDataFrame的數據列格式。 它必須代表來自 Spark 資料類型的 R 函式輸出架構。 傳回 data.frame 的數據行名稱是由用戶設定。

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

如同 gapply,會將函式套用至 的每個群組 SparkDataFrame ,並將結果收集回 R data.frame。 函式的輸出應該是 data.frame。 但是,不需要傳遞架構。 請注意, gapplyCollect 如果函式的輸出無法在所有分割區上執行,則無法提取至驅動程式並放入驅動程式記憶體中,則可能會失敗。

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

執行以spark.lapply散發的本機 R 函式

spark.lapply

類似於 lapply 在原生 R 中,在 spark.lapply 元素清單上執行函式,並使用 Spark 散發計算。 以類似 doParallellapply 清單元素的方式套用函式。 所有計算的結果都應該符合單一計算機。 如果情況並非如此,他們可以執行類似 df <- createDataFrame(list) 的事情,然後使用 dapply

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

從 SparkR 執行 SQL 查詢

SparkR DataFrame 也可以註冊為暫存檢視,可讓您對其數據執行 SQL 查詢。 sql 函式可讓應用程式以程式設計方式執行 SQL 查詢,並以 SparkR DataFrame 傳回結果。

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

機器學習

SparkR 會公開大部分的 MLLib 演算法。 在幕後,SparkR 會使用 MLlib 來定型模型。

下列範例示範如何使用 SparkR 建置 Gaussian GLM 模型。 若要執行線性回歸,請將系列設定為 "gaussian"。 若要執行羅吉斯回歸,請將系列設定為 "binomial"。 使用SparkML GLM SparkR時,會自動執行類別特徵的單熱編碼,因此不需要手動完成。 除了 String 和 Double 類型功能之外,也可以配合 MLlib 向量功能,以與其他 MLlib 元件相容。

若要深入了解支援哪些機器學習演算法,您可以流覽 SparkR 和 MLlib 的檔。

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)