แชร์ผ่าน


ใช้ Sparkr

SparkR เป็นแพ็คเกจ R ที่มี frontend น้ําหนักเบาเพื่อใช้ Apache Spark จาก R SparkR มีการใช้งานเฟรมข้อมูลที่กระจายที่รองรับการทํางานเช่น การเลือก การกรอง การรวม ฯลฯ SparkR ยังสนับสนุนการเรียนรู้ของเครื่องแบบกระจายโดยใช้ MLlib

ใช้ SparkR ผ่านข้อกําหนดชุดงาน Spark หรือกับสมุดบันทึก Microsoft Fabric แบบโต้ตอบ

การสนับสนุน R มีให้ใช้งานใน Spark3.1 หรือสูงกว่าเท่านั้น R ใน Spark 2.4 ไม่ได้รับการสนับสนุน

ข้อกำหนดเบื้องต้น

  • เปิดหรือสร้างสมุดบันทึก หากต้องการเรียนรู้วิธีการ ดู วิธีใช้สมุดบันทึก Microsoft Fabric

  • ตั้งค่าตัวเลือกภาษาเป็น SparkR (R) เพื่อเปลี่ยนภาษาหลัก

  • แนบสมุดบันทึกของคุณเข้ากับเลคเฮ้าส์ ทางด้านซ้าย เลือก เพิ่ม เพื่อเพิ่มเลคเฮาส์ที่มีอยู่ หรือเพื่อสร้างเลคเฮ้าส์

อ่านและเขียน SparkR DataFrames

อ่าน SparkR DataFrame จากเฟรมข้อมูล R ภายในเครื่อง

วิธีที่ง่ายที่สุดในการสร้าง DataFrame คือการแปลง R data.frame ภายในเครื่องเป็น Spark DataFrame

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

อ่านและเขียน SparkR DataFrame จาก Lakehouse

ข้อมูลสามารถจัดเก็บในระบบไฟล์ภายในเครื่องของโหนดคลัสเตอร์ วิธีการทั่วไปในการอ่านและเขียน SparkR DataFrame จาก Lakehouse คือ read.df และwrite.df วิธีการเหล่านี้ใช้เส้นทางสําหรับไฟล์ที่จะโหลดและชนิดของแหล่งข้อมูล SparkR สนับสนุนการอ่านไฟล์ CSV, JSON, text และ Parquet โดยดั้งเดิม

หากต้องการอ่านและเขียนไปยังเลคเฮ้าส์ ก่อนอื่นให้เพิ่มชุดข้อมูลลงในเซสชั่นของคุณ ทางด้านซ้ายของสมุดบันทึก ให้เลือก เพิ่ม เพื่อเพิ่มเลคเฮ้าส์ที่มีอยู่แล้ว หรือสร้างเลคเฮ้าส์

หมายเหตุ

เมื่อต้องการเข้าถึงไฟล์ Lakehouse โดยใช้แพคเกจ Spark เช่น read.df หรือ write.dfใช้เส้นทาง ADFS หรือเส้นทางสัมพัทธ์สําหรับ Spark ใน Lakehouse explorer คลิกขวาที่ไฟล์หรือโฟลเดอร์ที่คุณต้องการเข้าถึงและคัดลอก เส้นทาง ADFS หรือ เส้นทางสัมพัทธ์สําหรับ Spark จากเมนูตามบริบท

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Microsoft Fabric ได้ tidyverse ติดตั้งไว้ล่วงหน้า คุณสามารถเข้าถึงไฟล์ Lakehouse ในแพคเกจ R ที่คุณคุ้นเคย เช่น การอ่านและการเขียนไฟล์ Lakehouse โดยใช้ readr::read_csv() และreadr::write_csv()

หมายเหตุ

หากต้องการเข้าถึงไฟล์ Lakehouse โดยใช้แพคเกจ R คุณจําเป็นต้องใช้ เส้นทาง API ไฟล์ ใน Lakehouse explorer คลิกขวาที่ไฟล์หรือโฟลเดอร์ที่คุณต้องการเข้าถึงและคัดลอกเส้นทาง API ของไฟล์จากเมนูตามบริบท

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

