Руководство. Azure Data Lake Storage 2-го поколения, Azure Databricks и Spark

В этом руководстве показано, как подключить кластер Azure Databricks к данным, хранящимся в учетной записи хранения Azure с поддержкой Azure Data Lake Storage 2-го поколения. Благодаря этому подключению запросы и аналитику можно изначально выполнять из кластера в данных.

При работе с этим руководством вы сделаете следующее:

  • Прием неструктурированных данных в учетной записи хранения
  • Выполнять анализ данных в хранилище BLOB-объектов.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Необходимые компоненты

Создание рабочей области Azure Databricks, кластера и записной книжки

  1. Создайте рабочую область Azure Databricks. См. статью "Создание рабочей области Azure Databricks".

  2. Создание кластера. См. Создание кластера.

  3. Создайте записную книжку. См. статью "Создание записной книжки". Выберите Python в качестве языка записной книжки по умолчанию.

Оставьте записную книжку открытой. Его можно использовать в следующих разделах.

Скачивание данных о рейсах

В этом руководстве используются данные о производительности по времени в январе 2016 года из Бюро статистики транспорта, чтобы продемонстрировать, как выполнять операцию ETL. Эти данные для работы с руководством необходимо скачать.

  1. Скачайте файл On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Этот файл содержит данные о рейсах.

  2. Распакуйте содержимое ZIP-файла и запомните имя файла и путь к файлу. Они понадобятся для выполнения последующего шага.

Если вы хотите узнать о информации, записанной в данных о производительности отчета во время, вы можете просмотреть описания полей на веб-сайте Бюро статистики транспорта.

Прием данных

В этом разделе вы отправляете данные тестового пакета .csv в свою учетную запись Azure Data Lake Storage 2-го поколения, а затем подключите учетную запись хранения к кластеру Databricks. Наконец, вы используете Databricks для чтения данных тестов .csv и записи его обратно в хранилище в формате Apache Parquet.

Отправка данных о полете в учетную запись хранения

Используйте AzCopy, чтобы скопировать CSV-файл в учетную запись Azure Data Lake Storage 2-го поколения. Команда используется azcopy make для создания контейнера в учетной записи хранения. Затем вы используете azcopy copy команду, чтобы скопировать данные CSV , которые вы только что скачали в каталог в этом контейнере.

В следующих шагах необходимо ввести имена для создаваемого контейнера, а также каталог и большой двоичный объект, в который нужно отправить данные о полете в контейнер. Вы можете использовать предлагаемые имена на каждом шаге или указать собственные соглашения об именовании контейнеров, каталогов и больших двоичных объектов.

  1. Откройте окно командной строки и введите следующую команду, чтобы войти в Azure Active Directory, чтобы получить доступ к учетной записи хранения.

    azcopy login
    

    Для аутентификации учетной записи пользователя следуйте инструкциям, появляющимся в окне командной строки.

  2. Чтобы создать контейнер в учетной записи хранения для хранения данных о полете, введите следующую команду:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Замените значение заполнителя <storage-account-name> именем вашей учетной записи хранения.

    • <container-name> Замените заполнитель именем контейнера, который вы хотите создать для хранения csv-данных, например flight-data-container.

  3. Чтобы отправить (копировать) csv-данные в учетную запись хранения, введите следующую команду.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Замените значение заполнителя <csv-folder-path> путем к CSV-файлу.

    • Замените значение заполнителя <storage-account-name> именем вашей учетной записи хранения.

    • Замените <container-name> заполнитель именем контейнера в учетной записи хранения.

    • Замените <directory-name> заполнитель именем каталога для хранения данных в контейнере, например jan2016.

Подключение учетной записи хранения к кластеру Databricks

В этом разделе описано, как подключить хранилище объектов облака Azure Data Lake Storage 2-го поколения к файловой системе Databricks (DBFS). Вы используете принцип службы Azure AD, созданный ранее для проверки подлинности с учетной записью хранения. Дополнительные сведения см. в статье Подключение облачного хранилища объектов в Azure Databricks.

  1. Подключите записную книжку к кластеру.

    1. В созданной ранее записной книжке нажмите кнопку Подключение в правом верхнем углу панели инструментов записной книжки. Эта кнопка открывает селектор вычислений. (Если вы уже подключили записную книжку к кластеру, имя этого кластера отображается в тексте кнопки, а не Подключение).

    2. В раскрывающемся меню кластера выберите созданный ранее кластер.

    3. Обратите внимание, что текст в селекторе кластера изменяется на начало. Дождитесь завершения запуска кластера и появления имени кластера в кнопке, прежде чем продолжить.

  2. Скопируйте и вставьте следующий блок кода в первую ячейку, но не запускайте этот код.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. В этом блоке кода:

    • <appId><clientSecret><tenantId> Замените configsзначения заполнителей идентификатором приложения, секретом клиента и идентификатором клиента, скопированным при создании субъекта-службы в предварительных требованиях.

    • source В URI замените <storage-account-name>значения заполнителей <container-name><directory-name> на имя учетной записи хранения Azure Data Lake Storage 2-го поколения и имя контейнера и каталога, указанного при отправке данных о полете в учетную запись хранения.

      Примечание.

      Идентификатор схемы в URI abfssсообщает Databricks использовать драйвер файловой системы BLOB-объектов Azure с протоколом TLS. Дополнительные сведения об универсальном коде ресурса (URI) см. в разделе "Использование URI Azure Data Lake Storage 2-го поколения".

  4. Прежде чем продолжить работу, убедитесь, что кластер завершит работу.

  5. Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Контейнер и каталог, в которых вы отправили данные о полете в учетной записи хранения, теперь доступен в записной книжке через точку подключения / mnt/flightdata.

