Tutorial: Carregar e transformar dados no PySpark DataFrames

Este tutorial mostra como carregar e transformar dados de cidades dos EUA usando a API DataFrame Apache Spark Python (PySpark) no Azure Databricks.

Ao final deste tutorial, você entenderá o que é um DataFrame e estará familiarizado com as seguintes tarefas:

Consulte também Referência da API do Apache Spark PySpark.

O que é um DataFrame?

Um DataFrame é uma estrutura de dados rotulada bidimensional com colunas de tipos potencialmente diferentes. Você pode pensar em um DataFrame como uma planilha, uma tabela SQL ou um dicionário de objetos de série. O Apache Spark DataFrames fornece um rico conjunto de funções (selecionar colunas, filtrar, juntar, agregar) que permitem resolver problemas comuns de análise de dados de forma eficiente.

Os Apache Spark DataFrames são uma abstração construída sobre conjuntos de dados distribuídos resilientes (RDDs). O Spark DataFrames e o Spark SQL usam um mecanismo unificado de planejamento e otimização, permitindo que você obtenha um desempenho quase idêntico em todas as linguagens com suporte no Azure Databricks (Python, SQL, Scala e R).

Requisitos

Para concluir o tutorial a seguir, você deve atender aos seguintes requisitos:

Nota

Se você não tiver privilégios de controle de cluster, ainda poderá concluir a maioria das etapas a seguir, desde que tenha acesso a um cluster.

Na barra lateral da página inicial, você acessa as entidades do Azure Databricks: o navegador do espaço de trabalho, catálogo, explorador, fluxos de trabalho e computação. Espaço de trabalho é a pasta raiz que armazena seus ativos do Azure Databricks, como blocos de anotações e bibliotecas.

Etapa 1: Criar um DataFrame com Python

Para saber como navegar nos blocos de anotações do Azure Databricks, consulte Interface e controles do bloco de anotações Databricks.

  1. Abra um novo bloco de notas e insira uma nova célula clicando no New Icon ícone.
  2. Copie e cole o código a seguir em uma célula vazia do bloco de anotações e pressione Shift+Enter para executar a célula. O exemplo de código a seguir cria um DataFrame nomeado df1 com dados de população da cidade e exibe seu conteúdo.
data = [[295, "South Bend", "Indiana", "IN", 101190, 112.9]]
columns = ["rank", "city", "state", "code", "population", "price"]

df1 = spark.createDataFrame(data, schema="rank LONG, city STRING, state STRING, code STRING, population LONG, price DOUBLE")
display(df1)

Etapa 2: Carregar dados em um DataFrame a partir de arquivos

Adicione mais dados da população da cidade do /databricks-datasets diretório ao df2.

Para carregar dados no DataFrame df2 a partir do data_geo.csv arquivo:

  1. No bloco de notas, crie uma nova célula.
  2. Copie e cole o seguinte código na célula vazia do bloco de notas e, em seguida, prima Shift+Enter para executar a célula.

Você pode carregar dados de muitos formatos de arquivo suportados. O exemplo a seguir usa um conjunto de dados disponível no diretório, acessível a /databricks-datasets partir da maioria dos espaços de trabalho. Consulte Conjuntos de dados de exemplo.

df2 = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)

Etapa 3: Exibir e interagir com seu DataFrame

Visualize e interaja com DataFrames da população da sua cidade usando os seguintes métodos.

Combinar DataFrames

Combine o conteúdo do seu primeiro DataFrame com o DataFrame que contém o conteúdo do data_geo.csv.

No bloco de anotações, use o código de exemplo a seguir para criar um novo DataFrame que adiciona as linhas de um DataFrame a outro usando a operação union:

# Returns a DataFrame that combines the rows of df1 and df2
df = df1.union(df2)

Ver o DataFrame

Para exibir os dados da cidade dos EUA em um formato tabular, use o comando Azure Databricks em uma célula do bloco de display() anotações.

display(df)

O Spark usa o termo esquema para se referir aos nomes e tipos de dados das colunas no DataFrame.

Imprima o esquema do seu DataFrame com o seguinte .printSchema() método no seu bloco de notas. Use os metadados resultantes para interagir com o conteúdo do seu DataFrame.

df.printSchema()

Nota

O Azure Databricks também usa o termo esquema para descrever uma coleção de tabelas registradas em um catálogo.

Filtrar linhas em um DataFrame

Descubra as cinco cidades mais populosas do seu conjunto de dados filtrando linhas, usando .filter() ou .where(). Use a filtragem para selecionar um subconjunto de linhas para retornar ou modificar em um DataFrame. Não há diferença no desempenho ou na sintaxe, como visto nos exemplos a seguir.

# Filter rows using .filter()
filtered_df = df.filter(df["rank"] < 6)
display(filtered_df)

# Filter rows using .where()
filtered_df = df.where(df["rank"] < 6)
display(filtered_df)

Selecionar colunas de um DataFrame

Saiba em que estado uma cidade está localizada com o select() método. Selecione colunas passando um ou mais nomes de colunas para .select(), como no exemplo a seguir:

select_df = df.select("City", "State")
display(select_df)

Criar um subconjunto DataFrame

Crie um subconjunto DataFrame com as dez cidades com maior população e exiba os dados resultantes. Combine consultas de seleção e filtro para limitar linhas e colunas retornadas, usando o seguinte código em seu bloco de anotações:

subset_df = df.filter(df["rank"] < 11).select("City")
display(subset_df)

Etapa 4: salvar o DataFrame

Você pode salvar seu DataFrame em uma tabela ou gravar o DataFrame em um arquivo ou em vários arquivos.

Salvar o DataFrame em uma tabela

O Azure Databricks usa o formato Delta Lake para todas as tabelas por padrão. Para salvar seu DataFrame, você deve ter CREATE privilégios de tabela no catálogo e no esquema. O exemplo a seguir salva o conteúdo do DataFrame em uma tabela chamada us_cities:

df.write.saveAsTable("us_cities")

A maioria dos aplicativos Spark funciona em grandes conjuntos de dados e de forma distribuída. O Spark grava um diretório de arquivos em vez de um único arquivo. Delta Lake divide as pastas e arquivos do Parquet. Muitos sistemas de dados podem ler esses diretórios de arquivos. O Azure Databricks recomenda o uso de tabelas em caminhos de arquivo para a maioria dos aplicativos.

Salve o DataFrame em arquivos JSON

O exemplo a seguir salva um diretório de arquivos JSON:

# Write a DataFrame to a collection of files
df.write.format("json").save("/tmp/json_data")

Ler o DataFrame de um arquivo JSON

# Read a DataFrame from a JSON file
df3 = spark.read.format("json").json("/tmp/json_data")
display(df3)

Tarefas adicionais: Executar consultas SQL no PySpark

O Spark DataFrames fornece as seguintes opções para combinar SQL com Python. Você pode executar o código a seguir no mesmo bloco de anotações que você criou para este tutorial.

Especificar uma coluna como uma consulta SQL

O selectExpr() método permite especificar cada coluna como uma consulta SQL, como no exemplo a seguir:

display(df.selectExpr("`rank`", "upper(city) as big_name"))

Importação expr()

Você pode importar a função para pyspark.sql.functions usar a sintaxe SQL em qualquer lugar onde uma coluna seja especificada, como no exemplo a expr() seguir:

from pyspark.sql.functions import expr

display(df.select("rank", expr("lower(city) as little_name")))

Executar uma consulta SQL arbitrária

Você pode usar spark.sql() para executar consultas SQL arbitrárias, como no exemplo a seguir:

query_df = spark.sql("SELECT * FROM us_cities")

Parametrizar consultas SQL

Você pode usar a formatação Python para parametrizar consultas SQL, como no exemplo a seguir:

table_name = "us_cities"

query_df = spark.sql(f"SELECT * FROM {table_name}")

Recursos adicionais