Oktatóanyag: Adatok betöltése és átalakítása Az Apache Spark DataFrame-ek használatával

Ez az oktatóanyag bemutatja, hogyan tölthet be és alakíthat át adatokat az Apache Spark Python (PySpark) DataFrame API, az Apache Spark Scala DataFrame API és a SparkR SparkDataFrame API használatával az Azure Databricksben.

Az oktatóanyag végére megismerheti a DataFrame-et, és megismerkedhet a következő feladatokkal:

Python

Lásd még az Apache Spark PySpark API-referenciát.

Scala

Lásd még az Apache Spark Scala API-referenciát.

R

Lásd még : Apache SparkR API-referencia.

Mi az a DataFrame?

A DataFrame egy kétdimenziós címkézett adatstruktúra, amely különböző típusú oszlopokat tartalmaz. A DataFrame-ekre, például számolótáblákra, SQL-táblákra vagy sorozatobjektumok szótárára is gondolhat. Az Apache Spark DataFrame-ek számos függvényt biztosítanak (oszlopok kiválasztása, szűrés, illesztés, összesítés), amelyek lehetővé teszik a gyakori adatelemzési problémák hatékony megoldását.

Az Apache Spark DataFrame-ek rugalmas elosztott adathalmazokra (RDD-kre) épülő absztrakciók. A Spark DataFrames és a Spark SQL egységes tervezési és optimalizálási motort használ, amely lehetővé teszi, hogy az Azure Databricks (Python, SQL, Scala és R) összes támogatott nyelvén szinte azonos teljesítményt nyújtsunk.

Követelmények

A következő oktatóanyag elvégzéséhez meg kell felelnie a következő követelményeknek:

  • Az oktatóanyagban szereplő példák használatához a munkaterületen engedélyezni kell a Unity Katalógust .

  • Az oktatóanyagban szereplő példák egy Unity Catalog-kötetet használnak a mintaadatok tárolására. A példák használatához hozzon létre egy kötetet, és használja a kötet katalógusát, sémáját és kötetneveit a példák által használt kötetútvonal beállításához.

  • A Unity Katalógusban a következő engedélyekkel kell rendelkeznie:

    • READ VOLUMEvagy WRITE VOLUMEALL PRIVILEGES az oktatóanyaghoz használt kötethez.
    • USE SCHEMA vagy ALL PRIVILEGES az oktatóanyaghoz használt sémához.
    • USE CATALOG vagy ALL PRIVILEGES az oktatóanyaghoz használt katalógushoz.

    Az engedélyek beállításához tekintse meg a Databricks-rendszergazdai vagy a Unity Catalog-jogosultságokat és a biztonságos objektumokat.

1. lépés: Változók definiálása és CSV-fájl betöltése

Ez a lépés meghatározza az oktatóanyagban használt változókat, majd betölt egy CSV-fájlt, amely a health.data.ny.gov babanévadatait tartalmazza a Unity Catalog-kötetbe.

  1. Nyisson meg egy új jegyzetfüzetet az Új ikon ikonra kattintva. Az Azure Databricks-jegyzetfüzetek közötti navigálásról a Databricks-jegyzetfüzetek felületéről és vezérlőiről olvashat.

  2. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. <schema-name><volume-name> Cserélje le <catalog-name>a unitykatalógus köteteinek katalógusát, sémáját és kötetnevét. Cserélje le <table_name> a kívánt táblanévre. Az oktatóanyag későbbi részében babanévadatokat fog betölteni ebbe a táblába.

  3. Nyomja le Shift+Enter a cellát, és hozzon létre egy új üres cellát.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód a Fájlt a rows.csv Databricks dbutuils paranccsal másolja a health.data.ny.gov a Unity Catalog-kötetbe.

  5. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

2. lépés: DataFrame létrehozása

Ez a lépés létrehoz egy tesztadatokkal elnevezett df1 DataFrame-et, majd megjeleníti annak tartalmát.

  1. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód létrehozza az adatkeretet tesztadatokkal, majd megjeleníti a DataFrame tartalmát és sémáját.

  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