Использование Databricks Notebook для преобразования CSV-файла в файл Parquet

Теперь, когда данные тестового пакета CSV доступны через точку подключения DBFS, вы можете использовать кадр данных Apache Spark для загрузки в рабочую область и записи его обратно в формате Apache Parquet в хранилище объектов Azure Data Lake Storage 2-го поколения.

  • Кадр данных Spark — это двухмерная структура данных с столбцами потенциально разных типов. Кадр данных можно использовать для легкого чтения и записи данных в различных поддерживаемых форматах. С помощью кадра данных можно загружать данные из облачного хранилища объектов и выполнять анализ и преобразования в кластере вычислений, не влияя на базовые данные в облачном хранилище объектов. Дополнительные сведения см. в статье "Работа с кадрами данных PySpark в Azure Databricks".

  • Apache Parquet — это формат столбцов с оптимизацией, которая ускоряет запросы. Это более эффективный формат файла, чем CSV или JSON. Дополнительные сведения см. в разделе "Файлы Parquet".

В записной книжке добавьте новую ячейку и вставьте в нее следующий код.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Прежде чем перейти к следующему разделу, убедитесь, что все данные parquet были записаны, и "Готово" отображается в выходных данных.

Изучение данных

В этом разделе вы используете служебную программу файловой системы Databricks для изучения хранилища объектов Azure Data Lake Storage 2-го поколения с помощью точки подключения DBFS, созданной в предыдущем разделе.

В новой ячейке вставьте следующий код, чтобы получить список файлов в точке подключения. Первая команда выводит список файлов и каталогов. Вторая команда отображает выходные данные в табличном формате для упрощения чтения.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Обратите внимание, что каталог Parquet отображается в списке. Вы сохранили данные тестового файла CSV в формате parquet в каталог parquet/flight в предыдущем разделе. Чтобы получить список файлов в каталоге parquet/flight, вставьте следующий код в новую ячейку и запустите его:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Чтобы создать файл и перечислить его, вставьте следующий код в новую ячейку и запустите его:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Так как файл 1.txt в этом руководстве не нужен, вы можете вставить следующий код в ячейку и запустить его для рекурсивного удаления mydirectory. Параметр True указывает рекурсивное удаление.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

В качестве удобства можно использовать команду справки для получения подробных сведений о других командах.

dbutils.fs.help("rm")

В этих примерах кода вы изучили иерархический характер HDFS с помощью данных, хранящихся в учетной записи хранения с включенным Azure Data Lake Storage 2-го поколения.

Запрос данных

Теперь вы можете запрашивать данные, отправленные в учетную запись хранения. Введите каждый из следующих блоков кода в новую ячейку и нажмите клавиши SHIFT+ ВВОД , чтобы запустить скрипт Python.

Кадры данных предоставляют широкий набор функций (выбор столбцов, фильтров, соединений, агрегатов), которые позволяют эффективно решать распространенные проблемы анализа данных.

Чтобы загрузить кадр данных из ранее сохраненных данных тестов parquet и изучить некоторые поддерживаемые функции, введите этот сценарий в новую ячейку и запустите ее.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Введите этот скрипт в новую ячейку, чтобы выполнить некоторые базовые запросы анализа данных. Можно выбрать запуск всего скрипта (SHIFT+ВВОД), выделить каждый запрос и запустить его отдельно с помощью CTRL+SHIFT+ВВОД или ввести каждый запрос в отдельную ячейку и запустить его там.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Итоги

Изучив это руководство, вы:

  • Создали ресурсы Azure, включая учетную запись хранения Azure Data Lake Storage 2-го поколения и субъект-службу Azure AD и назначенные разрешения для доступа к учетной записи хранения.

  • Создана рабочая область Azure Databricks, записная книжка и вычислительный кластер.

  • Используется AzCopy для отправки неструктурированных данных тестового csv-файла в учетную запись хранения Azure Data Lake Storage 2-го поколения.

  • Используется служебная программа файловой системы Databricks для подключения учетной записи хранения Azure Data Lake Storage 2-го поколения и изучения иерархической файловой системы.

  • Используется Apache Spark DataFrames для преобразования данных тестового файла .csv в формат Apache Parquet и сохранения его обратно в учетную запись хранения Azure Data Lake Storage 2-го поколения.

  • Используемые кадры данных для изучения данных тестов и выполнения простого запроса.

  • Используется Apache Spark SQL для запроса данных о рейсах для общего количества рейсов для каждой авиакомпании в январе 2016 года, аэропортов Техаса, авиакомпаний, которые летают из Техаса, средняя задержка прибытия в минутах для каждой авиакомпании на национальном уровне, а также процент рейсов каждой авиакомпании, которые задерживают вылеты или прибытия.

Очистка ресурсов

Если вы хотите сохранить записную книжку и вернуться к ней позже, рекомендуется завершить работу (завершить) кластер, чтобы избежать расходов. Чтобы завершить работу кластера, выберите его в селекторе вычислений, расположенном в правом верхнем углу панели инструментов записной книжки, выберите " Завершить " в меню и подтвердите выбор. (По умолчанию кластер будет автоматически завершаться через 120 минут бездействия.)

Если вы хотите удалить отдельные ресурсы рабочей области, такие как записные книжки и кластеры, можно сделать это с левой боковой панели рабочей области. Подробные инструкции см. в разделе "Удаление кластера " или "Удаление записной книжки".

Удалите группу ресурсов и все связанные с ней ресурсы, когда надобность в них отпадет. Чтобы сделать это в портал Azure, выберите группу ресурсов для учетной записи хранения и рабочей области и нажмите кнопку "Удалить".

Следующие шаги