Zelfstudie: Gegevens laden en transformeren met Apache Spark DataFrames

In deze zelfstudie leert u hoe u gegevens laadt en transformeert met behulp van de DataFrame-API van Apache Spark (PySpark), de Apache Spark Scala DataFrame-API en de SparkR SparkDataFrame-API in Azure Databricks.

Aan het einde van deze zelfstudie begrijpt u wat een DataFrame is en vertrouwd bent met de volgende taken:

Python

Zie ook naslaginformatie over de Apache Spark PySpark-API.

Scala

Zie ook de Naslaginformatie over de Scala-API van Apache Spark.

R

Zie ook naslaginformatie over de Apache SparkR-API.

Wat is een DataFrame?

Een DataFrame is een tweedimensionale gelabelde gegevensstructuur met kolommen met mogelijk verschillende typen. U kunt een DataFrame beschouwen als een spreadsheet, een SQL-tabel of een woordenlijst met reeksobjecten. Apache Spark DataFrames bieden een uitgebreide set functies (select columns, filter, join, aggregate) waarmee u veelvoorkomende problemen met gegevensanalyse efficiënt kunt oplossen.

Apache Spark DataFrames zijn een abstractie die is gebouwd op RDD's (Resilient Distributed Datasets). Spark DataFrames en Spark SQL maken gebruik van een geïntegreerde plannings- en optimalisatie-engine, zodat u bijna identieke prestaties krijgt in alle ondersteunde talen in Azure Databricks (Python, SQL, Scala en R).

Vereisten

Als u de volgende zelfstudie wilt voltooien, moet u voldoen aan de volgende vereisten:

  • Als u de voorbeelden in deze zelfstudie wilt gebruiken, moet Unity Catalog zijn ingeschakeld voor uw werkruimte.

  • In de voorbeelden in deze zelfstudie wordt een Unity Catalog-volume gebruikt om voorbeeldgegevens op te slaan. Als u deze voorbeelden wilt gebruiken, maakt u een volume en gebruikt u de catalogus, het schema en de volumenamen van dat volume om het volumepad in te stellen dat door de voorbeelden wordt gebruikt.

  • U moet over de volgende machtigingen beschikken in Unity Catalog:

    • READ VOLUME en WRITE VOLUME, of ALL PRIVILEGES voor het volume dat voor deze zelfstudie wordt gebruikt.
    • USE SCHEMA of ALL PRIVILEGES voor het schema dat voor deze zelfstudie wordt gebruikt.
    • USE CATALOG of ALL PRIVILEGES voor de catalogus die voor deze zelfstudie wordt gebruikt.

    Als u deze machtigingen wilt instellen, raadpleegt u uw Databricks-beheerder of Unity Catalog-bevoegdheden en beveiligbare objecten.

Tip

Voor een voltooid notitieblok voor dit artikel raadpleegt u het zelfstudienotitieblok van DataFrame.

Stap 1: Variabelen definiëren en CSV-bestand laden

Deze stap definieert variabelen voor gebruik in deze zelfstudie en laadt vervolgens een CSV-bestand met babynaamgegevens van health.data.ny.gov in uw Unity Catalog-volume.

  1. Open een nieuw notitieblok door op het Nieuw pictogram pictogram te klikken. Zie de Interface en besturingselementen voor Databricks-notebooks voor meer informatie over het navigeren in Azure Databricks-notebooks.

  2. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Vervang , <schema-name>en <volume-name> door <catalog-name>de catalogus-, schema- en volumenamen voor een Unity Catalog-volume. Vervang door <table_name> een tabelnaam van uw keuze. Verderop in deze zelfstudie laadt u babynaamgegevens in deze tabel.

  3. Druk Shift+Enter om de cel uit te voeren en een nieuwe lege cel te maken.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code wordt het rows.csv bestand van health.data.ny.gov naar uw Unity Catalog-volume gekopieerd met behulp van de databricks-opdracht dbutuils .

  5. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Stap 2: Een DataFrame maken

Met deze stap maakt u een DataFrame met de naam df1 testgegevens en wordt vervolgens de inhoud ervan weergegeven.

  1. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code maakt u het Dataframe met testgegevens en geeft u vervolgens de inhoud en het schema van het DataFrame weer.

  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Stap 3: Gegevens in een DataFrame laden vanuit een CSV-bestand

Met deze stap maakt u een DataFrame met de naam df_csv van het CSV-bestand dat u eerder in uw Unity Catalog-volume hebt geladen. Zie spark.read.csv.

  1. Kopieer en plak de volgende code in de nieuwe lege notebookcel. Met deze code worden babynaamgegevens vanuit het CSV-bestand in DataFrame df_csv geladen en wordt vervolgens de inhoud van het DataFrame weergegeven.

  2. Druk Shift+Enter om de cel uit te voeren en naar de volgende cel te gaan.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

U kunt gegevens laden uit een groot aantal ondersteunde bestandsindelingen.

Stap 4: Uw DataFrame weergeven en ermee werken

Bekijk en communiceer met uw babynamen DataFrames met behulp van de volgende methoden.

Meer informatie over het weergeven van het schema van een Apache Spark DataFrame. Apache Spark gebruikt het termenschema om te verwijzen naar de namen en gegevenstypen van de kolommen in het DataFrame.

Kopieer en plak de volgende code in een lege notebookcel. Deze code toont het schema van uw DataFrames met de .printSchema() methode om de schema's van de twee DataFrames weer te geven, om de twee DataFrames voor te bereiden.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Notitie

Azure Databricks gebruikt ook het termenschema om een verzameling tabellen te beschrijven die zijn geregistreerd bij een catalogus.

De naam van de kolom wijzigen in het DataFrame

Meer informatie over het wijzigen van de naam van een kolom in een DataFrame.

Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de naam van een kolom in het df1_csv DataFrame gewijzigd zodat deze overeenkomt met de desbetreffende kolom in het df1 DataFrame. Deze code maakt gebruik van de Apache Spark-methode withColumnRenamed() .

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

DataFrames combineren

Leer hoe u een nieuw DataFrame maakt waarmee de rijen van het ene DataFrame aan het andere worden toegevoegd.

Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode union() om de inhoud van uw eerste DataFrame df te combineren met DataFrame df_csv met de babynamen die zijn geladen vanuit het CSV-bestand.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Rijen filteren in een DataFrame

Ontdek de populairste babynamen in uw gegevensset door rijen te filteren met behulp van de Apache Spark .filter() of .where() methoden. Filter gebruiken om een subset rijen te selecteren die moeten worden geretourneerd of gewijzigd in een DataFrame. Er is geen verschil in prestaties of syntaxis, zoals te zien is in de volgende voorbeelden.

Methode .filter() gebruiken

Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode .filter() om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

De methode .where() gebruiken

Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode .where() om deze rijen in het DataFrame weer te geven met een telling van meer dan 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Kolommen selecteren in een DataFrame en sorteren op frequentie

Meer informatie over welke babynaamfrequentie met de select() methode wordt gebruikt om de kolommen van het DataFrame op te geven die moeten worden geretourneerd. Gebruik Apache Spark orderby en desc functies om de resultaten te ordenen.

De pyspark.sql-module voor Apache Spark biedt ondersteuning voor SQL-functies. Een van deze functies die we in deze zelfstudie gebruiken, zijn de Apache Spark orderBy()en desc()expr() functies. U schakelt het gebruik van deze functies in door ze indien nodig in uw sessie te importeren.

Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de desc() functie geïmporteerd en vervolgens de Apache Spark-methode select() en Apache Spark orderBy() en desc() functies gebruikt om de meest voorkomende namen en hun aantallen in aflopende volgorde weer te geven.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Een subset DataFrame maken

Meer informatie over het maken van een subset DataFrame op basis van een bestaand DataFrame.

Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode filter om een nieuw DataFrame te maken dat de gegevens per jaar, aantal en geslacht beperkt. Er wordt gebruikgemaakt van de Apache Spark-methode select() om de kolommen te beperken. Het maakt ook gebruik van Apache Spark orderBy() en desc() functies om het nieuwe DataFrame te sorteren op aantal.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Stap 5: Het DataFrame opslaan

Meer informatie over het opslaan van een DataFrame. U kunt uw DataFrame opslaan in een tabel of het DataFrame naar een bestand of meerdere bestanden schrijven.

Het DataFrame opslaan in een tabel

Azure Databricks maakt standaard gebruik van de Delta Lake-indeling voor alle tabellen. Als u uw DataFrame wilt opslaan, moet u tabelbevoegdheden hebben CREATE voor de catalogus en het schema.

Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de inhoud van het DataFrame opgeslagen in een tabel met behulp van de variabele die u aan het begin van deze zelfstudie hebt gedefinieerd.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

De meeste Apache Spark-toepassingen werken op grote gegevenssets en op gedistribueerde wijze. Apache Spark schrijft een map met bestanden uit in plaats van één bestand. Delta Lake splitst de Parquet-mappen en -bestanden. Veel gegevenssystemen kunnen deze mappen met bestanden lezen. Azure Databricks raadt het gebruik van tabellen aan via bestandspaden voor de meeste toepassingen.

Het DataFrame opslaan in JSON-bestanden

Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt het DataFrame opgeslagen in een map met JSON-bestanden.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Het DataFrame lezen uit een JSON-bestand

Meer informatie over het gebruik van de Apache Spark-methode spark.read.format() om JSON-gegevens uit een map te lezen in een DataFrame.

Kopieer en plak de volgende code in een lege notebookcel. Met deze code worden de JSON-bestanden weergegeven die u in het vorige voorbeeld hebt opgeslagen.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Aanvullende taken: SQL-query's uitvoeren in PySpark, Scala en R

Apache Spark DataFrames bieden de volgende opties om SQL te combineren met PySpark, Scala en R. U kunt de volgende code uitvoeren in hetzelfde notebook dat u voor deze zelfstudie hebt gemaakt.

Een kolom opgeven als een SQL-query

Meer informatie over het gebruik van de Apache Spark-methode selectExpr() . Dit is een variant van de select() methode die SQL-expressies accepteert en een bijgewerkt DataFrame retourneert. Met deze methode kunt u een SQL-expressie gebruiken, zoals upper.

Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-methode selectExpr() en de SQL-expressie upper om een tekenreekskolom te converteren naar hoofdletters (en de naam van de kolom te wijzigen).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Gebruiken expr() voor het gebruik van SQL-syntaxis voor een kolom

Meer informatie over het importeren en gebruiken van de Apache Spark-functie expr() voor het gebruik van SQL-syntaxis overal waar een kolom zou worden opgegeven.

Kopieer en plak de volgende code in een lege notebookcel. Met deze code wordt de expr() functie geïmporteerd en vervolgens de Apache Spark-functie expr() en de SQL-expressie lower gebruikt om een tekenreekskolom te converteren naar kleine letters (en de naam van de kolom te wijzigen).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Een willekeurige SQL-query uitvoeren met behulp van de functie spark.sql()

Meer informatie over het gebruik van de Apache Spark-functie spark.sql() om willekeurige SQL-query's uit te voeren.

Kopieer en plak de volgende code in een lege notebookcel. Deze code maakt gebruik van de Apache Spark-functie spark.sql() om een query uit te voeren op een SQL-tabel met behulp van sql-syntaxis.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

Notebook voor zelfstudie over DataFrame

Het volgende notebook bevat de voorbeeldenquery's uit deze zelfstudie.

Python

Zelfstudie over DataFrames met behulp van Python-notebook

Notebook downloaden

Scala

Zelfstudie over DataFrames met behulp van Scala-notebook

Notebook downloaden

R

Zelfstudie over DataFrames met R-notebook

Notebook downloaden

Aanvullende bronnen