A SparkR 1.6 áttekintése

Megjegyzés

Információk a legújabb SparkR-kódtárról: Azure Databricks R-fejlesztőknek.

A SparkR egy olyan R-csomag, amely egy kis méretű előtere az R-től származó Apache Spark-hez. A Spark 1.5.1-től kezdődően a SparkR egy elosztott DataFrame-implementációt biztosít, amely támogatja az olyan műveleteket, mint a kiválasztás, a szűrés és az összesítés (az R-adatkerethez és a dplyrhez hasonlóan), de nagy adatkészletek esetén is. A SparkR támogatja az elosztott gépi tanulást is az MLlib használatával.

SparkR-adatkeretek létrehozása

Az alkalmazások létrehozhatnak DataFrame-eket helyi R-adatkeretből, adatforrásból vagy Spark SQL használatával.

A DataFrame legegyszerűbben úgy hozható létre, ha egy helyi R-adatkeretet SparkR DataFrame-keretgé alakít. Pontosabban használhatjuk a DataFrame-et, és átadjuk a helyi R-adatkeretet egy SparkR DataFrame létrehozásához. Az alábbi cella például egy DataFrame-et hoz létre az R-től származó személyes adatkészlet használatával.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

Adatforrásból a Spark-SQL

Az adatkeretek adatforrásból való létrehozásának általános módszere a read.df. Ez a metódus az SQLContextet, a betölteni kívánt fájl elérési útját és az adatforrás típusát veszi fel. A SparkR támogatja a JSON- és Parquet-fájlok natív olvasását, a Spark-csomagokon keresztül pedig olyan népszerű fájlformátumokhoz találhat adatforrás-összekötőket, mint a CSV és az Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

A SparkR automatikusan kiveszi a sémát a JSON-fájlból.

printSchema(people)
display(people)

Adatforrás-összekötők használata Spark-csomagokkal

Példaként a Spark CSV-csomaggal töltünk be egy CSV-fájlt. A Databricks Spark-csomagjainak listáját itt találja:.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

Az adatforrások API-val a DataFrame-eket több fájlformátumba is mentheti. Az előző példában található DataFrame-et menthetjük például egy Parquet-fájlba a write.df fájl használatával

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet

Spark-SQL lekérdezések

SparkR DataFrame-eket Spark-lekérdezésekkel SQL is létrehozhat.

# Register earlier df as temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

DataFrame-műveletek

A SparkR-adatkeretek számos függvényt támogatnak a strukturált adatfeldolgozáshoz. Itt néhány alapvető példát mutatunk be, és egy teljes listát az API-dokumentációban talál.

Sorok és oszlopok kiválasztása

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Csoportosítás és összesítés

A SparkR-adatkeretek számos gyakran használt függvényt támogatnak az adatok csoportosítás utáni összesítéséhez. Megszámolhatja például, hogy az egyes várakozási idők hányszor jelennek meg a számított adatkészletben.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Oszlopműveletek

A SparkR számos olyan függvényt biztosít, amelyek közvetlenül alkalmazhatók az oszlopokra adatfeldolgozás és -összesítés céljából. Az alábbi példa az alapszintű aritmetikai függvények használatát mutatja be.

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Gépi tanulás

A Spark 1.5-től a SparkR lehetővé teszi, hogy általánosított lineáris modelleket illesztsen a SparkR DataFrame-ekbe a fogm() függvény használatával. A SparkR a háttérben az MLlib segítségével betanítja a megadott család modelljét. A modellilledéshez elérhető R-képlet operátorok egy részkészletét támogatjuk, beleértve a "~", ".", "+" és "-" operátorokat.

A SparkR a háttérben automatikusan végrehajtja a kategorikus jellemzők one-hot kódolását, így nem szükséges manuálisan elvégezni. A Sztring és a Dupla típusú funkciókon kívül az MLlib Vector funkciói is átférnek a többi MLlib-összetevővel való kompatibilitás érdekében.

Az alábbi példa azt mutatja be, hogy a SparkR használatával miben használható egy egy ofisian ARRAM-modell kiépítése. A lineáris regresszió futtatásához a családnál adja meg a "gaussian" (család) beállítását. A logisztikai regresszió futtatásához állítsa a család "binomial" (binomiális) beállítását.

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

Helyi R-adatkeretek konvertálása SparkR-adatkeretekké

A használatával createDataFrame átalakíthatja a helyi R-adatkereteket SparkR-adatkeretekké.

# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)