Tutorial: Carga y transformación de datos mediante DataFrames de Apache Spark

En este tutorial se muestra cómo cargar y transformar datos mediante la API DataFrame de Apache Spark Python (PySpark), la API DataFrame de Apache Spark Scala y la API SparkDataFrame de SparkR en Azure Databricks.

Al final de este tutorial, comprenderá lo que es un DataFrame y estará familiarizado con las siguientes tareas:

Python

Consulte también la referencia de la API de PySpark de Apache Spark.

Scala

Consulte también la referencia de la API de Scala de Apache Spark.

R

Consulte también la referencia de la API de Apache SparkR.

¿Qué es un DataFrame?

Un DataFrame es una estructura de datos etiquetada bidimensional con columnas de tipos potencialmente diferentes. Puede pensar en un DataFrame como una hoja de cálculo, una tabla SQL o un diccionario de objetos de serie. DataFrame de Apache Spark proporciona un amplio conjunto de funciones (selección de columnas, filtro, unión, incorporación) que permiten resolver problemas comunes de análisis de datos de forma eficaz.

Los DataFrames de Apache Spark son una compilación de abstracción basada en conjuntos de datos distribuidos resistentes (RDD). Spark DataFrame y Spark SQL usan un motor unificado de planificación y optimización, lo que le permite obtener un rendimiento casi idéntico en todos los lenguajes admitidos en Azure Databricks (Python, SQL, Scala y R).

Requisitos

Para completar el siguiente tutorial, debe cumplir los siguientes requisitos:

  • Para usar los ejemplos de este tutorial, el área de trabajo debe tener habilitado Unity Catalog.

  • En los ejemplos de este tutorial se usa un volumen de Unity Catalog para almacenar datos de ejemplo. Para usar estos ejemplos, cree un volumen y use los nombres de catálogo, esquema y volumen de ese volumen para establecer la ruta de acceso del volumen usada por los ejemplos.

  • Debe tener estos permisos en Unity Catalog:

    • READ VOLUME y WRITE VOLUME, o ALL PRIVILEGES para el volumen usado para este tutorial.
    • USE SCHEMA o ALL PRIVILEGES para el esquema usado para este tutorial.
    • USE CATALOG o ALL PRIVILEGES para el catálogo usado para este tutorial.

    Para establecer estos permisos, consulte los privilegios de administrador de Databricks o Unity Catalog y objetos protegibles.

Paso 1: Definir variables y cargar el archivo CSV

En este paso se definen las variables para su uso en este tutorial y, a continuación, se carga un archivo CSV que contiene los datos de nombres de bebé de health.data.ny.gov en el volumen de Unity Catalog.

  1. Para abrir un nuevo cuaderno, haga clic en el icono Icono Nuevo. Para obtener información sobre cómo navegar por cuadernos de Azure Databricks, consulte Interfaz y controles del cuaderno de Databricks.

  2. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Reemplace <catalog-name>, <schema-name> y <volume-name> por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog. Reemplace <table_name> por un nombre de la tabla de su elección. Cargará los datos de nombres de bebé en esta tabla más adelante en este tutorial.

  3. Presione Shift+Enter para ejecutar la celda y crear una nueva celda en blanco.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código copia el archivo rows.csv de health.data.ny.gov en el volumen de Unity Catalog mediante el comando dbutuils de Databricks.

  5. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Paso 2: Crear un DataFrame

En este paso se crea un DataFrame denominado df1 con datos de prueba y, a continuación, se muestra su contenido.

  1. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código crea el DataFrame con datos de prueba y, a continuación, muestra el contenido y el esquema del DataFrame.

  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Paso 3: Cargar datos en un DataFrame desde un archivo CSV

Este paso crea un DataFrame denominado df_csv desde el archivo CSV que cargó anteriormente en el volumen de Unity Catalog. Consulte spark.read.csv.

  1. Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código carga los datos de nombres de bebé en el DataFrame df_csv desde el archivo CSV y, a continuación, muestra el contenido del DataFrame.

  2. Presione Shift+Enter para ejecutar la celda y, a continuación, vaya a la celda siguiente.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Puede cargar datos de muchos formatos de archivo admitidos.

Paso 4: Ver e interactuar con el DataFrame

Vea e interactúe con los DataFrames de nombres de bebé mediante los métodos siguientes.

Aprenda a mostrar el esquema de un DataFrame de Apache Spark. Spark usa el término esquema para hacer referencia a los nombres y los tipos de datos de las columnas del DataFrame.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra el esquema de los DataFrames con el método .printSchema() para ver los esquemas de los dos DataFrames y preparar la unión de ambos.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Nota:

Azure Databricks también usa el esquema de términos para describir una colección de tablas registradas en un catálogo.

Cambiar un nombre de columna en el DataFrame

Obtenga información sobre cómo cambiar el nombre de una columna en un DataFrame.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código cambia el nombre de una columna del DataFrame df1_csv para que coincida con la columna correspondiente en el DataFrame df1. Este código usa el método withColumnRenamed() de Apache Spark.

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Combinar DataFrames

