分享方式:


使用sparklyr

sparklyr 是 Apache Spark 的 R 介面。 它提供使用熟悉 R 介面與 Spark 互動的機制。 您可以透過Spark批次作業定義或互動式 Microsoft Fabric 筆記本來使用sparklyr。

sparklyr會與其他 tidyverse 套件一起使用,例如 dplyr。 Microsoft Fabric 會在每個運行時間版本中散發最新穩定版本的 sparklyr 和 tidyverse。 您可以匯入它們,並開始使用 API。

必要條件

  • 開啟或建立筆記本。 若要瞭解如何,請參閱 如何使用 Microsoft Fabric 筆記本

  • 將語言選項設定為 SparkR (R) 以變更主要語言。

  • 將筆記本附加至 Lakehouse。 在左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。

將sparklyr 連線至 Synapse Spark 叢集

在中使用 spark_connect() 下列連接方法來建立 sparklyr 連接。 我們支持稱為 synapse的新連接方法,可讓您連線到現有的Spark會話。 這可大幅減少 sparklyr 會話開始時間。 此外,我們已將此連線方法貢獻給 開放原始碼 sparklyr 專案。 透過 method = "synapse",您可以在相同的會話中使用 sparklyrSparkR ,並輕鬆地 在它們之間共享數據。

# connect sparklyr to your spark cluster
spark_version <- sparkR.version()
config <- spark_config()
sc <- spark_connect(master = "yarn", version = spark_version, spark_home = "/opt/spark", method = "synapse", config = config)

使用sparklyr讀取數據

新的Spark會話不包含任何數據。 第一個步驟是將數據載入 Spark 工作階段的記憶體,或將 Spark 指向資料的位置,以便依需求存取數據。

# load the sparklyr package
library(sparklyr)

# copy data from R environment to the Spark session's memory
mtcars_tbl <- copy_to(sc, mtcars, "spark_mtcars", overwrite = TRUE)

head(mtcars_tbl)

使用 sparklyr,您也可以 write 使用ABFS路徑,從Lakehouse檔案取得 read 數據。 若要讀取和寫入 Lakehouse,請先將其新增至您的工作階段。 在筆記本左側,選取 [新增 ] 以新增現有的 Lakehouse 或建立 Lakehouse。

若要尋找 ABFS 路徑,請以滑鼠右鍵按兩下 Lakehouse 中的 [檔案 ] 資料夾,然後選取 [ 複製ABFS 路徑]。 貼上您的路徑以在此程式代碼中取代 abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files

temp_csv = "abfss://xxxx@onelake.dfs.fabric.microsoft.com/xxxx/Files/data/mtcars.csv"

# write the table to your lakehouse using the ABFS path
spark_write_csv(mtcars_tbl, temp_csv, header = TRUE, mode = 'overwrite')

# read the data as CSV from lakehouse using the ABFS path
mtcarsDF <- spark_read_csv(sc, temp_csv) 
head(mtcarsDF)

使用sparklyr操作數據

sparklyr 使用下列方法提供多個方法來處理 Spark 內的數據:

  • dplyr 命令
  • SparkSQL
  • Spark 的功能轉換器

使用 dplyr

您可以使用熟悉 dplyr 的命令在 Spark 內準備數據。 命令會在 Spark 內執行,因此 R 與 Spark 之間沒有不必要的資料傳輸。

按兩下 操作數據 搭配dplyr,以查看搭配Spark使用 dplyr的額外檔。

# count cars by the number of cylinders the engine contains (cyl), order the results descendingly
library(dplyr)

cargroup <- group_by(mtcars_tbl, cyl) %>%
  count() %>%
  arrange(desc(n))

cargroup

sparklyr 並將 dplyr R 命令轉譯為適用於我們的 Spark SQL。 若要檢視產生的查詢,請使用 show_query()

# show the dplyr commands that are to run against the Spark connection
dplyr::show_query(cargroup)

使用 SQL

您也可以直接對 Spark 叢集中的數據表執行 SQL 查詢。 物件 spark_connection() 會實作 Spark 的 DBI 介面,因此您可以使用 dbGetQuery() 來執行 SQL,並以 R 數據框架傳回結果:

library(DBI)
dbGetQuery(sc, "select cyl, count(*) as n from spark_mtcars
GROUP BY cyl
ORDER BY n DESC")

使用功能轉換器

上述兩種方法都依賴 SQL 語句。 Spark 提供命令,讓某些資料轉換更方便,而且不需要使用 SQL。

例如, ft_binarizer() 命令可簡化新數據行的建立,指出另一個數據行的值是否高於特定臨界值。

您可以從參考 -FT 找到可用的 sparklyrSpark 功能轉換器完整清單。

mtcars_tbl %>% 
  ft_binarizer("mpg", "over_20", threshold = 20) %>% 
  select(mpg, over_20) %>% 
  head(5)

在和 之間 sparklyr 共享數據 SparkR

當您使用method = "synapse"連線sparklyr到 synapse spark 叢集時,可以在相同的工作階段中使用 sparklyrSparkR ,並輕鬆地在它們之間共用數據。 您可以在 中 sparklyr 建立 Spark 數據表,並從 讀取它 SparkR

# load the sparklyr package
library(sparklyr)

# Create table in `sparklyr`
mtcars_sparklyr <- copy_to(sc, df = mtcars, name = "mtcars_tbl", overwrite = TRUE, repartition = 3L)

# Read table from `SparkR`
mtcars_sparklr <- SparkR::sql("select cyl, count(*) as n
from mtcars_tbl
GROUP BY cyl
ORDER BY n DESC")

head(mtcars_sparklr)

機器學習

以下是我們用來 ml_linear_regression() 配合線性回歸模型的範例。 我們使用內mtcars建數據集,並查看我們是否可以根據汽車重量()預測汽車的油耗(mpgwt),以及發動機包含的汽缸數目(cyl)。 我們假設在每個案例中,和每個特徵之間的 mpg 關聯性都是線性的。

產生測試和定型數據集

使用分割,70% 用於定型,30% 用於測試模型。 玩這個比例會導致不同的模型。

# split the dataframe into test and training dataframes

partitions <- mtcars_tbl %>%
  select(mpg, wt, cyl) %>% 
  sdf_random_split(training = 0.7, test = 0.3, seed = 2023)

定型模型

將羅吉斯回歸模型定型。

fit <- partitions$training %>%
  ml_linear_regression(mpg ~ .)

fit

現在,使用 summary() 來深入瞭解模型的品質,以及每個預測值的統計意義。

summary(fit)

使用模型

您可以呼叫 ml_predict(),在測試數據集上套用模型。

pred <- ml_predict(fit, partitions$test)

head(pred)

如需可透過sparklyr取得的SparkML模型清單,請造訪 參考 - ML

中斷與Spark叢集的連線

您可以呼叫 spark_disconnect() 或選取 筆記本功能區頂端的 [停止會話 ] 按鈕,結束 Spark 會話。

spark_disconnect(sc)

深入瞭解 R 功能: