Kurz: Načtení a transformace dat pomocí datových rámců Apache Sparku

V tomto kurzu se dozvíte, jak načíst a transformovat data pomocí rozhraní API datového rámce Apache Spark Python (PySpark), rozhraní API datového rámce Apache Spark Scala a rozhraní API SparkR SparkDataFrame v Azure Databricks.

Na konci tohoto kurzu pochopíte, co datový rámec je, a seznámíte se s následujícími úlohami:

Python

Viz také referenční informace k rozhraní API Apache Spark PySpark.

Scala

Viz také referenční informace k rozhraní Apache Spark Scala API.

R

Viz také referenční informace k rozhraní Apache SparkR API.

Co je datový rámec?

Datový rámec je dvourozměrná datová struktura s sloupci potenciálně různých typů. Datový rámec si můžete představit jako tabulku, tabulku SQL nebo slovník objektů řady. Datové rámce Apache Spark poskytují bohatou sadu funkcí (výběr sloupců, filtrování, spojení, agregace), které umožňují efektivně řešit běžné problémy s analýzou dat.

Datové rámce Apache Sparku jsou abstrakce založená na odolných distribuovaných datových sadách (RDD). Datové rámce Sparku a Spark SQL používají jednotný modul pro plánování a optimalizaci, který umožňuje získat téměř stejný výkon ve všech podporovaných jazycích v Azure Databricks (Python, SQL, Scala a R).

Požadavky

K dokončení následujícího kurzu musíte splnit následující požadavky:

  • Pokud chcete použít příklady v tomto kurzu, musí mít váš pracovní prostor povolený katalog Unity.

  • Příklady v tomto kurzu používají k ukládání ukázkových dat svazek katalogu Unity. Pokud chcete tyto příklady použít, vytvořte svazek a použijte katalog, schéma a názvy svazků k nastavení cesty svazku používané příklady.

  • V katalogu Unity musíte mít následující oprávnění:

    • READ VOLUME a WRITE VOLUME) nebo ALL PRIVILEGES pro svazek použitý pro tento kurz.
    • USE SCHEMA nebo ALL PRIVILEGES pro schéma použité pro tento kurz.
    • USE CATALOG nebo ALL PRIVILEGES pro katalog použitý pro tento kurz.

    Pokud chcete tato oprávnění nastavit, podívejte se na správce Databricks nebo na oprávnění katalogu Unity a zabezpečitelné objekty.

Krok 1: Definování proměnných a načtení souboru CSV

Tento krok definuje proměnné pro použití v tomto kurzu a pak načte soubor CSV obsahující data názvu dítěte z health.data.ny.gov do svazku katalogu Unity.

  1. Kliknutím Nová ikona na ikonu otevřete nový poznámkový blok. Informace o procházení poznámkových bloků Azure Databricks najdete v tématu Rozhraní a ovládací prvky poznámkového bloku Databricks.

  2. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Nahraďte <catalog-name>katalog, <volume-name><schema-name>schéma a názvy svazků pro svazek katalogu Unity. Nahraďte <table_name> zvoleným názvem tabulky. Data názvu dítěte načtete do této tabulky později v tomto kurzu.

  3. Stisknutím spustíte Shift+Enter buňku a vytvoříte novou prázdnou buňku.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód zkopíruje rows.csv soubor z health.data.ny.gov do svazku katalogu Unity pomocí příkazu Databricks dbutuils .

  5. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Krok 2: Vytvoření datového rámce

Tento krok vytvoří datový rámec s testovacími df1 daty a pak zobrazí jeho obsah.

  1. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód vytvoří datový rámec s testovacími daty a pak zobrazí obsah a schéma datového rámce.

  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Krok 3: Načtení dat do datového rámce ze souboru CSV

Tento krok vytvoří datový rámec pojmenovaný df_csv ze souboru CSV, který jste předtím načetli do svazku katalogu Unity. Viz spark.read.csv.

  1. Zkopírujte a vložte následující kód do nové prázdné buňky poznámkového bloku. Tento kód načte data názvu dítěte do datového rámce df_csv ze souboru CSV a pak zobrazí obsah datového rámce.

  2. Stisknutím klávesy Shift+Enter spusťte buňku a přejděte na další buňku.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Můžete načíst data z mnoha podporovaných formátů souborů.

Krok 4: Zobrazení datového rámce a interakce s ním

Pomocí následujících metod můžete zobrazit datové rámce s názvy dětí a pracovat s nimi.

Zjistěte, jak zobrazit schéma datového rámce Apache Spark. Apache Spark používá schéma termínů k odkazování na názvy a datové typy sloupců v datovém rámci.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód ukazuje schéma datových rámců s metodou .printSchema() pro zobrazení schémat dvou datových rámců – pro přípravu na sjednocení těchto dvou datových rámců.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Poznámka:

Azure Databricks také používá schéma termínů k popisu kolekce tabulek zaregistrovaných v katalogu.

Přejmenování sloupce v datovém rámci

Zjistěte, jak přejmenovat sloupec v datovém rámci.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód přejmenuje sloupec v datovém df1_csv rámci tak, aby odpovídal příslušnému sloupci v datovém rámci df1 . Tento kód používá metodu Apache Spark withColumnRenamed() .

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Kombinování datových rámců

Zjistěte, jak vytvořit nový datový rámec, který přidá řádky jednoho datového rámce do druhého.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark union() ke kombinování obsahu prvního datového rámce df s datovým rámcem df_csv obsahujícím data názvů dětí načtená ze souboru CSV.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Filtrování řádků v datovém rámci

Seznamte se s nejoblíbenějšími názvy dětí v sadě dat filtrováním řádků pomocí Apache Sparku .filter() nebo .where() metod. Pomocí filtrování vyberte podmnožinu řádků, které chcete vrátit nebo upravit v datovém rámci. V výkonu nebo syntaxi není žádný rozdíl, jak je vidět v následujících příkladech.

Použití metody .filter()

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark .filter() k zobrazení těchto řádků v datovém rámci s počtem více než 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

Použití metody .where()

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark .where() k zobrazení těchto řádků v datovém rámci s počtem více než 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Výběr sloupců z datového rámce a pořadí podle frekvence

Přečtěte si, jakou frekvenci select() názvu dítěte metoda určuje sloupce z datového rámce, které se mají vrátit. K seřazení výsledků použijte Apache Spark orderby a desc funkce.

Modul pyspark.sql pro Apache Spark poskytuje podporu funkcí SQL. Mezi tyto funkce, které používáme v tomto kurzu, patří Apache Spark orderBy(), desc()a expr() funkce. Použití těchto funkcí povolíte tak, že je podle potřeby naimportujete do relace.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje desc() funkci a pak použije metodu Apache Spark select() a Apache Spark orderBy() a desc() funkce k zobrazení nejběžnějších názvů a jejich počtu v sestupném pořadí.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Vytvoření datového rámce podmnožina

Zjistěte, jak vytvořit podmnožinu datového rámce z existujícího datového rámce.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark filter k vytvoření nového datového rámce, který omezuje data podle roku, počtu a pohlaví. K omezení sloupců používá metodu Apache Spark select() . Používá také Apache Spark orderBy() a desc() funkce k seřazení nového datového rámce podle počtu.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Krok 5: Uložení datového rámce

Naučte se ukládat datový rámec. Datový rámec můžete uložit do tabulky nebo zapsat datový rámec do souboru nebo do více souborů.

Uložení datového rámce do tabulky

Azure Databricks ve výchozím nastavení používá formát Delta Lake pro všechny tabulky. Pokud chcete datový rámec uložit, musíte mít CREATE oprávnění tabulky k katalogu a schématu.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží obsah datového rámce do tabulky pomocí proměnné, kterou jste definovali na začátku tohoto kurzu.

Python

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(fs"$path_tables" + "." + s"$table_name")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

Většina aplikací Apache Spark pracuje na velkých datových sadách a distribuovaným způsobem. Apache Spark zapisuje adresář souborů místo jednoho souboru. Delta Lake rozdělí složky a soubory Parquet. Mnoho datových systémů může tyto adresáře souborů číst. Azure Databricks doporučuje používat tabulky přes cesty k souborům pro většinu aplikací.

Uložení datového rámce do souborů JSON

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód uloží datový rámec do adresáře souborů JSON.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Čtení datového rámce ze souboru JSON

Naučte se používat metodu Apache Spark spark.read.format() ke čtení dat JSON z adresáře do datového rámce.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód zobrazí soubory JSON, které jste uložili v předchozím příkladu.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Další úlohy: Spouštění dotazů SQL v PySpark, Scala a R

Datové rámce Apache Spark nabízejí následující možnosti pro kombinování SQL s PySpark, Scala a R. Následující kód můžete spustit ve stejném poznámkovém bloku, který jste vytvořili pro účely tohoto kurzu.

Zadání sloupce jako dotazu SQL

Naučte se používat metodu Apache Spark selectExpr() . Jedná se o variantu select() metody, která přijímá výrazy SQL a vrací aktualizovaný datový rámec. Tato metoda umožňuje použít výraz SQL, například upper.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá metodu Apache Spark selectExpr() a výraz SQL upper k převodu sloupce řetězce na velká písmena (a přejmenování sloupce).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Použití expr() syntaxe SQL pro sloupec

Naučte se importovat a používat funkci Apache Spark expr() k použití syntaxe SQL kdekoli, kde by byl zadaný sloupec.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód naimportuje expr() funkci a pak použije funkci Apache Spark expr() a výraz SQL lower k převodu sloupce řetězce na malá písmena (a přejmenování sloupce).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Spuštění libovolného dotazu SQL pomocí funkce spark.sql()

Naučte se používat funkci Apache Spark spark.sql() ke spouštění libovolných dotazů SQL.

Zkopírujte a vložte následující kód do prázdné buňky poznámkového bloku. Tento kód používá funkci Apache Spark spark.sql() k dotazování tabulky SQL pomocí syntaxe SQL.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

Poznámkový blok kurzu datového rámce

Následující poznámkový blok obsahuje příklady dotazů z tohoto kurzu.

Python

Poznámkový blok kurzu datových rámců

Získat poznámkový blok

Scala

Poznámkový blok kurzu datových rámců

Získat poznámkový blok

R

Poznámkový blok kurzu datových rámců

Získat poznámkový blok

Další materiály