Κοινή χρήση μέσω


Χρήση SparkR

Το SparkR είναι ένα πακέτο R που παρέχει ένα ελαφρύ προσκήνιο για τη χρήση του Apache Spark από την R. SparkR παρέχει μια υλοποίηση κατανεμημένων πλαισίων δεδομένων που υποστηρίζει λειτουργίες όπως επιλογή, φιλτράρισμα, συνάθροιση κ.λπ. Το SparkR υποστηρίζει επίσης κατανεμημένη εκμάθηση μηχανής με χρήση MLlib.

Χρησιμοποιήστε το SparkR μέσω ορισμών εργασίας δέσμης Spark ή με αλληλεπιδραστικά σημειωματάρια Microsoft Fabric.

Η υποστήριξη R είναι διαθέσιμη μόνο στο Spark3.1 ή νεότερη έκδοση. Η R στο Spark 2.4 δεν υποστηρίζεται.

Προαπαιτούμενα στοιχεία

  • Λάβετε μια συνδρομή Microsoft Fabric. Εναλλακτικά, εγγραφείτε για μια δωρεάν δοκιμαστική έκδοση του Microsoft Fabric.

  • Εισέλθετε στο Microsoft Fabric.

  • Χρησιμοποιήστε την εναλλαγή εμπειρίας στην αριστερή πλευρά της αρχικής σελίδας σας για να μεταβείτε στην εμπειρία Synapse Data Science.

    Screenshot of the experience switcher menu, showing where to select Data Science.

  • Ανοίξτε ή δημιουργήστε ένα σημειωματάριο. Για να μάθετε τον τρόπο, ανατρέξτε στο θέμα Τρόπος χρήσης σημειωματάριων Microsoft Fabric.

  • Ορίστε την επιλογή γλώσσας σε SparkR (R) για να αλλάξετε την κύρια γλώσσα.

  • Επισυνάψτε το σημειωματάριό σας σε ένα lakehouse. Στην αριστερή πλευρά, επιλέξτε Προσθήκη για να προσθέσετε μια υπάρχουσα λίμνη ή για να δημιουργήσετε μια λίμνη.

Ανάγνωση και εγγραφή sparkr dataFrames

Διαβάστε ένα SparkR DataFrame από ένα τοπικό πλαίσιο δεδομένων R

Ο απλούστερος τρόπος για να δημιουργήσετε ένα DataFrame είναι να μετατρέψετε ένα τοπικό data.frame R σε ένα Spark DataFrame.

# load SparkR pacakge
library(SparkR)

# read a SparkR DataFrame from a local R data.frame
df <- createDataFrame(faithful)

# displays the content of the DataFrame
display(df)

Ανάγνωση και εγγραφή του SparkR DataFrame από το Lakehouse

Τα δεδομένα μπορούν να αποθηκευτούν στο τοπικό σύστημα αρχείων των κόκκων συμπλέγματος. Οι γενικές μέθοδοι ανάγνωσης και εγγραφής ενός SparkR DataFrame από το Lakehouse είναι read.df και write.df. Αυτές οι μέθοδοι λαμβάνουν τη διαδρομή για τη φόρτωση του αρχείου και τον τύπο της προέλευσης δεδομένων. Το SparkR υποστηρίζει εγγενώς την ανάγνωση αρχείων CSV, JSON, κειμένου και Parquet.

Για να διαβάσετε και γράψετε σε ένα Lakehouse, προσθέστε το πρώτα στην περίοδο λειτουργίας σας. Στην αριστερή πλευρά του σημειωματάριου, επιλέξτε Προσθήκη για να προσθέσετε ένα υπάρχον Lakehouse ή να δημιουργήσετε ένα Lakehouse.

Σημείωμα

Για να αποκτήσετε πρόσβαση σε αρχεία Lakehouse χρησιμοποιώντας πακέτα Spark, όπως read.df ή write.df, χρησιμοποιήστε τη διαδρομή ADFS ή τη σχετική διαδρομή για το Spark. Στην εξερεύνηση Lakehouse, κάντε δεξί κλικ στα αρχεία ή τον φάκελο στον οποίο θέλετε να αποκτήσετε πρόσβαση και να αντιγράψετε τη διαδρομή του ADFS ή τη σχετική διαδρομή του για το Spark από το μενού περιβάλλοντος.

