Руководство по Реализация шаблона сохранения озера данных для обновления таблицы Databricks Delta

В этом руководстве описана обработка событий в учетной записи хранения с иерархическим пространством имен.

Вы создадите небольшое решение, которое позволяет пользователю заполнить таблицу Databricks Delta, загружая файл разделенных запятыми значений (CSV) с описанием заказа на продажу. Чтобы создать это решение, вы объедините подписку на Сетку событий, функцию Azure и задание в Azure Databricks.

Выполняя данное руководство, вы сделаете следующее:

  • создавать событие в подписке на Сетку событий, которое вызывает функцию Azure;
  • создавать функцию Azure, которая получает от события уведомление и запускает задание в Azure Databricks;
  • создавать задание Databricks, в рамках которого заказ клиента вставляется в таблицу Databricks Delta в учетной записи хранения.

Мы будем создавать решение в обратном порядке, то есть начнем с рабочей области Azure Databricks.

Предварительные требования

Создание заказа на продажу

Сначала создайте CSV-файл с описанием заказа на продажу и отправьте его в учетную запись хранения. Позже вы примените данные из этого файла для заполнения первой строки в таблице Databricks Delta.

  1. Войдите в новую учетную запись хранения на портале Azure.

  2. Выберите Обозреватель хранилища-Контейнеры>BLOB-объектов-Добавить>контейнер и создайте контейнер с именем data.

    Снимок экрана: создание папки в браузере хранилища.

  3. В контейнере данных создайте каталог с именем input.

  4. Вставьте приведенный ниже текст в текстовый редактор.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
    
  5. Сохраните этот файл на локальном компьютере и присвойте ему имя data.csv.

  6. В браузере хранилища отправьте этот файл во входную папку.

Создание задания в Azure Databricks

В этом разделе описано выполнение таких задач:

  • создание рабочей области Azure Databricks;
  • Создайте записную книжку.
  • создание и заполнение таблицы Databricks Delta;
  • добавление кода для вставки строк в таблицу Databricks Delta;
  • создание задания.

Создание рабочей области Azure Databricks

В этом разделе вы создадите рабочую область Azure Databricks с помощью портала Azure.

  1. создание рабочей области Azure Databricks; Назовите рабочую область contoso-orders. См. статью Создание рабочей области Azure Databricks.

  2. Создание кластера. Присвойте кластеру customer-order-clusterимя . См. Создание кластера.

  3. Создайте записную книжку. Присвойте записной книжке configure-customer-table имя и выберите Python в качестве языка по умолчанию для записной книжки. См. статью Создание записной книжки.

