Share via


適用於 Scala 的 Databricks 連線 中使用者定義的函式

注意

本文涵蓋 Databricks 連線 Databricks Runtime 14.1 和更新版本。

本文說明如何使用適用於 Scala 的 Databricks 連線 來執行使用者定義函式。 Databricks 連線 可讓您將熱門 IDE、Notebook 伺服器和自定義應用程式連線到 Azure Databricks 叢集。 如需本文的 Python 版本,請參閱適用於 Python 的 Databricks 連線 中的使用者定義函式。

注意

開始使用 Databricks 連線 之前,您必須先設定 Databricks 連線 用戶端

針對 Databricks Runtime 14.1 和更新版本,適用於 Scala 的 Databricks 連線 支援執行使用者定義的函式 (UDF)。

若要執行 UDF,必須將 UDF 所需的已編譯類別和 JAR 上傳至叢集。 addCompiledArtifacts() API 可用來指定必須上傳的已編譯類別和 JAR 檔案。

注意

用戶端所使用的 Scala 必須符合 Azure Databricks 叢集上的 Scala 版本。 若要檢查叢集的 Scala 版本,請參閱 Databricks Runtime 版本資訊版本和相容性叢集 Databricks Runtime 版本的一節。

下列 Scala 程式會設定簡單的 UDF,以將數據行中的值平方。

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

在上述範例中,由於 UDF 完全包含在 內 Main,因此只會新增 的 Main 已編譯成品。 如果UDF散佈於其他類別,或使用外部連結庫(亦即JAR),則也應該包含所有這些連結庫。

當 Spark 工作階段已初始化時,可以使用 API 上傳 spark.addArtifact() 進一步編譯的類別和 JAR。

注意

上傳 JAR 時,必須包含所有可轉移的相依性 JAR 才能上傳。 API 不會執行任何可轉移相依性的自動偵測。

具類型的數據集 API

上一節中針對UDF所述的相同機制也適用於具類型的數據集API。

具類型的數據集 API 可讓其中一個 API 在產生的數據集上執行轉換,例如地圖、篩選和匯總。 這些也會在 Databricks 叢集上執行類似 UDF。

下列 Scala 應用程式會 map() 使用 API,將結果數據行中的數位修改為前置字串。

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

雖然此範例使用 map() API,但這也適用於其他具類型的數據集 API,例如 filter()mapPartitions()等。