Obtenga información sobre cómo crear un DataFrame que agregue las filas de un DataFrame a otro.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark union() para combinar el contenido del primer DataFrame df con el DataFrame df_csv que contiene los datos de nombres de bebé cargados desde el archivo CSV.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Filtrado de filas en un DataFrame

Descubra los nombres de bebé más populares en el conjunto de datos mediante el filtrado de filas mediante los métodos .filter() o .where() de Apache Spark. Use el filtrado para seleccionar un subconjunto de filas para devolver o modificar en un DataFrame. No hay ninguna diferencia en el rendimiento o la sintaxis, como se muestra en los siguientes ejemplos.

Uso del método .filter()

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark .filter() para mostrar esas filas en el DataFrame con un recuento de más de 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

Uso del método .where()

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark .where() para mostrar esas filas en el DataFrame con un recuento de más de 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Seleccionar columnas de un DataFrame y ordenar por frecuencia

Obtenga información sobre la frecuencia de un nombre de bebé con el método select() para especificar las columnas de DataFrame que se van a devolver. Use las funciones orderby y desc de Apache Spark para ordenar los resultados.

El módulo pyspark.sql para Apache Spark proporciona compatibilidad con funciones SQL. Entre estas funciones que se usan en este tutorial se encuentran las funciones orderBy(), desc() y expr() de Apache Spark. Para habilitar el uso de estas funciones, debe importarlas en la sesión según sea necesario.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función desc() y, a continuación, usa el método select() de Apache Spark y las funciones orderBy() y desc() de Apache Spark para mostrar los nombres más comunes y sus recuentos en orden descendente.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Creación de un DataFrame de subconjunto

Obtenga información sobre cómo crear un DataFrame de subconjunto a partir de un DataFrame existente.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método filter de Apache Spark para crear un nuevo DataFrame que restringe los datos por año, recuento y sexo. Usa el método select() de Apache Spark para limitar las columnas. También usa las funciones orderBy() y desc() de Apache Spark para ordenar el nuevo DataFrame por recuento.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Paso 5: Guardar el DataFrame

Obtenga información sobre cómo guardar un DataFrame. Puede guardar el DataFrame en una tabla o escribir el DataFrame en un archivo o en varios archivos.

Guardar el DataFrame en una tabla

Azure Databricks usa el formato Delta Lake para todas las tablas de forma predeterminada. Para guardar el Dataframe, debe tener privilegios de tabla CREATE en el catálogo y el esquema.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el contenido de DataFrame en una tabla mediante la variable que definió al principio de este tutorial.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

La mayoría de las aplicaciones Spark funcionan en grandes conjuntos de datos y de forma distribuida. Spark escribe un directorio de archivos en lugar de un solo archivo. Delta Lake divide las carpetas y los archivos de Parquet. Muchos sistemas de datos pueden leer estos directorios de archivos. Azure Databricks recomienda usar tablas a través de rutas de acceso de archivo para la mayoría de las aplicaciones.

Guardar el DataFrame en archivos JSON

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el DataFrame en un directorio de archivos JSON.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Lea el DataFrame de un archivo JSON

Aprenda a usar el método spark.read.format() de Apache Spark para leer datos JSON de un directorio en un DataFrame.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra los archivos JSON que guardó en el ejemplo anterior.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Tareas adicionales: Ejecución de consultas SQL en PySpark, Scala y R

Los DataFrames de Apache Spark proporcionan las siguientes opciones para combinar SQL con PySpark, Scala y R. Puede ejecutar el código siguiente en el mismo cuaderno que creó para este tutorial.

Especificar una columna como una consulta SQL

Aprenda a usar el método selectExpr() de Apache Spark. Se trata de una variante del método select() que acepta expresiones SQL y devuelve un DataFrame actualizado. Este método permite usar una expresión SQL, como upper.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark selectExpr() y la expresión SQL upper para convertir una columna de cadena en mayúsculas (y cambiar el nombre de la columna).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Uso de expr() para usar la sintaxis SQL para una columna

Obtenga información sobre cómo importar y usar la función de Apache Spark expr() para usar la sintaxis SQL en cualquier lugar en el que se especifique una columna.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función expr() y, a continuación, usa la función expr() de Apache Spark y la expresión SQL lower para convertir una columna de cadena en minúsculas (y cambiar el nombre de la columna).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Ejecución de una consulta SQL arbitraria mediante la función spark.sql()

Aprenda a usar la función spark.sql() de Apache Spark para ejecutar consultas SQL arbitrarias.

Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa la función spark.sql() de Apache Spark para consultar una tabla SQL mediante la sintaxis SQL.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

Cuaderno del tutorial de DataFrame

En el cuaderno siguiente se incluyen las consultas de ejemplos de este tutorial.

Python

Cuaderno del tutorial de DataFrames

Obtener el cuaderno

Scala

Cuaderno del tutorial de DataFrames

Obtener el cuaderno

R

Cuaderno del tutorial de DataFrames

Obtener el cuaderno

Recursos adicionales