3. lépés: Adatok betöltése adatkeretbe CSV-fájlból

Ez a lépés létrehoz egy DataFrame-et a korábban a Unity Catalog-kötetbe betöltött CSV-fájlból.df_csv Lásd: spark.read.csv.

  1. Másolja és illessze be a következő kódot az új üres jegyzetfüzetcellába. Ez a kód betölti a babanév adatait a DataFrame-be df_csv a CSV-fájlból, majd megjeleníti a DataFrame tartalmát.

  2. Nyomja le Shift+Enter a cellát, majd lépjen a következő cellára.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Számos támogatott fájlformátumból tölthet be adatokat.

4. lépés: A DataFrame megtekintése és használata

Tekintse meg és használja a DataFrames babaneveit az alábbi módszerekkel.

Megtudhatja, hogyan jelenítheti meg az Apache Spark DataFrame sémáját. Az Apache Spark a kifejezésséma használatával hivatkozik a DataFrame oszlopainak nevére és adattípusára.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód a DataFrame-ek sémáját mutatja be a .printSchema() két DataFrame sémáinak megtekintésére használható metódussal , hogy előkészítse a két DataFrame egyesítését.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Feljegyzés

Az Azure Databricks a kifejezésséma használatával is leírja a katalógusban regisztrált táblák gyűjteményét.

Oszlop átnevezése a DataFrame-ben

Megtudhatja, hogyan nevezhet át egy oszlopot egy DataFrame-ben.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód átnevez egy oszlopot a DataFrame-ben, df1_csv hogy megfeleljen a DataFrame megfelelő oszlopának df1 . Ez a kód az Apache Spark withColumnRenamed() metódust használja.

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Adatkeretek egyesítése

Megtudhatja, hogyan hozhat létre egy új DataFrame-et, amely hozzáadja az egyik DataFrame sorait a másikhoz.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark union() metódussal egyesíti az első DataFrame df tartalmát a CSV-fájlból betöltött babaneveket tartalmazó DataFrame-lel df_csv .

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Sorok szűrése DataFrame-ben

Az Adathalmaz legnépszerűbb babaneveit a sorok szűrésével, az Apache Spark .filter() vagy .where() metódusok használatával fedezheti fel. Szűréssel kiválaszthatja a dataframe-ben visszaadni vagy módosítani kívánt sorok egy részhalmazát. Nincs különbség a teljesítményben vagy a szintaxisban, ahogy az alábbi példákban látható.

.filter() metódus használata

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark .filter() metódussal jeleníti meg azokat a sorokat a DataFrame-ben, amelyek száma meghaladja az 50-et.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

.where() metódus használata

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark .where() metódussal jeleníti meg azokat a sorokat a DataFrame-ben, amelyek száma meghaladja az 50-et.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Oszlopok kijelölése DataFrame-ből és sorrend gyakoriság szerint

Megtudhatja, hogy melyik babanév gyakorisága a select() visszaadni kívánt DataFrame oszlopainak megadására szolgáló metódussal. Az eredményeket az Apache Spark orderby és desc a függvények segítségével rendezheti.

Az Apache Spark pyspark.sql modulja támogatja az SQL-függvényeket. Az oktatóanyagban használt függvények közé tartozik az Apache Spark orderBy()desc()és expr() a függvények. Ezeknek a függvényeknek a használatát úgy engedélyezheti, hogy szükség szerint importálja őket a munkamenetbe.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód importálja a függvényt desc() , majd az Apache Spark select() metódust és az Apache Sparkot orderBy() és desc() függvényeket használja a leggyakoribb nevek és azok számának csökkenő sorrendben való megjelenítéséhez.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Adatkeret-részhalmaz létrehozása

Megtudhatja, hogyan hozhat létre adatkeret-részhalmazt egy meglévő DataFrame-ből.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark filter metódus használatával hoz létre egy új DataFrame-et, amely év, darabszám és nem szerint korlátozza az adatokat. Az Apache Spark select() metódus használatával korlátozza az oszlopokat. Emellett az Apache Spark orderBy() és desc() a függvények használatával rendezi az új DataFrame-et szám szerint.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

5. lépés: A DataFrame mentése

Megtudhatja, hogyan menthet DataFrame-eket. A DataFrame-et mentheti egy táblába, vagy fájlba vagy több fájlba is írhatja a DataFrame-et.

A DataFrame mentése táblázatba

Az Azure Databricks alapértelmezés szerint az összes tábla Delta Lake-formátumát használja. A DataFrame mentéséhez táblajogokkal kell rendelkeznie CREATE a katalógusban és a sémában.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az oktatóanyag elején definiált változó használatával menti a DataFrame tartalmát egy táblába.

Python

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(fs"$path_tables" + "." + s"$table_name")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

A legtöbb Apache Spark-alkalmazás nagy adatkészleteken és elosztott módon működik. Az Apache Spark egyetlen fájl helyett fájlkönyvtárat ír ki. A Delta Lake felosztja a Parquet-mappákat és -fájlokat. Sok adatrendszer képes beolvasni ezeket a fájlkönyvtárakat. Az Azure Databricks azt javasolja, hogy a legtöbb alkalmazáshoz használja a táblákat fájlelérési utakon keresztül.

A DataFrame mentése JSON-fájlokba

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód jSON-fájlok könyvtárába menti a DataFrame-et.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

A DataFrame beolvasása egy JSON-fájlból

Megtudhatja, hogyan olvashatja be a JSON-adatokat egy könyvtárból egy DataFrame-be az Apache Spark spark.read.format() metódus használatával.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód megjeleníti az előző példában mentett JSON-fájlokat.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

További feladatok: SQL-lekérdezések futtatása a PySparkban, a Scalában és az R-ben

Az Apache Spark DataFrame-ek az alábbi lehetőségeket biztosítják az SQL és a PySpark, a Scala és az R kombinálásához. Az alábbi kódot az oktatóanyaghoz létrehozott jegyzetfüzetben futtathatja.

Oszlop megadása SQL-lekérdezésként

Ismerje meg, hogyan használhatja az Apache Spark selectExpr() metódust. Ez a metódus egy változata, amely elfogadja az select() SQL-kifejezéseket, és egy frissített DataFrame-et ad vissza. Ez a módszer lehetővé teszi egy SQL-kifejezés, például uppera .

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark selectExpr() metódust és az SQL-kifejezést upper használja egy sztringoszlop nagybetűssé alakításához (és az oszlop átnevezéséhez).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Sql-szintaxis expr() használata oszlophoz

Megtudhatja, hogyan importálhatja és használhatja az Apache Spark expr() függvényt az SQL-szintaxis használatára bárhol, ahol egy oszlop meg van adva.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód importálja a függvényt expr() , majd az Apache Spark expr() függvényt és az SQL-kifejezést lower használja egy sztringoszlop kisbetűssé alakításához (és átnevezi az oszlopot).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Tetszőleges SQL-lekérdezés futtatása spark.sql() függvénnyel

Ismerje meg, hogyan futtathat tetszőleges SQL-lekérdezéseket az Apache Spark spark.sql() függvény használatával.

Másolja és illessze be a következő kódot egy üres jegyzetfüzetcellába. Ez a kód az Apache Spark spark.sql() függvénnyel kérdez le egy SQL-táblát SQL-szintaxissal.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

DataFrame oktatóanyag-jegyzetfüzet

Az alábbi jegyzetfüzet az oktatóanyagból származó példákat tartalmazza.

Python

DataFrames oktatójegyzetfüzet

Jegyzetfüzet beszerzése

Scala

DataFrames oktatójegyzetfüzet

Jegyzetfüzet beszerzése

R

DataFrames oktatójegyzetfüzet

Jegyzetfüzet beszerzése

További erőforrások