Oktatóanyag: Adatok betöltése és átalakítása Az Apache Spark DataFrame-ek használatával
Ez az oktatóanyag bemutatja, hogyan tölthet be és alakíthat át adatokat az Apache Spark Python (PySpark) DataFrame API, az Apache Spark Scala DataFrame API és a SparkR SparkDataFrame API használatával az Azure Databricksben.
Az oktatóanyag végére megismerheti a DataFrame-et, és megismerkedhet a következő feladatokkal:
Python
- Változók definiálása és nyilvános adatok másolása Unity Catalog-kötetbe
- DataFrame létrehozása Pythonnal
- Adatok betöltése DataFrame-fájlba CSV-fájlból
- DataFrame megtekintése és használata
- A DataFrame mentése
- SQL-lekérdezések futtatása a PySparkban
Lásd még az Apache Spark PySpark API-referenciát.
Scala
- Változók definiálása és nyilvános adatok másolása Unity Catalog-kötetbe
- DataFrame létrehozása a Scalával
- Adatok betöltése DataFrame-fájlba CSV-fájlból
- DataFrame megtekintése és használata
- A DataFrame mentése
- SQL-lekérdezések futtatása az Apache Sparkban
Lásd még az Apache Spark Scala API-referenciát.
R
- Változók definiálása és nyilvános adatok másolása Unity Catalog-kötetbe
- SparkR SparkDataFrame-ek létrehozása
- Adatok betöltése DataFrame-fájlba CSV-fájlból
- DataFrame megtekintése és használata
- A DataFrame mentése
- SQL-lekérdezések futtatása a SparkR-ben
Lásd még : Apache SparkR API-referencia.
Mi az a DataFrame?
A DataFrame egy kétdimenziós címkézett adatstruktúra, amely különböző típusú oszlopokat tartalmaz. A DataFrame-ekre, például számolótáblákra, SQL-táblákra vagy sorozatobjektumok szótárára is gondolhat. Az Apache Spark DataFrame-ek számos függvényt biztosítanak (oszlopok kiválasztása, szűrés, illesztés, összesítés), amelyek lehetővé teszik a gyakori adatelemzési problémák hatékony megoldását.
Az Apache Spark DataFrame-ek rugalmas elosztott adathalmazokra (RDD-kre) épülő absztrakciók. A Spark DataFrames és a Spark SQL egységes tervezési és optimalizálási motort használ, amely lehetővé teszi, hogy az Azure Databricks (Python, SQL, Scala és R) összes támogatott nyelvén szinte azonos teljesítményt nyújtsunk.
Követelmények
A következő oktatóanyag elvégzéséhez meg kell felelnie a következő követelményeknek:
Az oktatóanyagban szereplő példák használatához a munkaterületen engedélyezni kell a Unity Katalógust .
Az oktatóanyagban szereplő példák egy Unity Catalog-kötetet használnak a mintaadatok tárolására. A példák használatához hozzon létre egy kötetet, és használja a kötet katalógusát, sémáját és kötetneveit a példák által használt kötetútvonal beállításához.
A Unity Katalógusban a következő engedélyekkel kell rendelkeznie:
READ VOLUME
vagyWRITE VOLUME
ALL PRIVILEGES
az oktatóanyaghoz használt kötethez.USE SCHEMA
vagyALL PRIVILEGES
az oktatóanyaghoz használt sémához.USE CATALOG
vagyALL PRIVILEGES
az oktatóanyaghoz használt katalógushoz.
Az engedélyek beállításához tekintse meg a Databricks-rendszergazdai vagy a Unity Catalog-jogosultságokat és a biztonságos objektumokat.
Tipp.
A cikk befejezett jegyzetfüzetét a DataFrame oktatóanyag-jegyzetfüzetében találja.
1. lépés: Változók definiálása és CSV-fájl betöltése
Ez a lépés meghatározza az oktatóanyagban használt változókat, majd betölt egy CSV-fájlt, amely a health.data.ny.gov babanévadatait tartalmazza a Unity Catalog-kötetbe.
Nyisson meg egy új jegyzetfüzetet az ikonra kattintva. Az Azure Databricks-jegyzetfüzetek közötti navigálásról a Databricks-jegyzetfüzetek felületéről és vezérlőiről olvashat.
Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába.
<schema-name>
<volume-name>
Cserélje le<catalog-name>
a unitykatalógus köteteinek katalógusát, sémáját és kötetnevét. Cserélje le<table_name>
a kívánt táblanévre. Az oktatóanyag későbbi részében babanévadatokat fog betölteni ebbe a táblába.Nyomja le
Shift+Enter
a cellát, és hozzon létre egy új üres cellát.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód a Fájlt a
rows.csv
Databricks dbutuils paranccsal másolja a health.data.ny.gov a Unity Catalog-kötetbe.Nyomja le
Shift+Enter
a cellát, majd lépjen a következő cellára.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
Scala
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
2. lépés: DataFrame létrehozása
Ez a lépés létrehoz egy tesztadatokkal elnevezett df1
DataFrame-et, majd megjeleníti annak tartalmát.
Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód létrehozza az adatkeretet tesztadatokkal, majd megjeleníti a DataFrame tartalmát és sémáját.
Nyomja le
Shift+Enter
a cellát, majd lépjen a következő cellára.Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
3. lépés: Adatok betöltése adatkeretbe CSV-fájlból
Ez a lépés létrehoz egy DataFrame-et a korábban a Unity Catalog-kötetbe betöltött CSV-fájlból.df_csv
Lásd: spark.read.csv.
Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód betölti a babanév adatait a DataFrame-be
df_csv
a CSV-fájlból, majd megjeleníti a DataFrame tartalmát.Nyomja le
Shift+Enter
a cellát, majd lépjen a következő cellára.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Számos támogatott fájlformátumból tölthet be adatokat.
4. lépés: A DataFrame megtekintése és használata
Tekintse meg és használja a DataFrames babaneveit az alábbi módszerekkel.
A DataFrame-séma nyomtatása
Megtudhatja, hogyan jelenítheti meg az Apache Spark DataFrame sémáját. Az Apache Spark a kifejezésséma használatával hivatkozik a DataFrame oszlopainak nevére és adattípusára.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód a DataFrame-ek sémáját mutatja be a .printSchema()
két DataFrame sémáinak megtekintésére használható metódussal , hogy előkészítse a két DataFrame egyesítését.
Python
df_csv.printSchema()
df1.printSchema()
Scala
df_csv.printSchema()
df1.printSchema()
R
printSchema(df_csv)
printSchema(df1)
Feljegyzés
Az Azure Databricks a kifejezésséma használatával is leírja a katalógusban regisztrált táblák gyűjteményét.
Oszlop átnevezése a DataFrame-ben
Megtudhatja, hogyan nevezhet át egy oszlopot egy DataFrame-ben.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód átnevez egy oszlopot a DataFrame-ben, df1_csv
hogy megfeleljen a DataFrame megfelelő oszlopának df1
. Ez a kód az Apache Spark withColumnRenamed()
metódust használja.
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
Scala
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
Adatkeretek egyesítése
Megtudhatja, hogyan hozhat létre egy új DataFrame-et, amely hozzáadja az egyik DataFrame sorait a másikhoz.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark union()
metódussal egyesíti az első DataFrame df
tartalmát a CSV-fájlból betöltött babaneveket tartalmazó DataFrame-lel df_csv
.
Python
df = df1.union(df_csv)
display(df)
Scala
val df = df1.union(df_csv_renamed)
display(df)
R
display(df <- union(df1, df_csv))
Sorok szűrése DataFrame-ben
Az Adathalmaz legnépszerűbb babaneveit a sorok szűrésével, az Apache Spark .filter()
vagy .where()
metódusok használatával fedezheti fel. Szűréssel kiválaszthatja a dataframe-ben visszaadni vagy módosítani kívánt sorok egy részhalmazát. Nincs különbség a teljesítményben vagy a szintaxisban, ahogy az alábbi példákban látható.
.filter() metódus használata
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark .filter()
metódussal jeleníti meg azokat a sorokat a DataFrame-ben, amelyek száma meghaladja az 50-et.
Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
.where() metódus használata
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark .where()
metódussal jeleníti meg azokat a sorokat a DataFrame-ben, amelyek száma meghaladja az 50-et.
Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Oszlopok kijelölése DataFrame-ből és sorrend gyakoriság szerint
Megtudhatja, hogy melyik babanév gyakorisága a select()
visszaadni kívánt DataFrame oszlopainak megadására szolgáló metódussal. Az eredményeket az Apache Spark orderby
és desc
a függvények segítségével rendezheti.
Az Apache Spark pyspark.sql modulja támogatja az SQL-függvényeket. Az oktatóanyagban használt függvények közé tartozik az Apache Spark orderBy()
desc()
és expr()
a függvények. Ezeknek a függvényeknek a használatát úgy engedélyezheti, hogy szükség szerint importálja őket a munkamenetbe.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód importálja a függvényt desc()
, majd az Apache Spark select()
metódust és az Apache Sparkot orderBy()
és desc()
függvényeket használja a leggyakoribb nevek és azok számának csökkenő sorrendben való megjelenítéséhez.
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Adatkeret-részhalmaz létrehozása
Megtudhatja, hogyan hozhat létre adatkeret-részhalmazt egy meglévő DataFrame-ből.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark filter
metódus használatával hoz létre egy új DataFrame-et, amely év, darabszám és nem szerint korlátozza az adatokat. Az Apache Spark select()
metódus használatával korlátozza az oszlopokat. Emellett az Apache Spark orderBy()
és desc()
a függvények használatával rendezi az új DataFrame-et szám szerint.
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
5. lépés: A DataFrame mentése
Megtudhatja, hogyan menthet DataFrame-eket. A DataFrame-et mentheti egy táblába, vagy fájlba vagy több fájlba is írhatja a DataFrame-et.
A DataFrame mentése táblázatba
Az Azure Databricks alapértelmezés szerint az összes tábla Delta Lake-formátumát használja. A DataFrame mentéséhez táblajogokkal kell rendelkeznie CREATE
a katalógusban és a sémában.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az oktatóanyag elején definiált változó használatával menti a DataFrame tartalmát egy táblába.
Python
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")
# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
Scala
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")
// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")
R
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
A legtöbb Apache Spark-alkalmazás nagy adatkészleteken és elosztott módon működik. Az Apache Spark egyetlen fájl helyett fájlkönyvtárat ír ki. A Delta Lake felosztja a Parquet-mappákat és -fájlokat. Sok adatrendszer képes beolvasni ezeket a fájlkönyvtárakat. Az Azure Databricks azt javasolja, hogy a legtöbb alkalmazáshoz használja a táblákat fájlelérési utakon keresztül.
A DataFrame mentése JSON-fájlokba
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód jSON-fájlok könyvtárába menti a DataFrame-et.
Python
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").save("/tmp/json_data")
// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
A DataFrame beolvasása egy JSON-fájlból
Megtudhatja, hogyan olvashatja be a JSON-adatokat egy könyvtárból egy DataFrame-be az Apache Spark spark.read.format()
metódus használatával.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód megjeleníti az előző példában mentett JSON-fájlokat.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
További feladatok: SQL-lekérdezések futtatása a PySparkban, a Scalában és az R-ben
Az Apache Spark DataFrame-ek az alábbi lehetőségeket biztosítják az SQL és a PySpark, a Scala és az R kombinálásához. Az alábbi kódot az oktatóanyaghoz létrehozott jegyzetfüzetben futtathatja.
Oszlop megadása SQL-lekérdezésként
Ismerje meg, hogyan használhatja az Apache Spark selectExpr()
metódust. Ez a metódus egy változata, amely elfogadja az select()
SQL-kifejezéseket, és egy frissített DataFrame-et ad vissza. Ez a módszer lehetővé teszi egy SQL-kifejezés, például upper
a .
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark selectExpr()
metódust és az SQL-kifejezést upper
használja egy sztringoszlop nagybetűssé alakításához (és az oszlop átnevezéséhez).
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Sql-szintaxis expr()
használata oszlophoz
Megtudhatja, hogyan importálhatja és használhatja az Apache Spark expr()
függvényt az SQL-szintaxis használatára bárhol, ahol egy oszlop meg van adva.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód importálja a függvényt expr()
, majd az Apache Spark expr()
függvényt és az SQL-kifejezést lower
használja egy sztringoszlop kisbetűssé alakításához (és átnevezi az oszlopot).
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Tetszőleges SQL-lekérdezés futtatása spark.sql() függvénnyel
Ismerje meg, hogyan futtathat tetszőleges SQL-lekérdezéseket az Apache Spark spark.sql()
függvény használatával.
Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark spark.sql()
függvénnyel kérdez le egy SQL-táblát SQL-szintaxissal.
Python
display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
R
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))
DataFrame oktatóanyag-jegyzetfüzet
Az alábbi jegyzetfüzet az oktatóanyagból származó példákat tartalmazza.
Python
DataFrames-oktatóanyag Python-jegyzetfüzet használatával
Scala
DataFrames-oktatóanyag Scala-jegyzetfüzet használatával
R
DataFrames-oktatóanyag R-jegyzetfüzet használatával
További erőforrások
Visszajelzés
https://aka.ms/ContentUserFeedback.
Hamarosan elérhető: 2024-ben fokozatosan kivezetjük a GitHub-problémákat a tartalom visszajelzési mechanizmusaként, és lecseréljük egy új visszajelzési rendszerre. További információ:Visszajelzés küldése és megtekintése a következőhöz: