Руководство. Загрузка и преобразование данных с помощью кадров данных Apache Spark

В этом руководстве показано, как загружать и преобразовывать данные с помощью API Кадра данных Apache Spark (PySpark), API Apache Spark Scala DataFrame и API SparkR SparkDataFrame в Azure Databricks.

В конце этого руководства вы узнаете, что такое кадр данных, и ознакомится со следующими задачами:

Python

См. также справочник по API Apache Spark PySpark.

Scala

См. также справочник по API Apache Spark Scala.

R

См. также справочник по API Apache SparkR.

Что такое кадр данных?

Объект DataFrame, или кадр данных, — это двухмерная размеченная структура данных со столбцами, типы которых могут быть разными. Кадр данных можно представить как электронную таблицу, таблицу SQL или словарь объектов ряда. Кадры данных Apache Spark предоставляют широкий набор функций (выбор столбцов, фильтров, соединения, агрегатов), которые позволяют эффективно решать распространенные проблемы анализа данных.

Кадры данных Apache Spark — это абстракция, созданная на основе устойчивых распределенных наборов данных (RDD). Кадры данных Spark и Spark SQL используют единый механизм планирования и оптимизации, что позволяет получить почти одинаковую производительность на всех поддерживаемых языках в Azure Databricks (Python, SQL, Scala и R).

Требования

Чтобы выполнить следующее руководство, необходимо выполнить следующие требования:

  • Чтобы использовать примеры, приведенные в этом руководстве, рабочая область должна включать каталог Unity.

  • Примеры, приведенные в этом руководстве, используют том каталога Unity для хранения примеров данных. Чтобы использовать эти примеры, создайте том и используйте каталог, схему и имена томов, чтобы задать путь тома, используемый примерами.

  • У вас должны быть следующие разрешения в каталоге Unity:

    • READ VOLUME и WRITE VOLUME( или ALL PRIVILEGES для тома, используемого для этого руководства.
    • USE SCHEMA или ALL PRIVILEGES схему, используемую для этого руководства.
    • USE CATALOG или ALL PRIVILEGES для каталога, используемого для этого руководства.

    Чтобы задать эти разрешения, ознакомьтесь с правами администратора Databricks или каталога Unity и защищаемыми объектами.

Совет

Полный записной книжки для этой статьи см . в записной книжке учебника dataFrame.

Шаг 1. Определение переменных и загрузка CSV-файла

Этот шаг определяет переменные для использования в этом руководстве, а затем загружает CSV-файл, содержащий данные имени ребенка из health.data.ny.gov в том каталога Unity.

  1. Откройте новую записную книжку, щелкнув Значок значок. Чтобы узнать, как перемещаться по записным книжкам Azure Databricks, ознакомьтесь с интерфейсом и элементами управления записной книжки Databricks.

  2. Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Замените <catalog-name>, <schema-name>а также <volume-name> именами каталогов, схем и томов для тома каталога Unity. Замените <table_name> выбранным именем таблицы. Вы загрузите данные имени ребенка в эту таблицу далее в этом руководстве.

  3. Нажмите Shift+Enter , чтобы запустить ячейку и создать пустую ячейку.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код копирует rows.csv файл из health.data.ny.gov в том каталога Unity с помощью команды Databricks dbutuils .

  5. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Шаг 2. Создание кадра данных

На этом шаге создается кадр данных с именами df1 тестов, а затем отображается его содержимое.

  1. Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код создает кадр данных с тестовых данных, а затем отображает содержимое и схему кадра данных.

  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Шаг 3. Загрузка данных в кадр данных из CSV-файла

На этом шаге создается кадр данных с именем df_csv из CSV-файла, который вы ранее загружали в том каталога Unity. См . spark.read.csv.

  1. Скопируйте и вставьте следующий код в новую пустую ячейку записной книжки. Этот код загружает данные имени ребенка в кадр данных df_csv из CSV-файла, а затем отображает содержимое кадра данных.

  2. Нажмите Shift+Enter , чтобы запустить ячейку, а затем перейдите к следующей ячейке.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Данные можно загрузить из многих поддерживаемых форматов файлов.

Шаг 4. Просмотр и взаимодействие с кадром данных

Просмотр и взаимодействие с кадрами данных ребенка с помощью следующих методов.

Узнайте, как отобразить схему кадра данных Apache Spark. Apache Spark использует схему терминов для ссылки на имена и типы данных столбцов в Кадре данных.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. В этом коде показана схема кадров данных с .printSchema() методом для просмотра схем двух кадров данных, чтобы подготовиться к объединению двух кадров данных.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Примечание.

Azure Databricks также использует схему терминов для описания коллекции таблиц, зарегистрированных в каталоге.

Переименование столбца в кадре данных

Узнайте, как переименовать столбец в кадре данных.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код переименовывает столбец в df1_csv кадре данных для сопоставления соответствующего столбца в кадре df1 данных. Этот код использует метод Apache Spark withColumnRenamed() .

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Объединение кадров данных

Узнайте, как создать новый кадр данных, который добавляет строки одного кадра данных в другой.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark union() для объединения содержимого первого кадра df данных с кадром данных df_csv , содержащим данные дочерних имен, загруженные из CSV-файла.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Фильтрация строк в кадре данных

Узнайте о самых популярных именах детей в наборе данных, отфильтровав строки с помощью Apache Spark .filter() или .where() методов. Используйте фильтрацию для выбора подмножества строк для возврата или изменения в кадре данных. Нет различий в производительности или синтаксисе, как показано в следующих примерах.

Использование метода .filter()

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark .filter() для отображения этих строк в кадре данных с числом более 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

Использование метода .where()

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark .where() для отображения этих строк в кадре данных с числом более 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Выбор столбцов из кадра данных и порядка по частоте

Сведения о частоте имени ребенка с методом select() , чтобы указать столбцы из кадра данных для возврата. Используйте Apache Spark orderby и desc функции для упорядочивания результатов.

Модуль pyspark.sql для Apache Spark обеспечивает поддержку функций SQL. Среди этих функций, которые мы используем в этом руководстве, являются Apache Spark orderBy()desc()и expr() функции. Вы можете использовать эти функции, импортируя их в сеанс по мере необходимости.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код импортирует desc() функцию, а затем использует метод Apache Spark select()orderBy()desc() и функции Для отображения наиболее распространенных имен и их счетчиков в порядке убывания.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Создание подмножества кадра данных

Узнайте, как создать подмножество кадра данных из существующего кадра данных.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark filter для создания нового кадра данных, ограничивающего данные по годам, подсчетам и сексу. Он использует метод Apache Spark select() для ограничения столбцов. Он также использует Apache Spark orderBy() и desc() функции для сортировки нового кадра данных по количеству.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Шаг 5. Сохранение кадра данных

Узнайте, как сохранить кадр данных. Вы можете сохранить кадр данных в таблицу или записать кадр данных в файл или несколько файлов.

Сохранение кадра данных в таблице

Azure Databricks использует формат Delta Lake для всех таблиц по умолчанию. Чтобы сохранить кадр данных, необходимо иметь CREATE права на таблицу в каталоге и схеме.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код сохраняет содержимое кадра данных в таблицу с помощью переменной, определенной в начале этого руководства.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

Большинство приложений Apache Spark работают над большими наборами данных и распределенной модой. Apache Spark записывает каталог файлов, а не один файл. Delta Lake разделяет папки и файлы Parquet. Многие системы данных могут считывать эти каталоги файлов. Azure Databricks рекомендует использовать таблицы по пути к файлам для большинства приложений.

Сохранение кадра данных в JSON-файлы

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код сохраняет кадр данных в каталог JSON-файлов.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Чтение кадра данных из JSON-файла

Узнайте, как использовать метод Apache Spark spark.read.format() для чтения данных JSON из каталога в кадр данных.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код отображает файлы JSON, сохраненные в предыдущем примере.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Дополнительные задачи: выполнение запросов SQL в PySpark, Scala и R

Кадры данных Apache Spark предоставляют следующие параметры для объединения SQL с PySpark, Scala и R. Приведенный ниже код можно запустить в той же записной книжке, которую вы создали для этого руководства.

Указание столбца в виде SQL-запроса

Узнайте, как использовать метод Apache Spark selectExpr() . Это вариант select() метода, который принимает выражения SQL и возвращает обновленный кадр данных. Этот метод позволяет использовать выражение SQL, например upper.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует метод Apache Spark selectExpr() и выражение SQL upper для преобразования строкового столбца в верхний регистр (и переименования столбца).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Использование expr() синтаксиса SQL для столбца

Узнайте, как импортировать и использовать функцию Apache Spark expr() для использования синтаксиса SQL в любом месте, где будет указан столбец.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код импортирует expr() функцию, а затем использует функцию Apache Spark expr() и выражение SQL lower для преобразования строкового столбца в нижний регистр (и переименования столбца).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Выполнение произвольного SQL-запроса с помощью функции spark.sql()

Узнайте, как использовать функцию Apache Spark spark.sql() для выполнения произвольных запросов SQL.

Скопируйте и вставьте следующий код в пустую ячейку записной книжки. Этот код использует функцию Apache Spark spark.sql() для запроса таблицы SQL с помощью синтаксиса SQL.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

Записная книжка учебника по кадрам данных

В следующей записной книжке приведены примеры запросов из этого руководства.

Python

Руководство по кадрам данных с помощью записной книжки Python

Получить записную книжку

Scala

Руководство по кадрам данных с помощью записной книжки Scala

Получить записную книжку

R

Руководство по кадрам данных с помощью записной книжки R

Получить записную книжку

Дополнительные ресурсы