Создание кластеров, записных книжек и заданий с помощью Terraform

В этой статье описано, как использовать поставщик Databricks Terraform для создания кластера, записной книжки и задания в существующей рабочей области Azure Databricks.

Эта статья является сопроводительной к следующим статьям, посвященным началу работы с Azure Databricks:

Вы также можете адаптировать конфигурации Terraform, как описано в этой статье, для создания настраиваемых кластеров, записных книжек и заданий в рабочих областях.

Шаг 1. Создание и настройка проекта Terraform

  1. Создайте проект Terraform, следуя инструкциям в разделе "Требования " статьи о поставщике Databricks Terraform.

  2. Чтобы создать кластер, создайте файл с именем cluster.tfи добавьте в файл следующее содержимое. Это содержимое создает кластер с наименьшим количеством разрешенных ресурсов. В этом кластере используется последняя версия Databricks Runtime Long Term Support (LTS).

    Для кластера, который работает с Unity Catalog:

    variable "cluster_name" {}
    variable "cluster_autotermination_minutes" {}
    variable "cluster_num_workers" {}
    variable "cluster_data_security_mode" {}
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
      data_security_mode      = var.cluster_data_security_mode
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    

    Для кластера общего назначения:

    variable "cluster_name" {
      description = "A name for the cluster."
      type        = string
      default     = "My Cluster"
    }
    
    variable "cluster_autotermination_minutes" {
      description = "How many minutes before automatically terminating due to inactivity."
      type        = number
      default     = 60
    }
    
    variable "cluster_num_workers" {
      description = "The number of workers."
      type        = number
      default     = 1
    }
    
    # Create the cluster with the "smallest" amount
    # of resources allowed.
    data "databricks_node_type" "smallest" {
      local_disk = true
    }
    
    # Use the latest Databricks Runtime
    # Long Term Support (LTS) version.
    data "databricks_spark_version" "latest_lts" {
      long_term_support = true
    }
    
    resource "databricks_cluster" "this" {
      cluster_name            = var.cluster_name
      node_type_id            = data.databricks_node_type.smallest.id
      spark_version           = data.databricks_spark_version.latest_lts.id
      autotermination_minutes = var.cluster_autotermination_minutes
      num_workers             = var.cluster_num_workers
    }
    
    output "cluster_url" {
     value = databricks_cluster.this.url
    }
    
  3. Чтобы создать кластер, создайте другой файл с именем cluster.auto.tfvarsи добавьте в файл следующее содержимое. Этот файл содержит значения переменных для настройки кластера. Замените значения заполнителей собственными значениями.

    Для кластера, который работает с Unity Catalog:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    cluster_data_security_mode      = "SINGLE_USER"
    

    Для кластера общего назначения:

    cluster_name                    = "My Cluster"
    cluster_autotermination_minutes = 60
    cluster_num_workers             = 1
    
  4. Чтобы создать записную книжку, создайте другой файл с именем notebook.tfи добавьте в файл следующее содержимое:

    variable "notebook_subdirectory" {
      description = "A name for the subdirectory to store the notebook."
      type        = string
      default     = "Terraform"
    }
    
    variable "notebook_filename" {
      description = "The notebook's filename."
      type        = string
    }
    
    variable "notebook_language" {
      description = "The language of the notebook."
      type        = string
    }
    
    resource "databricks_notebook" "this" {
      path     = "${data.databricks_current_user.me.home}/${var.notebook_subdirectory}/${var.notebook_filename}"
      language = var.notebook_language
      source   = "./${var.notebook_filename}"
    }
    
    output "notebook_url" {
     value = databricks_notebook.this.url
    }
    
  5. Если вы создаете кластер, сохраните следующий код записной книжки в файл в том же каталоге, что notebook.tf и файл:

    Для записной книжки Python для учебника: запуск сквозного конвейера аналитики lakehouse— файл с следующим notebook-getting-started-lakehouse-e2e.py содержимым:

    # Databricks notebook source
    external_location = "<your_external_location>"
    catalog = "<your_catalog>"
    
    dbutils.fs.put(f"{external_location}/foobar.txt", "Hello world!", True)
    display(dbutils.fs.head(f"{external_location}/foobar.txt"))
    dbutils.fs.rm(f"{external_location}/foobar.txt")
    
    display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
    
    # COMMAND ----------
    
    from pyspark.sql.functions import col
    
    # Set parameters for isolation in workspace and reset demo
    username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
    database = f"{catalog}.e2e_lakehouse_{username}_db"
    source = f"{external_location}/e2e-lakehouse-source"
    table = f"{database}.target_table"
    checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
    
    spark.sql(f"SET c.username='{username}'")
    spark.sql(f"SET c.database={database}")
    spark.sql(f"SET c.source='{source}'")
    
    spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
    spark.sql("CREATE DATABASE ${c.database}")
    spark.sql("USE ${c.database}")
    
    # Clear out data from previous demo execution
    dbutils.fs.rm(source, True)
    dbutils.fs.rm(checkpoint_path, True)
    
    # Define a class to load batches of data to source
    class LoadData:
    
      def __init__(self, source):
        self.source = source
    
      def get_date(self):
        try:
          df = spark.read.format("json").load(source)
        except:
            return "2016-01-01"
        batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
        if batch_date.month == 3:
          raise Exception("Source data exhausted")
          return batch_date
    
      def get_batch(self, batch_date):
        return (
          spark.table("samples.nyctaxi.trips")
            .filter(col("tpep_pickup_datetime").cast("date") == batch_date)
        )
    
      def write_batch(self, batch):
        batch.write.format("json").mode("append").save(self.source)
    
      def land_batch(self):
        batch_date = self.get_date()
        batch = self.get_batch(batch_date)
        self.write_batch(batch)
    
    RawData = LoadData(source)
    
    # COMMAND ----------
    
    RawData.land_batch()
    
    # COMMAND ----------
    
    # Import functions
    from pyspark.sql.functions import col, current_timestamp
    
    # Configure Auto Loader to ingest JSON data to a Delta table
    (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "json")
      .option("cloudFiles.schemaLocation", checkpoint_path)
      .load(file_path)
      .select("*", col("_metadata.file_path").alias("source_file"), current_timestamp().alias("processing_time"))
      .writeStream
      .option("checkpointLocation", checkpoint_path)
      .trigger(availableNow=True)
      .option("mergeSchema", "true")
      .toTable(table))
    
    # COMMAND ----------
    
    df = spark.read.table(table_name)
    
    # COMMAND ----------
    
    display(df)
    

    Записная книжка Python для статьи Краткое руководство. Выполнение задания Spark в рабочей области Azure Databricks с помощью портала Azure, файл notebook-quickstart-create-databricks-workspace-portal.py со следующим содержимым.

    # Databricks notebook source
    blob_account_name = "azureopendatastorage"
    blob_container_name = "citydatacontainer"
    blob_relative_path = "Safety/Release/city=Seattle"
    blob_sas_token = r""
    
    # COMMAND ----------
    
    wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name,blob_relative_path)
    spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
    print('Remote blob path: ' + wasbs_path)
    
    # COMMAND ----------
    
    df = spark.read.parquet(wasbs_path)
    print('Register the DataFrame as a SQL temporary view: source')
    df.createOrReplaceTempView('source')
    
    # COMMAND ----------
    
    print('Displaying top 10 rows: ')
    display(spark.sql('SELECT * FROM source LIMIT 10'))
    
  6. Если вы создаете записную книжку, создайте другой файл с именем notebook.auto.tfvarsи добавьте в файл следующее содержимое. Этот файл содержит значения переменных для настройки конфигурации записной книжки.

    Для записной книжки Python для учебника: запуск сквозного конвейера аналитики lakehouse:

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-getting-started-lakehouse-e2e.py"
    notebook_language     = "PYTHON"
    

    Записная книжка Python для статьи Краткое руководство. Выполнение задания Spark в рабочей области Azure Databricks с помощью портала Azure.

    notebook_subdirectory = "Terraform"
    notebook_filename     = "notebook-quickstart-create-databricks-workspace-portal.py"
    notebook_language     = "PYTHON"
    
  7. При создании записной книжки в рабочей области Azure Databricks обязательно настройте все требования для успешного выполнения записной книжки, выполнив следующие инструкции.

  8. Чтобы создать задание, создайте другой файл с именем job.tfи добавьте в файл следующее содержимое. Это содержимое создает задание для выполнения записной книжки.

    variable "job_name" {
      description = "A name for the job."
      type        = string
      default     = "My Job"
    }
    
    variable "task_key" {
      description = "A name for the task."
      type        = string
      default     = "my_task"
    }
    
    resource "databricks_job" "this" {
      name = var.job_name
      task {
        task_key = var.task_key
        existing_cluster_id = databricks_cluster.this.cluster_id
        notebook_task {
          notebook_path = databricks_notebook.this.path
        }
      }
      email_notifications {
        on_success = [ data.databricks_current_user.me.user_name ]
        on_failure = [ data.databricks_current_user.me.user_name ]
      }
    }
    
    output "job_url" {
      value = databricks_job.this.url
    }
    
  9. При создании задания создайте другой файл с именем job.auto.tfvars и добавьте в него следующее содержимое. Этот файл содержит значение переменной для настройки конфигурации задания.

    job_name = "My Job"
    task_key = "my_task"
    

