Databricks Labs 的 dbx
重要
Databricks Labs 的 dbx 按原样提供,Databricks 不会通过客户技术支持渠道为它提供官方支持。 如需请求支持、解答问题和提出功能请求,可以通过 GitHub 上 databrickslabs/dbx 存储库的“问题”页沟通。 Databricks 支持人员不会解答或调查与此代码的使用相关的问题。
Databricks Labs 的 dbx 是一个开源工具,旨在扩展 Databricks 命令行接口 (Databricks CLI),并为 Azure Databricks 平台上的快速开发生命周期以及持续集成和持续交付/部署 (CI/CD) 提供功能。
dbx 简化了跨多个环境的作业启动和部署过程。 它还可以帮助打包项目并以版本受控的方式将其交付到 Azure Databricks 环境。 它采用 CLI 优先的设计,可以在 CI/CD 管道中有效使用,或者用作本地工具(例如本地 IDE,包括 Visual Studio Code 和 PyCharm)的一部分。
使用 dbx 的典型开发工作流为:
- 如果还没有可用的远程存储库,则使用 Databricks 支持的 Git 提供程序创建一个远程存储库。
- 将远程存储库克隆到 Azure Databricks 工作区。
- 在 Azure Databricks 工作区中已克隆的存储库内创建或移入一个 Azure Databricks 笔记本。 使用此笔记本开始为 Azure Databricks 群集需要运行的代码制作原型。
- 若要通过添加单独的帮助器类和函数、配置文件和测试来增强和模块化笔记本代码,请改用装有
dbx、首选 IDE 和 Git 的本地开发计算机。 - 将远程存储库克隆到本地开发计算机。
- 将笔记本中的代码移入一个或多个本地代码文件。
- 在本地编写代码时,将工作从本地存储库推送到远程存储库。 此外,将远程存储库与 Azure Databricks 工作区同步。
- 继续使用 Azure Databricks 工作区中的笔记本快速进行原型制作,并继续将已验证的代码从笔记本移到本地计算机。 继续使用本地 IDE 完成各种任务,例如代码模块化、代码完成、Lint 分析、单元测试,以及逐步调试代码和不需要实时连接到 Azure Databricks 的对象。
- 根据需要使用
dbx在目标群集上批量运行本地代码。 (这类似于在 Spark 的bin目录中运行 spark-submit 脚本,以在 Spark 群集上启动应用程序。) - 做好生产准备时,使用 GitHub Actions、Azure DevOps 或 GitLab 等 CI/CD 平台在群集上自动运行远程存储库的代码。
要求
若要使用 dbx,必须在本地开发计算机上安装以下必备组件,无论你的代码是使用 Python、Scala 还是 Java:
Python 3.6 或更高版本。
如果你的代码使用 Python,你应使用与目标群集上安装的版本匹配的 Python 版本。 若要获取现有群集上安装的 Python 版本,可以使用群集的 Web 终端运行
python --version命令。 另请参阅 Databricks Runtime 版本中的“系统环境”部分,了解适用于你的目标群集的 Databricks Runtime 版本。一个用于创建 Python 虚拟环境的方法,以确保在
dbx项目中使用正确版本的 Python 和包依赖项(如果你的代码使用 Python)。 本文将介绍 pipenv。Python 包索引 (PyPI) 中的 dbx 包。 可以运行
pip install dbx来安装此包。设置了身份验证的 Databricks CLI。 安装
dbx时会自动安装 Databricks CLI。 可以在本地开发计算机上的以下一个或两个位置设置此身份验证:- 在
DATABRICKS_HOST和DATABRICKS_TOKEN环境变量中(从 Databricks CLI 版本 0.8.0 开始)。 - 在
.databrickscfg文件中的配置文件内。
dbx分别在这两个位置查找身份验证凭据。dbx仅使用它找到的第一组匹配的凭据。注意
dbx不支持使用 .netrc 文件进行身份验证。- 在
用于推送和同步本地与远程代码更改的 git。
继续按照适用于以下 IDE 之一的说明操作:
注意
Databricks 已验证了将上述 IDE 与 dbx 配合使用;但是,dbx 应该可与任何 IDE 配合使用。 也可以不使用 IDE(仅限终端)。
dbx 经过了优化,可以处理单文件 Python 代码文件与已编译的 Scala 和 Java JAR 文件。 dbx 无法处理单文件 R 代码文件或已编译的 R 代码包。 这是因为 dbx 使用作业 API 2.0 和 2.1,而这些 API 无法将单文件 R 代码文件或已编译的 R 代码包作为作业运行。
Visual Studio Code
按照以下说明完成操作,以便开始将 Visual Studio Code 和 Python 与 dbx 配合使用。
在本地开发计算机上,除了满足一般要求以外,还必须安装以下组件:
Visual Studio Code 的 Python 扩展。 有关详细信息,请参阅 Visual Studio Code 网站上的扩展市场。
有关详细信息,请参阅 Visual Studio Code 文档中的 VS Code 中的 Python 入门。
按照以下步骤开始设置 dbx 项目结构:
在终端中创建一个空白文件夹。 这些说明使用名为
dbx-demo的文件夹。 可以为dbx项目的根文件夹指定所需的任何名称。 如果使用其他名称,请在所有这些步骤中替换该名称。 创建文件夹后,切换到该文件夹,然后从该文件夹启动 Visual Studio Code。对于 Linux 和 macOS:
mkdir dbx-demo cd dbx-demo code .提示
如果在运行
code .后显示了command not found: code,请参阅 Microsoft 网站上的从命令行启动。对于 Windows:
md dbx-demo cd dbx-demo code .在 Visual Studio Code 中,为此项目创建一个 Python 虚拟环境:
在菜单栏上,单击“视图”>“终端”。
从
dbx-demo文件夹的根目录,结合以下选项运行pipenv命令,其中<version>是已在本地安装的 Python 的目标版本(最好是与目标群集的 Python 版本匹配的版本),例如3.7.5。pipenv --python <version>记下
pipenv命令输出中的Virtualenv location值,因为在下一步骤中需要用到。
选择目标 Python 解释器,然后激活 Python 虚拟环境:
- 在菜单栏上,单击“视图”>“命令面板”,键入
Python: Select,然后单击“Python: 选择解释器”。 - 选择刚刚创建的 Python 虚拟环境的路径中的 Python 解释器。 (此路径在
pipenv命令的输出中作为Virtualenv location值列出。) - 在菜单栏上,单击“视图”>“命令面板”,键入
Terminal: Create,然后单击“终端: 创建新终端”。
有关详细信息,请参阅 Visual Studio Code 文档中的在 VS Code 中使用 Python 环境。
- 在菜单栏上,单击“视图”>“命令面板”,键入
继续创建 dbx 项目。
PyCharm
按照以下说明完成操作,以便开始将 PyCharm 和 Python 与 dbx 配合使用。
在本地开发计算机上,除了满足一般要求以外,还必须安装 PyCharm。
按照以下步骤开始设置 dbx 项目结构:
- 在 PyCharm 中的菜单栏上,单击“文件”>“新建项目”。
- 在“创建项目”对话框中,选择新项目的位置。
- 展开“Python 解释器: 新建 Pipenv 环境”。
- 选择“新建环境的方式”(如果尚未选择),然后从下拉列表中选择“Pipenv”。
- 对于“基本解释器”,请选择包含已在本地安装的目标 Python 版本的 Python 解释器(最好是与目标群集的 Python 版本匹配的版本)的位置。
- 对于“Pipenv 可执行文件”,请选择包含本地安装的
pipenv的位置(如果尚未自动检测到)。 - 如果你要创建一个极简的
dbx项目,并想要将main.py文件用于该极简dbx项目,请选中“创建 main.py 欢迎脚本”框。 否则清除此框。 - 单击“创建”。
- 在“项目”工具窗口中,右键单击项目的根文件夹,然后单击“打开方式”>“终端”。
- 继续创建 dbx 项目。
IntelliJ IDEA
按照以下说明完成操作,以便开始将 IntelliJ IDEA 和 Scala 与 dbx 配合使用。 按照这些说明执行操作可以创建一个基于 sbt 的极简 Scala 项目,该项目可用于启动 dbx 项目。
在本地开发计算机上,除了满足一般要求以外,还必须安装以下组件:
- IntelliJ IDEA。
- 适用于 IntelliJ IDEA 的 Scala 插件。 有关详细信息,请参阅 IntelliJ IDEA 文档中的发现适用于 Scala 的 IntelliJ IDEA。
- Java 运行时环境 (JRE) 8。 尽管 JRE 8 的任何版本都应适用,但 Databricks 迄今为止仅验证了将
dbx和 IntelliJ IDEA 与 OpenJDK 8 JRE 配合使用。 Databricks 尚未验证将dbx与 IntelliJ IDEA 和 Java 11 配合使用。 有关详细信息,请参阅 IntelliJ IDEA 文档中的 Java 开发工具包 (JDK)。
按照以下步骤开始设置 dbx 项目结构:
步骤 1:创建基于 sbt 的 Scala 项目
- 在 IntelliJ IDEA 中,根据你的视图,单击“项目”>“新建项目”或“文件”>“新建”>“项目”。
- 在“新建项目”对话框中,依次单击“Scala”、“sbt”和“下一步”。
- 输入项目的项目名称和位置。
- 对于“JDK”,请选择你安装的 OpenJDK 8 JRE。
- 对于“sbt”,请选择列出的最高可用
sbt版本。 - 对于“Scala”,最好选择与目标群集的 Scala 版本匹配的 Scala 版本。 请参阅 Databricks Runtime 版本中的“系统环境”部分,了解适用于你的目标群集的 Databricks Runtime 版本。
- 在“Scala”旁边,选中“源”框(如果尚未选中)。
- 将包前缀添加到“包前缀”。 这些步骤使用包前缀
com.example.demo。 如果指定其他包前缀,请在所有这些步骤中替换该包前缀。 - 单击“完成”。
步骤 2:将对象添加到包
可以将任何必需的对象添加到包。 此包中包含一个名为 SampleApp 的对象。
在“项目”工具窗口(“视图”>“工具窗口”>“项目”)中,右键单击 project-name> src > main > scala 文件夹,然后单击“新建”>“Scala 类”。
选择“对象”并键入对象的名称,然后按 Enter。 例如,键入
SampleApp。 如果在此处输入其他对象名称,请确保在所有这些步骤中替换该名称。将
SampleApp.scala文件的内容替换为以下代码:package com.example.demo object SampleApp { def main(args: Array[String]) { } }
步骤 3:生成项目
将任何必需的项目生成设置和依赖项添加到项目。 此步骤假定你要生成已在前面的步骤中设置的项目,并且它只依赖于以下库。
将项目的
build.sbt文件的内容替换为以下内容:ThisBuild / version := "0.1.0-SNAPSHOT" ThisBuild / scalaVersion := "2.12.14" val sparkVersion = "3.2.1" lazy val root = (project in file(".")) .settings( name := "dbx-demo", idePackagePrefix := Some("com.example.demo"), libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion withSources(), libraryDependencies += "org.apache.spark" %% "spark-sql" % sparkVersion withSources(), libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion withSources() )在上面的文件中:
- 将
2.12.14替换为之前为此项目选择的 Scala 版本。 - 将
3.2.1替换为之前为此项目选择的 Spark 版本。 - 将
dbx-demo替换为你的项目的名称。 - 将
com.example.demo替换为你的包前缀的名称。
- 将
在菜单栏上,单击“视图”>工具窗口”>“sbt”。
在“sbt”工具窗口中,右键单击你的项目的名称,然后单击“重新加载 sbt 项目”。 等待,直到
sbt完成了从 Internet 工件存储(如 Coursier 或在默认情况下使用的 Ivy,具体取决于你的sbt版本)下载项目的依赖项。 可以在状态栏中查看下载进度。 如果在此项目中添加或更改任何其他依赖项,则必须对添加或更改的每组依赖项重复此项目重新加载步骤。在菜单栏上,单击“IntelliJ IDEA”>“首选项”。
在“首选项”对话框中,单击“生成、执行、部署”>“生成工具”>“sbt”。
在 JVM 中,对于“JRE”,请选择你安装的 OpenJDK 8 JRE。
在“sbt 项目”中,选择你的项目的名称。
在“sbt shell”中,选择“builds”。
单击“确定”。
在菜单栏上,单击“生成”>“生成项目”。 生成的结果显示在“sbt shell”工具窗口(“视图”>“工具窗口”>“sbt shell”)中。
步骤 4 - 将代码添加到项目中
将任何所需的代码添加到项目中。 此步骤假定你只想将代码添加到 example 包中的 SampleApp.scala 文件。
在项目的 src>main>scala>SampleApp.scala 文件中,添加你希望 dbx 在目标群集上成批运行的代码。 对于基本测试,请使用代码示例部分中的示例 Scala 代码。
步骤 5:运行项目
- 在菜单栏上,单击“运行”>“编辑配置”。
- 在“运行/调试配置”对话框中,单击 +(“添加新配置”)图标或“新增”或或“添加新运行配置”。
- 在下拉列表中,单击“sbt 任务”。
- 对于“名称”,请输入配置的名称,例如“运行程序”。
- 对于“任务”,请输入
~run。 - 选择“使用 sbt shell”。
- 单击“确定”。
- 在菜单栏上,单击“运行”>“运行‘运行程序’”。 该运行的结果显示在“sbt shell”工具窗口中。
步骤 6:将项目生成为 JAR
可以将任何 JAR 生成设置添加到所需的项目。 此步骤假定你只想生成基于前面步骤中设置的项目的 JAR。
- 在菜单栏上,单击“文件”>“项目结构”。
- 在“项目结构”对话框中,单击“项目设置”>“工件”。
- 单击 +(“添加”)图标。
- 在下拉列表中,选择“JAR”>“从带有依赖项的模块”。
- 在“从模块创建 JAR”对话框中,对于“模块”,请选择你的项目的名称。
- 对于“主类”,请单击文件夹图标。
- 在“选择主类”对话框中的“按名称搜索”选项卡上,选择“SampleApp”,然后单击“确定”。
- 对于“库中的 JAR 文件”,请选择“复制到输出目录并通过清单链接”。
- 单击“确定”以关闭“从模块创建 JAR”对话框。
- 单击“确定”以关闭“项目结构”对话框。
- 在菜单栏上,单击“生成”>“生成工件”。
- 在显示的上下文菜单中,选择 project-name:jar >“生成”。 在
sbt生成 JAR 时等待。 生成的结果显示在“生成输出”工具窗口(“视图”>“工具窗口”>“生成”)中。
JAR 将生成到项目的 out>artifacts><project-name>_jar 文件夹中。 JAR 的名称为 <project-name>.jar。
步骤 7:在 IDE 中显示终端
现在 dbx 项目结构已到位,可以创建 dbx 项目了。
通过单击菜单栏上的“视图”>“工具窗口”>“终端”来显示 IntelliJ IDEA 终端,然后继续创建 dbx 项目。
Eclipse
按照以下说明完成操作,以便开始将 Eclipse 和 Java 与 dbx 配合使用。 按照这些说明执行操作创建一个基于 Maven 的极简 Java 项目,该项目可用于启动 dbx 项目。
在本地开发计算机上,除了满足一般要求以外,还必须安装以下组件:
- Eclipse 的版本。 这些说明使用 Eclipse IDE 的 Eclipse IDE for Java Developers 版本。
- 某个版本的 Java Runtime Environment (JRE) 或 Java 开发工具包 (JDK) 11,具体取决于本地计算机的操作系统。 尽管任何版本的 JRE 或 JDK 11 都应适用,但 Databricks 迄今为止仅验证了将
dbx和 Eclipse IDE for Java Developers 与 Eclipse 2022-03 R(其中包括 AdoptOpenJDK 11)配合使用。
按照以下步骤开始设置 dbx 项目结构:
步骤 1:创建基于 Maven 的 Java 项目
- 在 Eclipse 中,单击“文件”>“新建”>“项目”。
- 在“新建项目”对话框中,展开“Maven”,选择“Maven 项目”,然后单击“下一步”。
- 在“新建 Maven 项目”对话框中,选择“创建简单项目(跳过原型选择)”,然后单击“下一步”。
- 对于“组 ID”,请输入符合 Java 包名称规则的组 ID。 这些步骤使用包名称
com.example.demo。 如果输入其他组 ID,请在所有这些步骤中替换该包名称。 - 对于“工件 ID”,请输入不带版本号的 JAR 文件的名称。 这些步骤使用 JAR 名称
dbx-demo。 如果为 JAR 文件输入其他名称,请在所有这些步骤中替换该 JAR 名称。 - 单击“完成”。
步骤 2:将类添加到包
可以将任何类添加到所需的包。 此包将包含一个名为 SampleApp 的类。
- 在“项目资源管理器”视图(“窗口”>“显示视图”>“项目资源管理器”)中,选择 project-name 项目图标,然后单击“文件”>“新建”>“类”。
- 在“新建 Java 类”对话框中,对于“包”,请输入
com.example.demo。 - 对于“名称”,请输入
SampleApp。 - 对于“修饰符”,请选择“public”。
- 将“超类”留空。
- 对于“要创建的方法存根”,请选择“public static void Main(String[] args)”。
- 单击“完成”。
步骤 3 - 将依赖项添加到项目中
在“项目资源管理器”视图中,双击 project-name>“pom.xml”。
将以下依赖项添加为
<project>元素的子元素,然后保存文件:<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>3.2.1</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <version>3.2.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.12</artifactId> <version>3.2.1</version> <scope>provided</scope> </dependency> </dependencies>将:
2.12替换为目标群集的 Scala 版本。3.2.1替换为目标群集的 Spark 版本。
请参阅 Databricks Runtime 版本中的“系统环境”部分,了解适用于你的目标群集的 Databricks Runtime 版本。
步骤 4:编译项目
在项目的
pom.xml文件中,将以下 Maven 编译器属性添加为<project>元素的子元素,然后保存该文件:<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.6</maven.compiler.source> <maven.compiler.target>1.6</maven.compiler.target> </properties>在“项目资源管理器”视图中,右键单击 project-name 项目图标,然后单击“运行身份”>“运行配置”。
在“运行配置”对话框中,单击“Maven 生成”。
单击“新建启动配置”图标。
输入此启动配置的名称,例如“清理编译”。
对于“基目录”,请单击“工作区”,选择项目的目录,然后单击“确定”。
对于“目标”,请输入
clean compile。单击 “运行” 。 该运行的输出显示在“控制台”视图(“窗口”>“显示视图”>“控制台”)中。
步骤 5 - 将代码添加到项目中
可以将任何代码添加到所需的项目。 此步骤假定你只想将代码添加到名为 com.example.demo 的包中的一个名为 SampleApp.java 的文件。
在项目的 src/main/java>com.example.demo>SampleApp.java 文件中,添加你希望 dbx 在目标群集上成批运行的代码。 (如果未准备好任何代码,可以使用本文末尾列出的代码示例中的 Java 代码。)
步骤 6:运行项目
- 在“项目资源管理器”视图中,右键单击 project-name 项目图标,然后单击“运行身份”>“运行配置”。
- 在“运行配置”对话框中,展开“Java 应用程序”,然后单击“应用”。
- 单击 “运行” 。 该运行的输出显示在“控制台”视图中。
步骤 7:将项目生成为 JAR
- 在“项目资源管理器”视图中,右键单击 project-name 项目图标,然后单击“运行身份”>“运行配置”。
- 在“运行配置”对话框中,单击“Maven 生成”。
- 单击“新建启动配置”图标。
- 输入此启动配置的名称,例如“清理包”。
- 对于“基目录”,请单击“工作区”,选择项目的目录,然后单击“确定”。
- 对于“目标”,请输入
clean package。 - 单击 “运行” 。 该运行的输出显示在“控制台”视图中。
JAR 将生成到 <project-name>>target 文件夹中。 JAR 的名称为 <project-name>-0.0.1-SNAPSHOT.jar。
注意
如果 JAR 最初未显示在“项目资源管理器”窗口中的 target 文件夹,则可以尝试通过右键单击 project-name 项目图标,然后单击“刷新”来显示它。
步骤 8:在 IDE 中显示终端
现在 dbx 项目结构已到位,可以创建 dbx 项目了。 首先,设置“项目资源管理器”视图以显示 dbx 生成的隐藏文件(以点 (./) 开头的文件),如下所示:
- 在“项目资源管理器”视图中,单击省略号(“视图菜单”)筛选器图标,然后单击“筛选器和自定义”。
- 在“筛选器和自定义”对话框中的“预设置筛选器”选项卡上,清除 . resources* 框。
- 单击“确定”。
接下来,按下面所示显示 Eclipse 终端:
- 单击菜单栏上的“窗口”>“显示视图”>“终端”。
- 如果未显示终端的命令提示符,请在“终端”视图中单击“打开终端”图标。
- 使用
cd命令可切换到项目的根目录。 - 继续创建 dbx 项目。
无 IDE(仅使用终端)
按照以下说明完成操作,以便开始将终端和 Python 与 dbx 配合使用。
按照以下步骤使用终端开始设置 dbx 项目结构:
在终端中创建一个空白文件夹。 以下说明使用名为
dbx-demo的文件夹(但你可为dbx项目的根文件夹指定任意名称)。 创建文件夹后,切换到该文件夹。对于 Linux 和 macOS:
mkdir dbx-demo cd dbx-demo对于 Windows:
md dbx-demo cd dbx-demo通过从
dbx-demo文件夹的根目录结合以下选项运行pipenv命令(其中<version>是已在本地安装的 Python 的目标版本,例如3.7.5),为此项目创建一个 Python 虚拟环境。pipenv --python <version>运行
pipenv shell激活 Python 虚拟环境。pipenv shell继续创建 dbx 项目。
创建 dbx 项目
由于已通过执行上述某一部分的操作让 dbx 项目结构就位,现在可以创建以下类型之一的项目:
创建适用于 Python 的极简 dbx 项目
以下极简 dbx 项目是开始使用 Python 和 dbx 的最简捷方法。 它演示了如何在 Azure Databricks 工作区中的现有 Azure Databricks 通用群集上批量运行单个 Python 代码文件。
注意
若要创建一个适用于 Python 的 dbx 模板化项目来演示通用群集和作业群集上代码的批量运行、远程代码项目部署和 CI/CD 平台设置,请转到创建适用于 Python 且支持 CI/CD 的 dbx 模板化项目。
若要完成此过程,你的工作区中必须有一个现有的通用群集。 (请参阅显示群集或创建群集。)Python 虚拟环境中的 Python 版本最好(但不强制要求)与此群集上安装的版本匹配。 若要获取群集上的 Python 版本,可以使用群集的 Web 终端运行命令 python --version。
python --version
在终端中,从
dbx项目的根文件夹结合以下选项运行dbx configure命令。 此命令在dbx项目的根文件夹中创建一个隐藏的.dbx文件夹。 此.dbx文件夹包含lock.json和project.json文件。dbx configure --profile DEFAULT注意
project.json文件包含对 Databricks CLI.databrickscfg文件中DEFAULT配置文件的引用。 如果你希望dbx使用其他配置文件,请将DEFAULT替换为目标配置文件的名称。如果你希望
dbx使用DATABRICKS_HOST和DATABRICKS_TOKEN环境变量而不是 Databricks CLI.databrickscfg文件中的配置文件,请将project.json中的DEFAULT保留原样。dbx默认使用此引用。在
dbx项目的根文件夹中创建一个名为conf的文件夹。对于 Linux 和 macOS:
mkdir conf对于 Windows:
md conf在
conf目录中添加包含以下内容的名为deployment.yaml的文件:environments: default: jobs: - name: "dbx-demo-job" spark_python_task: python_file: "dbx-demo-job.py"注意
deployment.yaml文件包含小写单词default,即对 Databricks CLI.databrickscfg文件中大写DEFAULT配置文件的引用。 如果你希望dbx使用其他配置文件,请将default替换为目标配置文件的名称。如果你希望
dbx使用DATABRICKS_HOST和DATABRICKS_TOKEN环境变量而不是 Databricks CLI.databrickscfg文件中的配置文件,请将deployment.yaml中的default保留原样。dbx默认使用此引用。在名为
dbx-demo-job.py的文件中添加要在群集上运行的代码,然后将该文件添加到dbx项目的根文件夹。 (如果未准备好任何代码,可以使用本文末尾列出的代码示例中的 Python 代码。)注意
不必将此文件命名为
dbx-demo-job.py。 如果选择了不同的文件名,请务必更新conf/deployment.yaml文件中的python_file字段,使文件名匹配。结合以下选项运行命令
dbx execute。 在此命令中,请将<existing-cluster-id>替换为你的工作区中目标群集的 ID。 (要获取该 ID,请参阅群集 URL 和 ID。)dbx execute --cluster-id=<existing-cluster-id> --job=dbx-demo-job --no-rebuild --no-package若要查看本地运行结果,请查看终端的输出。 若要查看群集上的运行结果,请转到群集的“驱动程序日志”选项卡中的“标准输出”窗格。 (参阅群集驱动程序和工作器日志。)
继续执行后续步骤。
创建适用于 Scala 或 Java 的极简 dbx 项目
以下极简 dbx 项目是开始使用 dbx 和 Scala 或 Java 的最简捷方法。 它演示如何将单个 Scala 或 Java JAR 部署到 Azure Databricks 工作区,然后在 Azure Databricks 工作区的 Azure Databricks 作业群集上运行这个已部署的 JAR。
注意
Azure Databricks 限制了在群集上运行 Scala 和 Java 代码的方式:
- 不能像运行单个 Python 文件一样在群集上以作业的形式运行单个 Scala 或 Java 文件。 若要运行 Scala 或 Java 代码,必须先将其生成到 JAR 中。
- 可以在现有的通用群集上以作业的形式运行 JAR。 但是,不能在同一通用群集上重新安装该 JAR 的任何更新。 在这种情况下,必须改用作业群集。 本部分使用作业群集方法。
- 必须先将 JAR 部署到 Azure Databricks 工作区,然后才能在该工作区中的任何通用群集或作业群集上运行这个已部署的 JAR。
在终端中,从项目的根文件夹运行带有以下选项的
dbx configure命令。 此命令在项目的根文件夹中创建一个隐藏的.dbx文件夹。 此.dbx文件夹包含lock.json和project.json文件。dbx configure --profile DEFAULT注意
project.json文件包含对 Databricks CLI.databrickscfg文件中DEFAULT配置文件的引用。 如果你希望dbx使用其他配置文件,请将DEFAULT替换为目标配置文件的名称。如果你希望
dbx使用DATABRICKS_HOST和DATABRICKS_TOKEN环境变量(而不是 Databricks CLI.databrickscfg文件中的配置文件),请将project.json文件中的DEFAULT保留原样。dbx默认使用此引用。在项目的根文件夹中创建一个名为
conf的文件夹。对于 Linux 和 macOS:
mkdir conf对于 Windows:
md conf在
conf目录中添加包含以下极简文件内容的名为deployment.yaml的文件:environments: default: strict_path_adjustment_policy: true jobs: - name: "dbx-demo-job" new_cluster: spark_version: "10.4.x-scala2.12" node_type_id: "Standard_DS3_v2" num_workers: 2 instance_pool_id: "my-instance-pool" libraries: - jar: "file://out/artifacts/dbx_demo_jar/dbx-demo.jar" spark_jar_task: main_class_name: "com.example.demo.SampleApp"将:
spark_version的值替换为适用于目标作业群集的 Runtime 版本字符串。node_type_id的值替换为适用于目标作业群集的相应群集节点类型。instance_pool_id的值替换为工作区中现有实例池的 ID,以便更快地运行作业。 如果没有现有实例池可用,或者不想使用实例池,请完全删除此行。jar的值替换为项目中指向 JAR 的路径。 对于支持 Scala 的 IntelliJ IDEA,该路径可能是file://out/artifacts/dbx_demo_jar/dbx-demo.jar。 对于支持 Java 的 Eclipse IDE,该路径可能是file://target/dbx-demo-0.0.1-SNAPSHOT.jar。main_class_name的值替换为 JAR 中主类的名称,例如com.example.demo.SampleApp。
注意
deployment.yaml文件包含单词default,它是对.dbx/project.json文件中default环境的引用,而该环境又是对 Databricks CLI.databrickscfg文件中的DEFAULT配置文件的引用。 如果希望dbx使用其他配置文件,请将此deployment.yaml文件中的default替换为.dbx/project.json文件中的相应引用,该引用又引用 Databricks CLI.databrickscfg文件中的相应配置文件。如果你希望
dbx使用DATABRICKS_HOST和DATABRICKS_TOKEN环境变量而不是 Databricks CLI.databrickscfg文件中的配置文件,请将deployment.yaml中的default保留原样。 默认情况下,dbx将使用.dbx/project.json文件中的default环境设置(profile值除外)。运行带有以下选项的
dbx deploy命令。dbx将 JAR 部署到.dbx/project.json文件的适用于匹配环境的artifact_location路径中的位置。dbx还将项目的文件作为 MLflow 试验的一部分部署到.dbx/project.json文件的适用于匹配环境的workspace_dir路径中列出的位置。dbx deploy --no-rebuild --no-package运行带有以下选项的
dbx launch命令。 此命令运行conf/deployment.yaml中具有该匹配名称的作业。 为了查找要作为作业的一部分运行的已部署 JAR,dbx引用了.dbx/project.json文件的适用于匹配环境的artifact_location路径中的位置。 为了确定要运行的特定 JAR,dbx引用了.dbx/project.json文件的适用于匹配环境的workspace_dir路径中列出的位置中的 MLflow 试验。dbx launch --job=dbx-demo-job若要查看作业群集上的作业运行结果,请参阅查看作业。
若要查看作业引用的试验,请参阅试验。
继续执行后续步骤。
创建适用于 Python 且支持 CI/CD 的 dbx 模板化项目
以下适用于 Python 的 dbx 模板化项目演示了对在 Azure Databricks 工作区中的 Azure Databricks 通用群集与作业群集上批量运行 Python 代码、远程代码项目部署和 CI/CD 平台设置的支持。 (若要创建一个适用于 Python 的极简 dbx 项目来仅演示如何在现有通用群集上批量运行单个 Python 代码文件,请转回到创建适用于 Python 的极简 dbx 项目。)
在终端中,在
dbx项目的根文件夹中运行dbx init命令。dbx init对于“project_name”,请输入项目的名称,或按 Enter 接受默认项目名称。
对于“version”,请输入项目的起始版本号,或按 Enter 接受默认项目版本。
对于“cloud”,请选择与项目要使用的 Azure Databricks 云版本对应的编号,或按 Enter 接受默认值。
对于“cicd_tool”,请选择与项目要使用的受支持 CI/CD 工具对应的编号,或按 Enter 接受默认值。
对于“project_slug”,请输入要用于项目中的资源的前缀,或按 Enter 接受默认值。
对于“workspace_dir”,请输入项目的工作区目录的本地路径,或按 Enter 接受默认值。
对于“artifact_location”,请输入项目工件要写入到的 Azure Databricks 工作区中的路径,或按 Enter 接受默认值。
对于“profile”,请输入项目要使用的 Databricks CLI 身份验证配置文件的名称,或按 Enter 接受默认值。
提示
可以结合硬编码的模板参数运行 dbx init 来跳过上述步骤,例如:
dbx init --template="python_basic" \
-p "project_name=cicd-sample-project" \
-p "cloud=Azure" \
-p "cicd_tool='Azure DevOps'" \
-p "profile=DEFAULT" \
--no-input
dbx 自动计算参数 project_slug、workspace_dir 和 artifact_location。 这三个参数是可选的,它们仅对更高级的用例有用。
请参阅 dbx 文档中 CLI 参考中的 init 命令。
若要使用新项目,请参阅 dbx 文档中的基本 Python 模板。
另请参阅后续步骤。
代码示例
如果你未准备好可以通过 dbx 批量运行的任何代码,可以试着让 dbx 批量运行以下代码。 此代码在工作区中创建一个小型表,查询该表,然后删除该表。
提示
若要将该表保留在工作区中而不删除它,请先注释掉此示例中的最后一行代码,然后再通过 dbx 批量运行它。
Python
# For testing and debugging of local objects, run
# "pip install pyspark=X.Y.Z", where "X.Y.Z"
# matches the version of PySpark
# on your target clusters.
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import date
spark = SparkSession.builder.appName("dbx-demo").getOrCreate()
# Create a DataFrame consisting of high and low temperatures
# by airport code and date.
schema = StructType([
StructField('AirportCode', StringType(), False),
StructField('Date', DateType(), False),
StructField('TempHighF', IntegerType(), False),
StructField('TempLowF', IntegerType(), False)
])
data = [
[ 'BLI', date(2021, 4, 3), 52, 43],
[ 'BLI', date(2021, 4, 2), 50, 38],
[ 'BLI', date(2021, 4, 1), 52, 41],
[ 'PDX', date(2021, 4, 3), 64, 45],
[ 'PDX', date(2021, 4, 2), 61, 41],
[ 'PDX', date(2021, 4, 1), 66, 39],
[ 'SEA', date(2021, 4, 3), 57, 43],
[ 'SEA', date(2021, 4, 2), 54, 39],
[ 'SEA', date(2021, 4, 1), 56, 41]
]
temps = spark.createDataFrame(data, schema)
# Create a table on the cluster and then fill
# the table with the DataFrame's contents.
# If the table already exists from a previous run,
# delete it first.
spark.sql('USE default')
spark.sql('DROP TABLE IF EXISTS demo_temps_table')
temps.write.saveAsTable('demo_temps_table')
# Query the table on the cluster, returning rows
# where the airport code is not BLI and the date is later
# than 2021-04-01. Group the results and order by high
# temperature in descending order.
df_temps = spark.sql("SELECT * FROM demo_temps_table " \
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " \
"GROUP BY AirportCode, Date, TempHighF, TempLowF " \
"ORDER BY TempHighF DESC")
df_temps.show()
# Results:
#
# +-----------+----------+---------+--------+
# |AirportCode| Date|TempHighF|TempLowF|
# +-----------+----------+---------+--------+
# | PDX|2021-04-03| 64| 45|
# | PDX|2021-04-02| 61| 41|
# | SEA|2021-04-03| 57| 43|
# | SEA|2021-04-02| 54| 39|
# +-----------+----------+---------+--------+
# Clean up by deleting the table from the cluster.
spark.sql('DROP TABLE demo_temps_table')
Scala
package com.example.demo
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import java.sql.Date
object SampleApp {
def main(args: Array[String]) {
val spark = SparkSession.builder().master("local").getOrCreate()
val schema = StructType(Array(
StructField("AirportCode", StringType, false),
StructField("Date", DateType, false),
StructField("TempHighF", IntegerType, false),
StructField("TempLowF", IntegerType, false)
))
val data = List(
Row("BLI", Date.valueOf("2021-04-03"), 52, 43),
Row("BLI", Date.valueOf("2021-04-02"), 50, 38),
Row("BLI", Date.valueOf("2021-04-01"), 52, 41),
Row("PDX", Date.valueOf("2021-04-03"), 64, 45),
Row("PDX", Date.valueOf("2021-04-02"), 61, 41),
Row("PDX", Date.valueOf("2021-04-01"), 66, 39),
Row("SEA", Date.valueOf("2021-04-03"), 57, 43),
Row("SEA", Date.valueOf("2021-04-02"), 54, 39),
Row("SEA", Date.valueOf("2021-04-01"), 56, 41)
)
val rdd = spark.sparkContext.makeRDD(data)
val temps = spark.createDataFrame(rdd, schema)
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default")
spark.sql("DROP TABLE IF EXISTS demo_temps_table")
temps.write.saveAsTable("demo_temps_table")
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
val df_temps = spark.sql("SELECT * FROM demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC")
df_temps.show()
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE demo_temps_table")
}
}
Java
package com.example.demo;
import java.util.ArrayList;
import java.util.List;
import java.sql.Date;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.Dataset;
public class SampleApp {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("Temps Demo")
.config("spark.master", "local")
.getOrCreate();
// Create a Spark DataFrame consisting of high and low temperatures
// by airport code and date.
StructType schema = new StructType(new StructField[] {
new StructField("AirportCode", DataTypes.StringType, false, Metadata.empty()),
new StructField("Date", DataTypes.DateType, false, Metadata.empty()),
new StructField("TempHighF", DataTypes.IntegerType, false, Metadata.empty()),
new StructField("TempLowF", DataTypes.IntegerType, false, Metadata.empty()),
});
List<Row> dataList = new ArrayList<Row>();
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-03"), 52, 43));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-02"), 50, 38));
dataList.add(RowFactory.create("BLI", Date.valueOf("2021-04-01"), 52, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-03"), 64, 45));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-02"), 61, 41));
dataList.add(RowFactory.create("PDX", Date.valueOf("2021-04-01"), 66, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-03"), 57, 43));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-02"), 54, 39));
dataList.add(RowFactory.create("SEA", Date.valueOf("2021-04-01"), 56, 41));
Dataset<Row> temps = spark.createDataFrame(dataList, schema);
// Create a table on the Databricks cluster and then fill
// the table with the DataFrame's contents.
// If the table already exists from a previous run,
// delete it first.
spark.sql("USE default");
spark.sql("DROP TABLE IF EXISTS demo_temps_table");
temps.write().saveAsTable("demo_temps_table");
// Query the table on the Databricks cluster, returning rows
// where the airport code is not BLI and the date is later
// than 2021-04-01. Group the results and order by high
// temperature in descending order.
Dataset<Row> df_temps = spark.sql("SELECT * FROM demo_temps_table " +
"WHERE AirportCode != 'BLI' AND Date > '2021-04-01' " +
"GROUP BY AirportCode, Date, TempHighF, TempLowF " +
"ORDER BY TempHighF DESC");
df_temps.show();
// Results:
//
// +-----------+----------+---------+--------+
// |AirportCode| Date|TempHighF|TempLowF|
// +-----------+----------+---------+--------+
// | PDX|2021-04-03| 64| 45|
// | PDX|2021-04-02| 61| 41|
// | SEA|2021-04-03| 57| 43|
// | SEA|2021-04-02| 54| 39|
// +-----------+----------+---------+--------+
// Clean up by deleting the table from the Databricks cluster.
spark.sql("DROP TABLE demo_temps_table");
}
}
后续步骤
- 扩展 conf/deployment.yaml 文件以支持各种类型的通用群集和作业群集定义。
- 在
conf/deployment.yaml文件中声明多任务作业。 - 在
conf/deployment.yaml文件中引用环境变量和命名属性。 - 使用 dbx execute 命令在群集上以新作业的形式批量运行代码。
限制
- 不支持包含笔记本任务类型的作业。
- 目标通用群集或作业群集必须使用用于机器学习的 Databricks Runtime (Databricks Runtime ML) 版本。
dbx execute不能用于多任务作业。- 仅完全支持 Python。
其他资源
- 使用 dbx deploy 命令将代码项目批量部署到 Azure Databricks 工作区存储。
- 使用 dbx launch 命令在群集上批量运行现有作业。
- 使用 dbx 执行无作业部署。
- 将 dbx 与 Azure 数据工厂集成。
- 将 dbx 与 Apache Airflow 集成。
- 详细了解 dbx 和 CI/CD。
- dbx 文档
- GitHub 上的 databrickslabs/dbx 存储库
- dbx 限制