คุณยังสามารถอ่าน SparkR Dataframe บนเลคเฮ้าส์ของคุณได้โดยใช้คิวรี SparkSQL

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

อ่านและเขียนตาราง SQL ผ่าน RODBC

ใช้ RODBC เพื่อเชื่อมต่อกับฐานข้อมูลตาม SQL ผ่านอินเทอร์เฟซ ODBC ตัวอย่างเช่น คุณสามารถเชื่อมต่อกับพูล SQL เฉพาะ Synapse ดังที่แสดงในโค้ดตัวอย่างต่อไปนี้ แทนที่รายละเอียดการเชื่อมต่อของคุณเองสําหรับ <database><uid>, <password>และ<table>

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

การดําเนินการ DataFrame

SparkR DataFrames สนับสนุนฟังก์ชันมากมายเพื่อทําการประมวลผลข้อมูลที่มีโครงสร้าง ต่อไปนี้คือตัวอย่างพื้นฐานบางส่วน สามารถพบรายการทั้งหมดได้ในเอกสาร SparkR API

เลือกแถวและคอลัมน์

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

การจัดกลุ่มและการรวม

เฟรมข้อมูล SparkR สนับสนุนฟังก์ชันที่ใช้กันทั่วไปมากมายเพื่อรวบรวมข้อมูลหลังจากการจัดกลุ่ม ตัวอย่างเช่น เราสามารถคํานวณฮิสโทแกรมของเวลารอในชุดข้อมูลที่ซื่อสัตย์ดังที่แสดงด้านล่าง

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

การดําเนินการคอลัมน์

SparkR มีฟังก์ชันมากมายที่สามารถนําไปใช้กับคอลัมน์สําหรับการประมวลผลข้อมูลและการรวมข้อมูลได้โดยตรง ตัวอย่างต่อไปนี้แสดงการใช้ฟังก์ชันทางคณิตศาสตร์พื้นฐาน

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

ใช้ฟังก์ชันที่ผู้ใช้กําหนดเอง

SparkR สนับสนุนฟังก์ชันที่ผู้ใช้กําหนดเองหลายชนิด:

เรียกใช้ฟังก์ชันบนชุดข้อมูลขนาดใหญ่ที่มี dapply หรือ dapplyCollect

dapply

นําฟังก์ชันไปใช้กับแต่ละพาร์ติชันของSparkDataFrame ฟังก์ชันที่จะนําไปใช้กับแต่ละพาร์ติชันของ SparkDataFrame และ ควรมีพารามิเตอร์เดียวเท่านั้น ซึ่ง data.frame สอดคล้องกับแต่ละพาร์ติชันจะถูกส่งผ่าน ผลลัพธ์ของฟังก์ชันควรเป็นdata.frame Schema ระบุรูปแบบแถวของผลลัพธ์SparkDataFrame ซึ่งต้องตรงกับ ชนิดข้อมูล ของค่าที่ส่งกลับ

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

เช่นเดียวกับ dapply ใช้ฟังก์ชันกับแต่ละพาร์ติชันของ SparkDataFrame และรวบรวมผลลัพธ์กลับมา ผลลัพธ์ของฟังก์ชันควรเป็นdata.frame แต่คราวนี้ ไม่จําเป็นต้องส่งสคีมา โปรดทราบว่า dapplyCollect อาจล้มเหลวถ้าเอาต์พุตของฟังก์ชันทํางานบนพาร์ติชันทั้งหมดไม่สามารถดึงเข้าไปยังโปรแกรมควบคุมได้และพอดีกับหน่วยความจําโปรแกรมควบคุม

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

เรียกใช้ฟังก์ชันในการจัดกลุ่มชุดข้อมูลขนาดใหญ่ตามคอลัมน์ข้อมูลป้อนเข้าที่มี gapply หรือ gapplyCollect

gapply