Шаг 2. Выполнение конфигураций

На этом шаге вы выполните конфигурации Terraform для развертывания кластера, записной книжки и задания в рабочей области Azure Databricks.

  1. Проверьте, являются ли конфигурации Terraform допустимыми, выполнив команду terraform validate. Если будет сообщено об ошибках, исправьте их и снова выполните команду.

    terraform validate
    
  2. Предусмотрительно узнайте об операциях, которые Terraform выполнит в вашей рабочей области, выполнив команду terraform plan.

    terraform plan
    
  3. Разверните кластер, записную книжку и задание в рабочей области, выполнив команду terraform apply. При появлении запроса на развертывание введите yes и нажмите клавишу ВВОД.

    terraform apply
    

    Terraform развернет ресурсы, указанные в проекте. Развертывание этих ресурсов (особенно кластера) может занять несколько минут.

Шаг 3. Изучение результатов

  1. Если вы создали кластер, в выходных данных команды terraform apply скопируйте ссылку рядом с cluster_url и вставьте ее в адресную строку веб-браузера.

  2. Если вы создали записную книжку, в выходных данных команды terraform apply скопируйте ссылку рядом с notebook_url и вставьте ее в адресную строку веб-браузера.

    Примечание.

    Перед использованием записной книжки может потребоваться настроить ее содержимое. См. соответствующую документацию по настройке записной книжки.

  3. Если вы создали задание, в выходных данных команды terraform apply скопируйте ссылку рядом с job_url и вставьте ее в адресную строку веб-браузера.

    Примечание.

    Перед выполнением записной книжки может потребоваться настроить ее содержимое. См. ссылки в начале этой статьи, чтобы узнать, как настроить записную книжку.

  4. Если вы создали задание, выполните задание следующим образом:

    1. Щелкните Выполнить на странице задания.
    2. Чтобы просмотреть результаты выполнения задания после завершения задания, в списке Завершенные запуски (за последние 60 дней) на странице задания щелкните последнюю запись по времени в столбце Время начала. В области Выходные данные появится результат выполнения кода записной книжки.

Шаг 4. Очистка

На этом шаге вы удалите использовавшиеся ресурсы из рабочей области.

  1. Предусмотрительно узнайте об операциях, которые Terraform выполнит в вашей рабочей области, выполнив команду terraform plan.

    terraform plan
    
  2. Удалите кластер, записную книжку и задание из рабочей области, выполнив команду terraform destroy. При появлении запроса на удаление введите yes и нажмите клавишу ВВОД.

    terraform destroy
    

    Terraform удалит ресурсы, указанные в проекте.