Руководство. Извлечение, преобразование и загрузка данных с помощью Azure Databricks

В этом руководстве рассматривается выполнение операций извлечения, преобразования и загрузки данных с помощью Azure Databricks. Мы извлечем данные из Azure Data Lake Storage 2-го поколения в Azure Databricks, выполним преобразование данных в Azure Databricks, а затем загрузим преобразованные данные в Azure Synapse Analytics.

Для действий, описанных в этом руководстве, используется соединитель Azure Synapse для Azure Databricks, позволяющий передавать данные в Azure Databricks. Этот соединитель, в свою очередь, использует хранилище BLOB-объектов Azure как временное хранилище для данных, передаваемых между кластером Azure Databricks и Azure Synapse.

На следующем рисунке показан поток в приложении.

Azure Databricks with Data Lake Store and Azure Synapse

В рамках этого руководства рассматриваются следующие задачи:

  • Создайте службу Azure Databricks.
  • создание кластера Spark в Azure Databricks;
  • Создание файловой системы в учетной записи Data Lake Storage 2-го поколения.
  • Отправка примера данных в учетную запись Azure Data Lake Storage 2-го поколения.
  • Создание субъекта-службы.
  • Извлечение данных из учетной записи Azure Data Lake Storage 2-го поколения.
  • преобразование данных в Azure Databricks;
  • Загрузка данных в Azure Synapse.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Примечание.

Инструкции из этого руководство нельзя выполнять с бесплатной пробной версией подписки. Если у вас есть бесплатная учетная запись, перейдите к профилю и измените подписку на подписку с оплатой по мере использования. Дополнительные сведения см. на странице создания бесплатной учетной записи Azure. Затем удалите предельную сумму расходов и запросите увеличение квоты для виртуальных ЦП в своем регионе. При создании рабочей области Azure Databricks можно выбрать ценовую категорию Пробная версия ("Премиум" — 14 дней бесплатно (DBU)) для предоставления рабочей области доступа к бесплатным DBU Azure Databricks уровня "Премиум" на 14 дней.

Необходимые компоненты

Прежде чем начать работу с этим руководством, выполните следующие задачи:

Сбор необходимых сведений

Обязательно выполните предварительные требования данного руководства.

Прежде чем начать, вам понадобится собрать такую информацию:

✔️ имя базы данных, имя сервера базы данных, имя пользователя и пароль Azure Synapse;

✔️ ключ доступа к учетной записи хранения больших двоичных объектов;

✔️ имя учетной записи хранения Data Lake Storage 2-го поколения;

✔️ идентификатор арендатора для подписки;

✔️ Идентификатор приложения, зарегистрированного в идентификаторе Microsoft Entra (ранее — Azure Active Directory).

✔️ Ключ проверки подлинности для приложения, зарегистрированного в идентификаторе Microsoft Entra (ранее — Azure Active Directory).

Создание службы Azure Databricks.

В этом разделе вы создадите службу Azure Databricks с помощью портала Azure.

  1. В меню портала Azure выберите Создать ресурс.

    Create a resource on Azure portal

    Затем выберите Аналитика>Azure Databricks.

    Create Azure Databricks on Azure portal

  2. В разделе Служба Azure Databricks укажите следующие значения, чтобы создать службу Databricks.

    Свойство Description
    Имя рабочей области Укажите имя рабочей области Databricks.
    Подписка Выберите подписку Azure в раскрывающемся списке.
    Группа ресурсов Укажите, следует ли создать новую группу ресурсов или использовать имеющуюся. Группа ресурсов — это контейнер, содержащий связанные ресурсы для решения Azure. Дополнительные сведения см. в обзоре группы ресурсов Azure.
    Местонахождение Выберите Западная часть США 2. Другие доступные регионы см. в статье о доступности служб Azure по регионам.
    Ценовая категория Выберите Стандартное.
  3. Создание учетной записи занимает несколько минут. Чтобы отслеживать состояние операции, просмотрите индикатор выполнения вверху.

  4. Выберите Закрепить на панели мониторинга и нажмите кнопку Создать.

Создание кластера Spark в Azure Databricks.

  1. На портале Azure перейдите к созданной службе Databricks, а затем выберите Launch Workspace (Запустить рабочую область).

  2. Вы будете перенаправлены на портал Azure Databricks. На портале выберите Кластер.

    Databricks on Azure

  3. На странице создания кластера укажите значения для создания кластера.

    Create Databricks Spark cluster on Azure

  4. Заполните значения в следующих полях, и примите значения по умолчанию для других полей.

    • Введите имя кластера.

    • Убедитесь, что установлен флажок Terminate after __  minutes of inactivity (Завершить через __ мин бездействия). Укажите длительность (в минутах) для завершения работы кластера, если тот не используется.

    • Выберите Create cluster (Создать кластер). Когда кластер будет выполняться, можно присоединить к нему записные книжки и запустить на нем задания Spark.

Создание файловой системы в учетной записи Azure Data Lake Storage 2-го поколения

В этом разделе показано создание записной книжки в рабочей области Azure Databricks и выполнение фрагментов кода для настройки учетной записи хранения.

  1. На портале Azure перейдите к созданной службе Azure Databricks, а затем выберите Launch Workspace (Запустить рабочую область).

  2. В левой области выберите Рабочая область. В раскрывающемся списке Рабочая область выберите Создать>Notebook (Записная книжка).

    Create a notebook in Databricks

  3. В диалоговом окне создания записной книжки введите имя записной книжки. Выберите Scala в качестве языка, а затем выберите созданный ранее кластер Spark.

    Provide details for a notebook in Databricks

  4. Нажмите кнопку создания.

  5. Следующий блок кода устанавливает учетные данные субъекта-службы по умолчанию для любой учетной записи ADLS 2-го поколения, доступ к которой осуществляется в сеансе Spark. Второй блок кода добавляет имя учетной записи к параметру, чтобы указать учетные данные для конкретной учетной записи ADLS 2-го поколения. Скопируйте и вставьте любой блок кода в первую ячейку записной книжки Azure Databricks.

    Конфигурация сеанса

    val appID = "<appID>"
    val secret = "<secret>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-id>/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    

    Конфигурация учетной записи

    val storageAccountName = "<storage-account-name>"
    val appID = "<app-id>"
    val secret = "<secret>"
    val fileSystemName = "<file-system-name>"
    val tenantID = "<tenant-id>"
    
    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + ".dfs.core.windows.net", "" + secret + "")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint." + storageAccountName + ".dfs.core.windows.net", "https://login.microsoftonline.com/" + tenantID + "/oauth2/token")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    dbutils.fs.ls("abfss://" + fileSystemName  + "@" + storageAccountName + ".dfs.core.windows.net/")
    spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "false")
    
  6. В этом блоке кода замените значения заполнителя <app-id>, <secret>, <tenant-id> и <storage-account-name> значениями, полученными в ходе выполнения предварительных условий этого руководства. Значение заполнителя <file-system-name> замените на имя файловой системы.

    • <app-id> и <secret> заменяются значениями из приложения, которое вы зарегистрировали в Azure AD при создании субъекта-службы.

    • Значение <tenant-id> берется из подписки.

    • <storage-account-name> — это имя учетной записи хранения Azure Data Lake Storage 2-го поколения.

  7. Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

Прием примера данных в учетную запись Azure Data Lake Storage 2-го поколения

Прежде чем приступить к работе с этим разделом, выполните следующие предварительные требования.

Введите следующий код в ячейку записной книжки:

%sh wget -P /tmp https://raw.githubusercontent.com/Azure/usql/master/Examples/Samples/Data/json/radiowebsite/small_radio_json.json

В ячейке нажмите клавиши SHIFT+ВВОД, чтобы выполнить код.

Теперь в новую ячейку под этой введите следующий код и замените значения, которые отображаются в скобках, использованными ранее.

dbutils.fs.cp("file:///tmp/small_radio_json.json", "abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/")

В ячейке нажмите клавиши SHIFT+ВВОД, чтобы выполнить код.