# write data in CSV using relative path for Spark
temp_csv_spark<-"Files/data/faithful.csv"
write.df(df, temp_csv_spark ,source="csv", mode = "overwrite", header = "true")

# read data in CSV using relative path for Spark
faithfulDF_csv <- read.df(temp_csv_spark, source= "csv", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_csv)
# write data in parquet using ADFS path
temp_parquet_spark<-"abfss://xxx/xxx/data/faithful.parquet"
write.df(df, temp_parquet_spark ,source="parquet", mode = "overwrite", header = "true")

# read data in parquet uxing ADFS path
faithfulDF_pq <- read.df(temp_parquet_spark, source= "parquet", header = "true", inferSchema = "true")

# displays the content of the DataFrame
display(faithfulDF_pq)

Το Microsoft Fabric έχει tidyverse προεγκατεστημένες. Μπορείτε να αποκτήσετε πρόσβαση σε αρχεία Lakehouse στα γνωστά πακέτα R σας, όπως ανάγνωση και εγγραφή αρχείων Lakehouse χρησιμοποιώντας readr::read_csv() και readr::write_csv().

Σημείωμα

Για να αποκτήσετε πρόσβαση σε αρχεία Lakehouse χρησιμοποιώντας πακέτα R, πρέπει να χρησιμοποιήσετε τη διαδρομή API αρχείου. Στην εξερεύνηση Lakehouse, κάντε δεξί κλικ στο αρχείο ή στον φάκελο στον οποίο θέλετε να αποκτήσετε πρόσβαση και να αντιγράψετε τη διαδρομή του API αρχείου από το μενού περιβάλλοντος.

# read data in CSV using API path
# To find the path, navigate to the csv file, right click, and  Copy File API path.
temp_csv_api<-'/lakehouse/default/Files/data/faithful.csv/part-00000-d8e09a34-bd63-41bd-8cf8-f4ed2ef90e6c-c000.csv'
faithfulDF_API <- readr::read_csv(temp_csv_api)

# display the content of the R data.frame
head(faithfulDF_API)

Μπορείτε επίσης να διαβάσετε ένα πλαίσιο δεδομένων SparkR στο Lakehouse σας, χρησιμοποιώντας ερωτήματα SparkSQL.

# Regsiter ealier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT * FROM eruptions")

head(waiting)

Ανάγνωση και εγγραφή πινάκων SQL μέσω του RODBC

Χρησιμοποιήστε το RODBC για να συνδεθείτε σε βάσεις δεδομένων SQL μέσω μιας διασύνδεσης ODBC. Για παράδειγμα, μπορείτε να συνδεθείτε σε έναν αποκλειστικό χώρο συγκέντρωσης SQL Synapse, όπως φαίνεται στο παρακάτω παράδειγμα κώδικα. Αντικαταστήστε τις δικές σας λεπτομέρειες σύνδεσης για <database>τα , <uid>, <password>και <table>.

# load RODBC package
library(RODBC)


# config connection string

DriverVersion <- substr(system("apt list --installed *msodbc*", intern=TRUE, ignore.stderr=TRUE)[2],10,11)
ServerName <- "your-server-name"
DatabaseName <- "your-database-name"
Uid <- "your-user-id-list"
Password <- "your-password"

ConnectionString = sprintf("Driver={ODBC Driver %s for SQL Server};
Server=%s;
Database=%s;
Uid=%s;
Pwd=%s;
Encrypt=yes;
TrustServerCertificate=yes;
Connection Timeout=30;",DriverVersion,ServerName,DatabaseName,Uid,Password)
print(ConnectionString)


# connect to driver
channel <-odbcDriverConnect(ConnectionString)

# query from existing tables
Rdf <- sqlQuery(channel, "select * from <table>")
class(Rdf)

# use SparkR::as.DataFrame to convert R data.frame to SparkR DataFrame.
spark_df <- as.DataFrame(Rdf)
class(spark_df)
head(spark_df)

Λειτουργίες DataFrame

Τα SparkR DataFrames υποστηρίζουν πολλές συναρτήσεις για την επεξεργασία δομημένων δεδομένων. Ακολουθούν ορισμένα βασικά παραδείγματα. Μπορείτε να βρείτε μια πλήρη λίστα στα έγγραφα του SparkR API.

Επιλογή γραμμών και στηλών

# Select only the "waiting" column
head(select(df,df$waiting))
# Pass in column name as strings
head(select(df, "waiting"))
# Filter to only retain rows with waiting times longer than 70 mins
head(filter(df, df$waiting > 70))

Ομαδοποίηση και συνάθροιση

Τα πλαίσια δεδομένων SparkR υποστηρίζουν πολλές συναρτήσεις που χρησιμοποιούνται συχνά για τη συγκέντρωση δεδομένων μετά την ομαδοποίηση. Για παράδειγμα, μπορούμε να υπολογίσουμε ένα ιστόγραμμα του χρόνου αναμονής στο πιστό σύνολο δεδομένων, όπως φαίνεται παρακάτω

# we use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
# we can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Λειτουργίες στήλης

Το SparkR παρέχει πολλές συναρτήσεις που μπορούν να εφαρμοστούν απευθείας σε στήλες για επεξεργασία δεδομένων και συνάθροιση. Το παρακάτω παράδειγμα δείχνει τη χρήση βασικών αριθμητικών συναρτήσεων.

# convert waiting time from hours to seconds.
# you can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Εφαρμογή συνάρτησης που ορίζεται από τον χρήστη

Το SparkR υποστηρίζει πολλά είδη συναρτήσεων που ορίζονται από τον χρήστη:

Εκτελέστε μια συνάρτηση σε ένα μεγάλο σύνολο δεδομένων με dapply ή dapplyCollect

dapply

Εφαρμόστε μια συνάρτηση σε κάθε διαμέρισμα ενός SparkDataFrame. Η συνάρτηση που θα εφαρμοστεί σε κάθε διαμέρισμα του SparkDataFrame και θα πρέπει να έχει μόνο μία παράμετρο, στην οποία ένα data.frame αντιστοιχεί σε κάθε διαμέρισμα. Η έξοδος της συνάρτησης πρέπει να είναι μια data.frame. Το σχήμα καθορίζει τη μορφή γραμμής του αποτελέσματος ενός SparkDataFrame. Πρέπει να συμφωνεί με τους τύπους δεδομένων της επιστρεφόμενας τιμής.

# convert waiting time from hours to seconds
df <- createDataFrame(faithful)
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))

# apply UDF to DataFrame
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

dapplyCollect

Όπως με το dapply, εφαρμόστε μια συνάρτηση σε κάθε διαμέρισμα ενός SparkDataFrame και συγκεντρώστε ξανά το αποτέλεσμα. Το αποτέλεσμα της συνάρτησης πρέπει να είναι ένα data.frame. Ωστόσο, αυτήν τη φορά, το σχήμα δεν είναι απαραίτητο να μεταβιβαστεί. Σημειώστε ότι μπορεί dapplyCollect να αποτύχει εάν οι έξοδοι της συνάρτησης εκτελούνται σε όλο το διαμέρισμα δεν είναι δυνατό να ελαχθούν στο πρόγραμμα οδήγησης και να χωρέσουν στη μνήμη του προγράμματος οδήγησης.

# convert waiting time from hours to seconds
# apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

Εκτελέστε μια συνάρτηση σε ένα μεγάλο σύνολο δεδομένων, ομαδοποιώντας κατά στήλες εισόδου με gapply ή gapplyCollect

gapply

Εφαρμόστε μια συνάρτηση σε κάθε ομάδα ενός SparkDataFrame. Η συνάρτηση πρέπει να εφαρμοστεί σε κάθε ομάδα του SparkDataFrame και θα πρέπει να έχει μόνο δύο παραμέτρους: κλειδί ομαδοποίησης και R data.frame που αντιστοιχεί σε αυτό το κλειδί. Οι ομάδες επιλέγονται από SparkDataFrames στήλες. Το αποτέλεσμα της συνάρτησης πρέπει να είναι ένα data.frame. Το σχήμα καθορίζει τη μορφή γραμμής του αποτελέσματος SparkDataFrame. Πρέπει να αντιπροσωπεύει το σχήμα εξόδου της συνάρτησης R από τους τύπους δεδομένων Spark. Τα ονόματα των στηλών που επιστρέφονται data.frame ορίζονται από τον χρήστη.

# determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

gapplyCollect

Όπως gapply, εφαρμόζει μια συνάρτηση σε κάθε ομάδα του και SparkDataFrame συλλέγει το αποτέλεσμα πίσω στην R data.frame. Το αποτέλεσμα της συνάρτησης πρέπει να είναι ένα data.frame. Ωστόσο, δεν απαιτείται η διαβίβαση του σχήματος. Σημειώστε ότι μπορεί gapplyCollect να αποτύχει εάν οι έξοδοι της συνάρτησης εκτελούνται σε όλο το διαμέρισμα δεν είναι δυνατό να ελαχθούν στο πρόγραμμα οδήγησης και να χωρέσουν στη μνήμη του προγράμματος οδήγησης.

# determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

Εκτέλεση τοπικών συναρτήσεων R κατανεμημένων με spark.lapply

spark.lapply

lapply Όμοια με την εγγενή R, spark.lapply εκτελεί μια συνάρτηση σε μια λίστα στοιχείων και διανέμει τους υπολογισμούς με spark. Εφαρμόζει μια συνάρτηση με τρόπο παρόμοιο με doParallel ή lapply με τα στοιχεία μιας λίστας. Τα αποτελέσματα όλων των υπολογισμών θα πρέπει να χωρέσουν σε έναν μόνο υπολογιστή. Εάν δεν συμβαίνει αυτό, μπορούν να κάνουν κάτι όπως df <- createDataFrame(list) το και, στη συνέχεια, να χρησιμοποιήσουν dapplyτο .

# perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# print the summary of each model
print(model.summaries)

Εκτέλεση ερωτημάτων SQL από το SparkR

Ένα SparkR DataFrame μπορεί επίσης να καταχωρηθεί ως μια προσωρινή προβολή που σας επιτρέπει να εκτελείτε ερωτήματα SQL στα δεδομένα του. Η συνάρτηση SQL επιτρέπει στις εφαρμογές να εκτελούν ερωτήματα SQL μέσω προγραμματισμού και επιστρέφει το αποτέλεσμα ως SparkR DataFrame.

# Register earlier df as temp view
createOrReplaceTempView(df, "eruptions")

# Create a df using a SparkSQL query
waiting <- sql("SELECT waiting FROM eruptions where waiting>70 ")

head(waiting)

Εκμάθηση μηχανής

Το SparkR εμφανίζει τους περισσότερους αλγόριθμους MLLib. Στο κάτω μέρος του παραθύρου, το SparkR χρησιμοποιεί το MLlib για την εκπαίδευση του μοντέλου.

Το παρακάτω παράδειγμα δείχνει πώς μπορείτε να δημιουργήσετε ένα μοντέλο Gaussian GLM χρησιμοποιώντας SparkR. Για να εκτελέσετε γραμμική παλινδρόμηση, ορίστε την οικογένεια σε "gaussian". Για να εκτελέσετε λογιστική παλινδρόμηση, ορίστε την οικογένεια σε "binomial". Όταν χρησιμοποιείτε το SparkML GLM SparkR, εκτελείται αυτόματα κωδικοποίηση μίας χρήσης των κατηγορικών δυνατοτήτων, ώστε να μην χρειάζεται να γίνεται με μη αυτόματο τρόπο. Πέρα από τις δυνατότητες συμβολοσειράς και διπλού τύπου, μπορείτε επίσης να χωρέσετε στις δυνατότητες του MLlib Vector, για συμβατότητα με άλλα στοιχεία MLlib.

Για να μάθετε περισσότερα σχετικά με τους αλγόριθμους εκμάθησης μηχανής που υποστηρίζονται, μπορείτε να επισκεφθείτε την τεκμηρίωση για SparkR και MLlib.

# create the DataFrame
cars <- cbind(model = rownames(mtcars), mtcars)
carsDF <- createDataFrame(cars)

# fit a linear model over the dataset.
model <- spark.glm(carsDF, mpg ~ wt + cyl, family = "gaussian")

# model coefficients are returned in a similar format to R's native glm().
summary(model)