นําฟังก์ชันไปใช้กับแต่ละกลุ่มของSparkDataFrame ฟังก์ชันจะถูกนําไปใช้กับแต่ละกลุ่มของ SparkDataFrame และ ควรมีพารามิเตอร์เพียงสองตัวเท่านั้น: การจัดกลุ่มคีย์ และ R data.frame ที่สอดคล้องกับคีย์นั้น กลุ่มจะถูกเลือกจาก SparkDataFrames คอลัมน์ ผลลัพธ์ของฟังก์ชันควรเป็นdata.frame Schema ระบุรูปแบบแถวของผลลัพธ์SparkDataFrame ซึ่งต้องแสดง Schema ผลลัพธ์ของฟังก์ชัน R จากชนิดข้อมูล Spark ชื่อคอลัมน์ของ ผลลัพธ์ data.frame จะถูกตั้งค่าโดยผู้ใช้

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

เช่น gapplyใช้ฟังก์ชันกับแต่ละกลุ่มของ SparkDataFrame และรวบรวมผลลัพธ์กลับไปยัง R data.frame ผลลัพธ์ของฟังก์ชันควรเป็นdata.frame แต่ไม่จําเป็นต้องส่งผ่านสคีมา โปรดทราบว่า gapplyCollect อาจล้มเหลวถ้าเอาต์พุตของฟังก์ชันทํางานบนพาร์ติชันทั้งหมดไม่สามารถดึงเข้าไปยังโปรแกรมควบคุมได้และพอดีกับหน่วยความจําโปรแกรมควบคุม

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

เรียกใช้ฟังก์ชัน R ภายในเครื่องที่กระจายด้วย spark.lapply

spark.lapply

คล้ายกับ lapply ใน R spark.lapply แบบเนทีฟ เรียกใช้ฟังก์ชันในรายการองค์ประกอบและกระจายการคํานวณด้วย Spark ใช้ฟังก์ชันในลักษณะที่คล้ายกับ doParallel หรือ lapply กับองค์ประกอบของรายการ ผลลัพธ์ของการคํานวณทั้งหมดควรพอดีกับเครื่องเดียว ถ้าไม่ใช่กรณีนี้ พวกเขาสามารถทําบางอย่างเช่น df <- createDataFrame(list) แล้วใช้dapply

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

เรียกใช้คิวรี SQL จาก SparkR

นอกจากนี้ SparkR DataFrame ยังสามารถลงทะเบียนเป็นมุมมองชั่วคราวที่ช่วยให้คุณสามารถเรียกใช้คิวรี SQL ผ่านข้อมูลได้ ฟังก์ชัน sql ช่วยให้แอปพลิเคชันสามารถเรียกใช้คิวรี SQL ทางโปรแกรมและส่งกลับผลลัพธ์เป็น SparkR DataFrame ได้

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

การเรียนรู้ของเครื่อง

SparkR เปิดอัลกอริทึม MLLib ส่วนใหญ่ ภายใต้ประทุน SparkR ใช้ MLlib เพื่อฝึกแบบจําลอง

ตัวอย่างต่อไปนี้แสดงวิธีการสร้างแบบจําลอง Gaussian GLM โดยใช้ SparkR เมื่อต้องการเรียกใช้การถดถอยเชิงเส้น ให้ตั้งค่าตระกูลเป็น"gaussian" หากต้องการเรียกใช้การถดถอยโลจิสติกส์ ให้ตั้งค่าตระกูลเป็น"binomial" เมื่อใช้ SparkML GLM SparkR จะทําการเข้ารหัสคุณลักษณะตามประเภทหนึ่งอย่างโดยอัตโนมัติเพื่อให้ไม่จําเป็นต้องทําด้วยตนเอง นอกเหนือจากคุณสมบัติสตริงและประเภทคู่ยังสามารถพอดีกับคุณลักษณะเวกเตอร์ MLlib สําหรับความเข้ากันได้กับส่วนประกอบ MLlib อื่น ๆ

หากต้องการเรียนรู้เพิ่มเติมเกี่ยวกับอัลกอริทึมการเรียนรู้ของเครื่องที่รองรับ คุณสามารถไปที่ เอกสารประกอบสําหรับ SparkR และ MLlib ได้

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)