Tutorial: Carga y transformación de datos mediante DataFrames de Apache Spark
En este tutorial se muestra cómo cargar y transformar datos mediante la API DataFrame de Apache Spark Python (PySpark), la API DataFrame de Apache Spark Scala y la API SparkDataFrame de SparkR en Azure Databricks.
Al final de este tutorial, comprenderá lo que es un DataFrame y estará familiarizado con las siguientes tareas:
Python
- Definición de variables y copia de datos públicos en un volumen de Unity Catalog
- Creación de un objeto DataFrame con Python
- Carga de datos en un DataFrame desde un archivo CSV
- Ver e interactuar con un DataFrame
- Guardar el DataFrame
- Ejecución de consultas SQL en PySpark
Consulte también la referencia de la API de PySpark de Apache Spark.
Scala
- Definición de variables y copia de datos públicos en un volumen de Unity Catalog
- Crear un Dataframe con Scala
- Carga de datos en un DataFrame desde un archivo CSV
- Ver e interactuar con una DataFrame
- Guardar el DataFrame
- Ejecución de consultas SQL en Apache Spark
Consulte también la referencia de la API de Scala de Apache Spark.
R
- Definición de variables y copia de datos públicos en un volumen de Unity Catalog
- Creación de un SparkDataFrames de SparkR
- Carga de datos en un DataFrame desde un archivo CSV
- Ver e interactuar con un DataFrame
- Guardar el DataFrame
- Ejecución de consultas SQL en SparkR
Consulte también la referencia de la API de Apache SparkR.
¿Qué es un DataFrame?
Un DataFrame es una estructura de datos etiquetada bidimensional con columnas de tipos potencialmente diferentes. Puede pensar en un DataFrame como una hoja de cálculo, una tabla SQL o un diccionario de objetos de serie. DataFrame de Apache Spark proporciona un amplio conjunto de funciones (selección de columnas, filtro, unión, incorporación) que permiten resolver problemas comunes de análisis de datos de forma eficaz.
Los DataFrames de Apache Spark son una compilación de abstracción basada en conjuntos de datos distribuidos resistentes (RDD). Spark DataFrame y Spark SQL usan un motor unificado de planificación y optimización, lo que le permite obtener un rendimiento casi idéntico en todos los lenguajes admitidos en Azure Databricks (Python, SQL, Scala y R).
Requisitos
Para completar el siguiente tutorial, debe cumplir los siguientes requisitos:
Para usar los ejemplos de este tutorial, el área de trabajo debe tener habilitado Unity Catalog.
En los ejemplos de este tutorial se usa un volumen de Unity Catalog para almacenar datos de ejemplo. Para usar estos ejemplos, cree un volumen y use los nombres de catálogo, esquema y volumen de ese volumen para establecer la ruta de acceso del volumen usada por los ejemplos.
Debe tener estos permisos en Unity Catalog:
READ VOLUME
yWRITE VOLUME
, oALL PRIVILEGES
para el volumen usado para este tutorial.USE SCHEMA
oALL PRIVILEGES
para el esquema usado para este tutorial.USE CATALOG
oALL PRIVILEGES
para el catálogo usado para este tutorial.
Para establecer estos permisos, consulte los privilegios de administrador de Databricks o Unity Catalog y objetos protegibles.
Paso 1: Definir variables y cargar el archivo CSV
En este paso se definen las variables para su uso en este tutorial y, a continuación, se carga un archivo CSV que contiene los datos de nombres de bebé de health.data.ny.gov en el volumen de Unity Catalog.
Para abrir un nuevo cuaderno, haga clic en el icono . Para obtener información sobre cómo navegar por cuadernos de Azure Databricks, consulte Interfaz y controles del cuaderno de Databricks.
Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Reemplace
<catalog-name>
,<schema-name>
y<volume-name>
por los nombres de catálogo, esquema y volumen de un volumen de Unity Catalog. Reemplace<table_name>
por un nombre de la tabla de su elección. Cargará los datos de nombres de bebé en esta tabla más adelante en este tutorial.Presione
Shift+Enter
para ejecutar la celda y crear una nueva celda en blanco.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código copia el archivo
rows.csv
de health.data.ny.gov en el volumen de Unity Catalog mediante el comando dbutuils de Databricks.Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
Scala
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Paso 2: Crear un DataFrame
En este paso se crea un DataFrame denominado df1
con datos de prueba y, a continuación, se muestra su contenido.
Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código crea el DataFrame con datos de prueba y, a continuación, muestra el contenido y el esquema del DataFrame.
Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Paso 3: Cargar datos en un DataFrame desde un archivo CSV
Este paso crea un DataFrame denominado df_csv
desde el archivo CSV que cargó anteriormente en el volumen de Unity Catalog. Consulte spark.read.csv.
Copie y pegue el código siguiente en la celda del nuevo cuaderno vacío. Este código carga los datos de nombres de bebé en el DataFrame
df_csv
desde el archivo CSV y, a continuación, muestra el contenido del DataFrame.Presione
Shift+Enter
para ejecutar la celda y, a continuación, vaya a la celda siguiente.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Puede cargar datos de muchos formatos de archivo admitidos.
Paso 4: Ver e interactuar con el DataFrame
Vea e interactúe con los DataFrames de nombres de bebé mediante los métodos siguientes.
Imprimir el esquema Dataframe
Aprenda a mostrar el esquema de un DataFrame de Apache Spark. Spark usa el término esquema para hacer referencia a los nombres y los tipos de datos de las columnas del DataFrame.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra el esquema de los DataFrames con el método .printSchema()
para ver los esquemas de los dos DataFrames y preparar la unión de ambos.
Python
df_csv.printSchema()
df1.printSchema()
Scala
df_csv.printSchema()
df1.printSchema()
R
printSchema(df_csv)
printSchema(df1)
Nota:
Azure Databricks también usa el esquema de términos para describir una colección de tablas registradas en un catálogo.
Cambiar un nombre de columna en el DataFrame
Obtenga información sobre cómo cambiar el nombre de una columna en un DataFrame.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código cambia el nombre de una columna del DataFrame df1_csv
para que coincida con la columna correspondiente en el DataFrame df1
. Este código usa el método withColumnRenamed()
de Apache Spark.
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
Scala
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
Combinar DataFrames
Obtenga información sobre cómo crear un DataFrame que agregue las filas de un DataFrame a otro.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark union()
para combinar el contenido del primer DataFrame df
con el DataFrame df_csv
que contiene los datos de nombres de bebé cargados desde el archivo CSV.
Python
df = df1.union(df_csv)
display(df)
Scala
val df = df1.union(df_csv_renamed)
display(df)
R
display(df <- union(df1, df_csv))
Filtrado de filas en un DataFrame
Descubra los nombres de bebé más populares en el conjunto de datos mediante el filtrado de filas mediante los métodos .filter()
o .where()
de Apache Spark. Use el filtrado para seleccionar un subconjunto de filas para devolver o modificar en un DataFrame. No hay ninguna diferencia en el rendimiento o la sintaxis, como se muestra en los siguientes ejemplos.
Uso del método .filter()
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark .filter()
para mostrar esas filas en el DataFrame con un recuento de más de 50.
Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Uso del método .where()
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark .where()
para mostrar esas filas en el DataFrame con un recuento de más de 50.
Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Seleccionar columnas de un DataFrame y ordenar por frecuencia
Obtenga información sobre la frecuencia de un nombre de bebé con el método select()
para especificar las columnas de DataFrame que se van a devolver. Use las funciones orderby
y desc
de Apache Spark para ordenar los resultados.
El módulo pyspark.sql para Apache Spark proporciona compatibilidad con funciones SQL. Entre estas funciones que se usan en este tutorial se encuentran las funciones orderBy()
, desc()
y expr()
de Apache Spark. Para habilitar el uso de estas funciones, debe importarlas en la sesión según sea necesario.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función desc()
y, a continuación, usa el método select()
de Apache Spark y las funciones orderBy()
y desc()
de Apache Spark para mostrar los nombres más comunes y sus recuentos en orden descendente.
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Creación de un DataFrame de subconjunto
Obtenga información sobre cómo crear un DataFrame de subconjunto a partir de un DataFrame existente.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método filter
de Apache Spark para crear un nuevo DataFrame que restringe los datos por año, recuento y sexo. Usa el método select()
de Apache Spark para limitar las columnas. También usa las funciones orderBy()
y desc()
de Apache Spark para ordenar el nuevo DataFrame por recuento.
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
Paso 5: Guardar el DataFrame
Obtenga información sobre cómo guardar un DataFrame. Puede guardar el DataFrame en una tabla o escribir el DataFrame en un archivo o en varios archivos.
Guardar el DataFrame en una tabla
Azure Databricks usa el formato Delta Lake para todas las tablas de forma predeterminada. Para guardar el Dataframe, debe tener privilegios de tabla CREATE
en el catálogo y el esquema.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el contenido de DataFrame en una tabla mediante la variable que definió al principio de este tutorial.
Python
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")
# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
Scala
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")
// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$path_volume" + "." + s"$table_name")
R
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
La mayoría de las aplicaciones Spark funcionan en grandes conjuntos de datos y de forma distribuida. Spark escribe un directorio de archivos en lugar de un solo archivo. Delta Lake divide las carpetas y los archivos de Parquet. Muchos sistemas de datos pueden leer estos directorios de archivos. Azure Databricks recomienda usar tablas a través de rutas de acceso de archivo para la mayoría de las aplicaciones.
Guardar el DataFrame en archivos JSON
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código guarda el DataFrame en un directorio de archivos JSON.
Python
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").save("/tmp/json_data")
// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Lea el DataFrame de un archivo JSON
Aprenda a usar el método spark.read.format()
de Apache Spark para leer datos JSON de un directorio en un DataFrame.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código muestra los archivos JSON que guardó en el ejemplo anterior.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Tareas adicionales: Ejecución de consultas SQL en PySpark, Scala y R
Los DataFrames de Apache Spark proporcionan las siguientes opciones para combinar SQL con PySpark, Scala y R. Puede ejecutar el código siguiente en el mismo cuaderno que creó para este tutorial.
Especificar una columna como una consulta SQL
Aprenda a usar el método selectExpr()
de Apache Spark. Se trata de una variante del método select()
que acepta expresiones SQL y devuelve un DataFrame actualizado. Este método permite usar una expresión SQL, como upper
.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa el método Apache Spark selectExpr()
y la expresión SQL upper
para convertir una columna de cadena en mayúsculas (y cambiar el nombre de la columna).
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Uso de expr()
para usar la sintaxis SQL para una columna
Obtenga información sobre cómo importar y usar la función de Apache Spark expr()
para usar la sintaxis SQL en cualquier lugar en el que se especifique una columna.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código importa la función expr()
y, a continuación, usa la función expr()
de Apache Spark y la expresión SQL lower
para convertir una columna de cadena en minúsculas (y cambiar el nombre de la columna).
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Ejecución de una consulta SQL arbitraria mediante la función spark.sql()
Aprenda a usar la función spark.sql()
de Apache Spark para ejecutar consultas SQL arbitrarias.
Copie y pegue el código siguiente en una celda de cuaderno vacía. Este código usa la función spark.sql()
de Apache Spark para consultar una tabla SQL mediante la sintaxis SQL.
Python
display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
R
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))
Cuaderno del tutorial de DataFrame
En el cuaderno siguiente se incluyen las consultas de ejemplos de este tutorial.