Создание и заполнение таблицы Databricks Delta

  1. Скопируйте приведенный ниже блок кода и вставьте его в первую ячейку в новой записной книжке, но пока не выполняйте этот код.

    В этом блоке кода замените значения заполнителей appId, password и tenant значениями, которые вы собрали при подготовке предварительных условий для этого руководства.

    dbutils.widgets.text('source_file', "", "Source File")
    
    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appId>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>")
    spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant>/oauth2/token")
    
    adlsPath = 'abfss://data@contosoorders.dfs.core.windows.net/'
    inputPath = adlsPath + dbutils.widgets.get('source_file')
    customerTablePath = adlsPath + 'delta-tables/customers'
    

    Этот код позволяет создать мини-приложение с именем source_file. Позже вы создадите функцию Azure, которая вызывает этот код и передает мини-приложению путь к файлу. Этот код также позволяет выполнить аутентификацию субъекта-службы в учетной записи хранения, а затем создать переменные для использования в других ячейках.

    Примечание

    При настройке рабочей среды рассмотрите возможность сохранения ключа проверки подлинности в Azure Databricks. Затем в блоке кода замените ключ проверки подлинности ключом поиска.

    Например, вместо использования строки кода spark.conf.set("fs.azure.account.oauth2.client.secret", "<password>") следует использовать строку spark.conf.set("fs.azure.account.oauth2.client.secret", dbutils.secrets.get(scope = "<scope-name>", key = "<key-name-for-service-credential>")).

    Выполнив инструкции из этого руководства, ознакомьтесь с примерами такого подхода в статье о Data Lake Storage 2-го поколения на веб-сайте Azure Databricks.

  2. Нажмите клавиши SHIFT + ВВОД, чтобы запустить код в этом блоке.

  3. Скопируйте приведенный ниже блок кода и вставьте его в другую ячейку записной книжки. Затем нажмите сочетание клавиш SHIFT+ВВОД, чтобы выполнить этот блок кода.

    from pyspark.sql.types import StructType, StructField, DoubleType, IntegerType, StringType
    
    inputSchema = StructType([
    StructField("InvoiceNo", IntegerType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", IntegerType(), True),
    StructField("Country", StringType(), True)
    ])
    
    rawDataDF = (spark.read
     .option("header", "true")
     .schema(inputSchema)
     .csv(adlsPath + 'input')
    )
    
    (rawDataDF.write
      .mode("overwrite")
      .format("delta")
      .saveAsTable("customer_data", path=customerTablePath))
    

    Этот код позволяет создать в учетной записи хранения таблицу Databricks Delta, а затем загрузить в нее начальные данные из ранее отправленного CSV-файла.

  4. После успешного выполнения этого блока кода удалите его из записной книжки.

Добавление кода для вставки строк в таблицу Databricks Delta

  1. Скопируйте приведенный ниже блок кода и вставьте его в другую ячейку, но пока не запускайте ее.

    upsertDataDF = (spark
      .read
      .option("header", "true")
      .csv(inputPath)
    )
    upsertDataDF.createOrReplaceTempView("customer_data_to_upsert")
    

    Этот код позволяет вставлять во временное табличное представление данные из CSV-файла. Путь к этому CSV-файлу получен из входных данных мини-приложения, которое вы создали на предыдущем шаге.

  2. Скопируйте и вставьте следующий блок кода в другую ячейку. Этот код объединяет содержимое представления временной таблицы с таблицей Databricks Delta.

    %sql
    MERGE INTO customer_data cd
    USING customer_data_to_upsert cu
    ON cd.CustomerID = cu.CustomerID
    WHEN MATCHED THEN
      UPDATE SET
        cd.StockCode = cu.StockCode,
        cd.Description = cu.Description,
        cd.InvoiceNo = cu.InvoiceNo,
        cd.Quantity = cu.Quantity,
        cd.InvoiceDate = cu.InvoiceDate,
        cd.UnitPrice = cu.UnitPrice,
        cd.Country = cu.Country
    WHEN NOT MATCHED
      THEN INSERT (InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Country)
      VALUES (
        cu.InvoiceNo,
        cu.StockCode,
        cu.Description,
        cu.Quantity,
        cu.InvoiceDate,
        cu.UnitPrice,
        cu.CustomerID,
        cu.Country)
    

Создание задания

Создайте задание для запуска созданной ранее записной книжки. Позже вы создадите функцию Azure, которая запускает это задание при возникновении события.

  1. Выберите Создатьзадание>.

  2. Присвойте заданию имя, выберите созданную записную книжку и кластер. Затем выберите Создать , чтобы создать задание.

Создание функции Azure

Создайте функцию Azure, которая запускает это задание.

  1. В рабочей области Azure Databricks щелкните имя пользователя Azure Databricks на верхней панели, а затем в раскрывающемся списке выберите Параметры пользователя.

  2. На вкладке Маркеры доступа выберите Создать новый маркер.

  3. Скопируйте отображаемый маркер и нажмите кнопку Готово.

  4. В верхнем углу рабочей области Databricks Delta щелкните значок "Люди", а затем щелкните Параметры пользователя.

    Управление параметрами пользователя учетной записи

  5. Нажмите кнопку Создать новый маркер , а затем нажмите кнопку Создать .

    Не забудьте скопировать значение токена в надежное расположение. Этот токен потребуется функции Azure для аутентификации в Databricks, чтобы запустить задание.

  6. На домашней странице или в меню портала Azure выберите Создать ресурс.

  7. На странице Создать щелкните Вычислительные ресурсы>Приложение-функция.

  8. На вкладке Основные сведения на странице Создание приложения-функции выберите группу ресурсов, а затем измените или проверьте следующие параметры:

    Параметр Значение
    Имя приложения-функции contosoorder
    Стек среды выполнения .NET
    Публикация Код
    Операционная система Windows
    Тип плана Потребление (бессерверный)
  9. Выберите Просмотр и создание и затем Создать.

    После завершения развертывания выберите Перейти к ресурсу , чтобы открыть страницу обзора приложения-функции.

  10. В группе Параметры выберите Конфигурация.

  11. На странице Параметры приложения нажмите кнопку Новый параметр приложения поочередно для каждого параметра.

    Добавление параметра конфигурации

    Добавьте следующие параметры:

    Имя параметра Значение
    DBX_INSTANCE Регион для рабочей области Databricks. Например: westus2.azuredatabricks.net
    DBX_PAT Личный маркер доступа, который вы создали ранее.
    DBX_JOB_ID Уникальный идентификатор выполняемого задания.
  12. Выберите Сохранить , чтобы зафиксировать эти параметры.

  13. В группе Функции выберите Функции, а затем — Создать.

  14. Щелкните Azure Event Grid Trigger (Триггер Сетки событий Azure).

    Установите расширение Microsoft.Azure.WebJobs.Extensions.EventGrid, если появится такое предложение. Если потребуется установить расширение, повторно щелкните Azure Event Grid Trigger (Триггер Сетки событий Azure), чтобы создать функцию.

    Появится область Новая функция.

  15. В области Новая функция присвойте функции имя UpsertOrder и нажмите кнопку Создать .

  16. Замените содержимое файла кода этим кодом и нажмите кнопку Сохранить :

      #r "Azure.Messaging.EventGrid"
      #r "System.Memory.Data"
      #r "Newtonsoft.Json"
      #r "System.Text.Json"
      using Azure.Messaging.EventGrid;
      using Azure.Messaging.EventGrid.SystemEvents;
      using Newtonsoft.Json;
      using Newtonsoft.Json.Linq;
    
      private static HttpClient httpClient = new HttpClient();
    
      public static async Task Run(EventGridEvent eventGridEvent, ILogger log)
      {
         log.LogInformation("Event Subject: " + eventGridEvent.Subject);
         log.LogInformation("Event Topic: " + eventGridEvent.Topic);
         log.LogInformation("Event Type: " + eventGridEvent.EventType);
         log.LogInformation(eventGridEvent.Data.ToString());
    
         if (eventGridEvent.EventType == "Microsoft.Storage.BlobCreated" || eventGridEvent.EventType == "Microsoft.Storage.FileRenamed") {
            StorageBlobCreatedEventData fileData = eventGridEvent.Data.ToObjectFromJson<StorageBlobCreatedEventData>();
            if (fileData.Api == "FlushWithClose") {
                  log.LogInformation("Triggering Databricks Job for file: " + fileData.Url);
                  var fileUrl = new Uri(fileData.Url);
                  var httpRequestMessage = new HttpRequestMessage {
                     Method = HttpMethod.Post,
                     RequestUri = new Uri(String.Format("https://{0}/api/2.0/jobs/run-now", System.Environment.GetEnvironmentVariable("DBX_INSTANCE", EnvironmentVariableTarget.Process))),
                     Headers = { 
                        { System.Net.HttpRequestHeader.Authorization.ToString(), "Bearer " + System.Environment.GetEnvironmentVariable("DBX_PAT", EnvironmentVariableTarget.Process)},
                        { System.Net.HttpRequestHeader.ContentType.ToString(), "application/json" }
                     },
                     Content = new StringContent(JsonConvert.SerializeObject(new {
                        job_id = System.Environment.GetEnvironmentVariable("DBX_JOB_ID", EnvironmentVariableTarget.Process),
                        notebook_params = new {
                              source_file = String.Join("", fileUrl.Segments.Skip(2))
                        }
                     }))
                  };
                  var response = await httpClient.SendAsync(httpRequestMessage);
                  response.EnsureSuccessStatusCode();
            }
         }
      }
    

Этот код позволяет проанализировать сведения о возникшем событии хранилища и создать сообщение запроса с URL-адресом файла, вызвавшего событие. В составе этого сообщения функция передает значение в мини-приложение source_file, которое вы создали ранее. С помощью кода функции сообщение отправляется в задание Databricks Delta и для аутентификации применяется токен, который вы получили ранее.

Создание подписки Сетки событий

В этом разделе вы создадите подписку на службу "Сетка событий", которая вызывает функцию Azure при отправке файлов в учетную запись хранения.

  1. Выберите Интеграция, а затем на странице Интеграция выберите Триггер Сетки событий.

  2. На панели Изменение триггера присвойте событию eventGridEventимя , а затем выберите Создать подписку на событие.

    Примечание

    Имя eventGridEvent соответствует параметру с именем, который передается в функцию Azure.

  3. На вкладке Основные сведения на странице Создание подписки на события измените или проверьте следующие параметры:

    Параметр Значение
    Имя contoso-order-event-subscription
    Тип раздела Учетная запись хранения
    Исходный ресурс contosoorders
    Имя системного раздела <create any name>
    Фильтр по типам событий Blob Created and Blob Deleted
  4. Нажмите кнопку Создать.

Тестирование подписки на Сетку событий

  1. Создайте файл с именем customer-order.csv, вставьте в него приведенный ниже код JSON и сохраните файл на локальном компьютере.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,228,1/1/2018 9:01,33.85,20993,Sierra Leone
    
  2. С помощью Обозревателя службы хранилища отправьте этот файл в папку input в учетной записи хранения.

    При отправке файла вызывается событие Microsoft.Storage.BlobCreated. Сетка событий уведомляет всех, кто подписался на это событие. В нашем примере единственным подписчиком является наша функция Azure. Эта функция Azure анализирует параметры события и определяет, какое событие произошло. Затем она передает URL-адрес файла в задание Databricks. В рамках задания Databricks файл считывается и в таблицу Databricks Delta, размещенную в учетной записи хранения, добавляется соответствующая строка.

  3. Чтобы проверить, успешно ли выполнено задание, просмотрите запуски задания. Вы увидите состояние завершения. Дополнительные сведения о просмотре запусков для задания см. в разделе Просмотр запусков для задания.

  4. В новой ячейке книги выполните приведенный ниже запрос, чтобы просмотреть обновленную таблицу Databricks Delta.

    %sql select * from customer_data
    

    Возвращается таблица, которая содержит последнюю запись.

    Последняя запись отображается в таблице

  5. Чтобы обновить эту запись, создайте файл с именем customer-order-update.csv, вставьте в него приведенный ниже код и сохраните файл на локальном компьютере.

    InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
    536371,99999,EverGlow Single,22,1/1/2018 9:01,33.85,20993,Sierra Leone
    

    Этот CSV-файл почти идентичен предыдущему, за исключением того, что количество в заказе изменилось с 228 на 22.

  6. С помощью Обозревателя службы хранилища отправьте этот файл в папку input в учетной записи хранения.

  7. Снова выполните запрос select, чтобы просмотреть обновленную разностную таблицу.

    %sql select * from customer_data
    

    Будет возвращена таблица, которая содержит обновленную запись.

    Обновленная запись отображается в таблице

Очистка ресурсов

Удалите группу ресурсов и все связанные с ней ресурсы, когда надобность в них отпадет. Для этого выберите группу ресурсов для учетной записи хранения и выберите Удалить.

Дальнейшие действия

Reacting to Blob storage events (preview) (Реагирование на события хранилища BLOB-объектов)