Извлечение данных из учетной записи Azure Data Lake Storage 2-го поколения

  1. Теперь можно загрузить пример JSON-файла в виде кадра данных в Azure Databricks. Вставьте следующий код в новую ячейку. Замените значения заменителей, показанные в скобках, собственными.

    val df = spark.read.json("abfss://" + fileSystemName + "@" + storageAccountName + ".dfs.core.windows.net/small_radio_json.json")
    
  2. Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

  3. Чтобы просмотреть содержимое кадра данных, выполните следующий код:

    df.show()
    

    Должен отобразиться результат, аналогичный приведенному ниже фрагменту кода.

    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    |               artist|     auth|firstName|gender|itemInSession|  lastName|   length|  level|            location|method|    page| registration|sessionId|                song|status|           ts|userId|
    +---------------------+---------+---------+------+-------------+----------+---------+-------+--------------------+------+--------+-------------+---------+--------------------+------+-------------+------+
    | El Arrebato         |Logged In| Annalyse|     F|            2|Montgomery|234.57914| free  |  Killeen-Temple, TX|   PUT|NextSong|1384448062332|     1879|Quiero Quererte Q...|   200|1409318650332|   309|
    | Creedence Clearwa...|Logged In|   Dylann|     M|            9|    Thomas|340.87138| paid  |       Anchorage, AK|   PUT|NextSong|1400723739332|       10|        Born To Move|   200|1409318653332|    11|
    | Gorillaz            |Logged In|     Liam|     M|           11|     Watts|246.17751| paid  |New York-Newark-J...|   PUT|NextSong|1406279422332|     2047|                DARE|   200|1409318685332|   201|
    ...
    ...
    

    В результате были извлечены данные из Azure Data Lake Storage Gen2 в Azure Databricks.

Преобразование данных в Azure Databricks.

Необработанный файл с примером данных small_radio_json.json содержит сведения о слушателях радиостанции и имеет множество столбцов. В этом разделе мы преобразуем данные, чтобы извлечь только определенные столбцы из набора данных.

  1. Сначала извлеките только столбцы firstName, lastName, gender, location и level из созданного кадра данных.

    val specificColumnsDf = df.select("firstname", "lastname", "gender", "location", "level")
    specificColumnsDf.show()
    

    Выходные данные должны выглядеть так, как показано в следующем фрагменте кода:

    +---------+----------+------+--------------------+-----+
    |firstname|  lastname|gender|            location|level|
    +---------+----------+------+--------------------+-----+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX| free|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...| free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Tess|  Townsend|     F|Nashville-Davidso...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |     Liam|     Watts|     M|New York-Newark-J...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    |     Alan|     Morse|     M|Chicago-Napervill...| paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK| paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...| free|
    +---------+----------+------+--------------------+-----+
    
  2. Эти данные можно еще преобразовывать, переименовав столбец level на subscription_type.

    val renamedColumnsDF = specificColumnsDf.withColumnRenamed("level", "subscription_type")
    renamedColumnsDF.show()
    

    Выходные данные должны выглядеть так, как показано в следующем фрагменте кода.

    +---------+----------+------+--------------------+-----------------+
    |firstname|  lastname|gender|            location|subscription_type|
    +---------+----------+------+--------------------+-----------------+
    | Annalyse|Montgomery|     F|  Killeen-Temple, TX|             free|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |Gabriella|   Shelton|     F|San Jose-Sunnyval...|             free|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Tess|  Townsend|     F|Nashville-Davidso...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |     Liam|     Watts|     M|New York-Newark-J...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Elijah|  Williams|     M|Detroit-Warren-De...|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    |     Alan|     Morse|     M|Chicago-Napervill...|             paid|
    |   Dylann|    Thomas|     M|       Anchorage, AK|             paid|
    |  Margaux|     Smith|     F|Atlanta-Sandy Spr...|             free|
    +---------+----------+------+--------------------+-----------------+
    

Загрузка данных в Azure Synapse

В этом разделе преобразованные данные отправляются в Azure Synapse. С помощью соединителя Azure Synapse для Azure Databricks кадр данных можно напрямую отправить в виде таблицы в пул Spark Synapse.

Как упоминалось ранее, соединитель Azure Synapse использует хранилище BLOB-объектов Azure в качестве временного хранилища для передачи данных между Azure Databricks и Azure Synapse. Таким образом сначала нужно предоставить конфигурацию для подключения к учетной записи хранения. Вы уже должны были создать учетную запись, выполняя предварительные требования для этой статьи.

  1. Предоставьте конфигурацию для получения доступа к учетной записи хранения Azure из Azure Databricks.

    val blobStorage = "<blob-storage-account-name>.blob.core.windows.net"
    val blobContainer = "<blob-container-name>"
    val blobAccessKey =  "<access-key>"
    
  2. Укажите временную папку, которая будет использоваться при перемещении данных между Azure Databricks и Azure Synapse.

    val tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
    
  3. Запустите следующий фрагмент кода, чтобы сохранить ключ доступа к хранилищу BLOB-объектов Azure в конфигурации. Благодаря этому действию вам не придется хранить ключ доступа в записной книжке в виде обычного текста.

    val acntInfo = "fs.azure.account.key."+ blobStorage
    sc.hadoopConfiguration.set(acntInfo, blobAccessKey)
    
  4. Укажите значения для подключения к экземпляру Azure Synapse. В качестве необходимого компонента следует создать службу Azure Synapse Analytics. Используйте полное имя сервера для dwServer. Например, <servername>.database.windows.net.

    //Azure Synapse related settings
    val dwDatabase = "<database-name>"
    val dwServer = "<database-server-name>"
    val dwUser = "<user-name>"
    val dwPass = "<password>"
    val dwJdbcPort =  "1433"
    val dwJdbcExtraOptions = "encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
    val sqlDwUrl = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass + ";$dwJdbcExtraOptions"
    val sqlDwUrlSmall = "jdbc:sqlserver://" + dwServer + ":" + dwJdbcPort + ";database=" + dwDatabase + ";user=" + dwUser+";password=" + dwPass
    
  5. Выполните следующий фрагмент кода, чтобы загрузить преобразованный кадр данных renamedColumnsDF в качестве таблицы в Azure Synapse. Этот фрагмент кода создает таблицу с именем SampleTable в базе данных SQL.

    spark.conf.set(
        "spark.sql.parquet.writeLegacyFormat",
        "true")
    
    renamedColumnsDF.write.format("com.databricks.spark.sqldw").option("url", sqlDwUrlSmall).option("dbtable", "SampleTable")       .option( "forward_spark_azure_storage_credentials","True").option("tempdir", tempDir).mode("overwrite").save()
    

    Примечание.

    В этом примере используется флаг forward_spark_azure_storage_credentials, который вызывает Azure Synapse для доступа к данным из хранилища BLOB-объектов с помощью ключа доступа. Это единственный поддерживаемый способ проверки подлинности.

    Если для хранилища BLOB-объектов Azure нельзя выбирать виртуальные сети, Azure Synapse запросит Управляемое удостоверение службы, а не ключи доступа. В таком случае вы получите ошибку с сообщением о том, что у вызывающей стороны нет прав на выполнение этой операции.

  6. Подключитесь к базе данных SQL и убедитесь, что вы видите базу данных SampleTable.

    Verify the sample table

  7. Выполните запрос SELECT, чтобы проверить содержимое таблицы. В таблице должны быть те же данные, что и в кадре данных renamedColumnsDF.

    Verify the sample table content

Очистка ресурсов

После выполнения заданий из этого руководства вы можете завершить работу кластера. В рабочей области Azure Databricks в области слева выберите Кластеры. Для завершения работы кластера в разделе Действия наведите указатель мыши на многоточие (...) и выберите значок Завершить.

Stop a Databricks cluster

Если не завершить работу кластера вручную, она завершится автоматически, если во время создания кластера вы установили флажок Terminate after __ minutes of inactivity (Завершить через __ мин бездействия). В этом случае работа кластера автоматически завершается, если он был неактивным в течение определенного времени.

Следующие шаги

Из этого руководства вы узнали, как:

  • Создание службы Azure Databricks.
  • Создание кластера Spark в Azure Databricks.
  • Создание записной книжки в Azure Databricks.
  • Извлечение данных из учетной записи Data Lake Storage 2-го поколения.
  • Преобразование данных в Azure Databricks.
  • Загрузка данных в Azure Synapse