Apache Spark 连接器:SQL Server 和 Azure SQL

用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器是一种高性能连接器,可便于在大数据分析中使用事务数据,并暂留结果以用于即席查询或报告。 借助此连接器,可以使用任何 SQL 数据库(无论是在本地,还是在云中)作为 Spark 作业的输入数据源或输出数据接收器。

此库包含用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器的源代码。

Apache Spark 是用于大规模数据处理的统一分析引擎。

可以通过 Maven 提供以下两个连接器版本:2.4 兼容版本和 3.0 兼容版本。 可在此处找到这两个版本,并可以使用下面的坐标导入:

连接器 Maven 坐标
Spark 2.4.x 兼容连接器 com.microsoft.azure:spark-mssql-connector:1.0.2
Spark 3.0.x 兼容连接器 com.microsoft.azure:spark-mssql-connector_2.12:1.1.0
Spark 3.1.x 兼容连接器 com.microsoft.azure:spark-mssql-connector_2.12:1.2.0

也可以从源构建连接器,或从 GitHub 的“发布”部分下载 jar。 有关连接器的最新信息,请参阅 SQL Spark 连接器 GitHub 存储库

支持的功能

  • 支持所有 Spark 绑定(Scala、Python、R)
  • 基本身份验证和 Active Directory (AD) 密钥选项卡支持
  • 重新排序的 dataframe 写入支持
  • 支持在 SQL Server 大数据群集中写入 SQL Server 单实例和数据池
  • Sql Server 单实例的可靠连接器支持
组件 支持的版本
Apache Spark 2.4.x、3.0.x、3.1.x
Scala 2.11、2.12
Microsoft JDBC Driver for SQL Server 8.4
Microsoft SQL Server SQL Server 2008 或更高版本
Azure SQL 数据库 支持

支持的选项

用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器支持此处定义的选项:SQL DataSource JDBC

此外,还支持以下选项

选项 默认 说明
reliabilityLevel BEST_EFFORT BEST_EFFORTNO_DUPLICATESNO_DUPLICATES 在执行器重启方案中实现可靠插入
dataPoolDataSource none none 表示未设置值,连接器应写入 SQL Server 单实例。 将此值设置为数据源名称,以在大数据群集中写入数据池表
isolationLevel READ_COMMITTED 指定隔离级别
tableLock false 实现带有 TABLOCK 选项的插入以提高写入性能
schemaCheckEnabled true 当设置为 false 时,禁用严格的数据帧和 SQL 表架构检查

其他大容量复制选项可以设置为 dataframe 上的选项,并将在写入时传递到 bulkcopy API

性能比较

用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器比用于写入 SQL Server 的通用 JDBC 连接器快 15 倍。 性能特征因类型、数据量、使用的选项而异,并可能显示运行时的变化。 以下性能结果是用 spark dataframe 中的 143.9M 行覆盖 SQL 表所用的时间。 spark dataframe 是通过读取使用 spark TPCDS 基准生成的 store_sales HDFS 表构造的。 不包括 store_salesdataframe 的读取时间。 结果是三次运行的平均时间。

连接器类型 选项 说明 写入时间
JDBCConnector 默认 具有默认选项的通用 JDBC 连接器 1385 秒
sql-spark-connector BEST_EFFORT 具有默认选项的最佳工作状态 sql-spark-connector 580 秒
sql-spark-connector NO_DUPLICATES 可靠 sql-spark-connector 709 秒
sql-spark-connector BEST_EFFORT + tabLock=true 启用表锁的最佳工作状态 sql-spark-connector 72 秒
sql-spark-connector NO_DUPLICATES + tabLock=true 启用表锁的可靠 sql-spark-connector 198 秒

配置

  • Spark 配置:num_executors = 20、executor_memory = '1664 m'、executor_cores = 2
  • 数据生成配置:scale_factor=50、partitioned_tables=true
  • 数据文件 store_sales 包含行数 143,997,590

环境

  • SQL Server 大数据群集 CU5
  • master + 6 个节点
  • 每个节点第 5 代服务器,512 GB Ram,每个节点 4 TB NVM,NIC 10 GB

常见问题

java.lang.NoClassDefFoundError: com/microsoft/aad/adal4j/AuthenticationException

在 hadoop 环境中使用 mssql 驱动程序的较旧版本(现在包含在此连接器中)时,会出现此问题。 如果使用的是旧版 Azure SQL 连接器,并且已将驱动程序手动安装到该群集上以确保 Microsoft Entra 身份验证兼容性,则需要删除这些驱动程序。

修复问题的步骤:

  1. 如果使用的是通用 Hadoop 环境,请检查并删除 mssql jar:rm $HADOOP_HOME/share/hadoop/yarn/lib/mssql-jdbc-6.2.1.jre7.jar。 如果使用的是 Databricks,请添加全局或群集初始化脚本,以从 /databricks/jars 文件夹中删除旧版 mssql 驱动程序,或将此行添加到现有脚本:rm /databricks/jars/*mssql*

  2. 添加 adal4jmssql 包。 例如,可以使用 Maven,但任何方式都应有效。

    注意

    请勿以此方式安装 SQL spark 连接器。

  3. 将驱动程序类添加到连接配置。 例如:

    connectionProperties = {
      `Driver`: `com.microsoft.sqlserver.jdbc.SQLServerDriver`
    }`
    

有关详细信息和说明,请参阅 https://github.com/microsoft/sql-spark-connector/issues/26 解决方案。

开始使用

用于 SQL Server 和 Azure SQL 的 Apache Spark 连接器基于 Spark DataSourceV1 API 和 SQL Server 批处理 API,并使用与内置 JDBC Spark-SQL 连接器相同的接口。 通过这种集成,你可以轻松集成连接器,并通过使用 com.microsoft.sqlserver.jdbc.spark 更新格式参数来迁移现有 Spark 作业。

若要在项目中包含连接器,请下载此存储库,并使用 SBT 生成 jar。

写入到新 SQL 表

警告

overwrite 模式先删除默认情况下数据库中已存在的表。 请谨慎使用此选项,以免发生意外数据丢失。

在使用 overwrite 模式时,如果在重新创建表时没有使用选项 truncate,则索引将会丢失。 ,列存储表现在为堆。 若要保留现有索引,请同时指定值为 true 的选项 truncate。 例如,.option("truncate","true")

server_name = "jdbc:sqlserver://{SERVER_ADDR}"
database_name = "database_name"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "table_name"
username = "username"
password = "password123!#" # Please specify password here

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

追加到 SQL 表

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("Connector write failed", error)

指定隔离级别

在对数据库执行大容量插入时,此连接器默认使用 READ_COMMITTED 隔离级别。 若要替代此隔离级别,请使用 mssqlIsolationLevel 选项,如下所示。

    .option("mssqlIsolationLevel", "READ_UNCOMMITTED") \

从 SQL 表中读取数据

jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("user", username) \
        .option("password", password).load()

Microsoft Entra 身份验证

使用服务主体的 Python 示例

context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_db = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("accessToken", access_token) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

使用 Active Directory 密码的 Python 示例

jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", table_name) \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", user_name) \
        .option("password", password) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

必须安装必需的依赖项,才能使用 Active Directory 进行身份验证。

使用 ActiveDirectoryPassword 时,user 的格式应为 UPN 格式,例如 username@domainname.com

对于 Scala,将需要安装 _com.microsoft.aad.adal4j_ 项目。

对于 Python,将需要安装 _adal_ 库。 这可通过 pip 获得。

有关示例,请查看示例笔记本

支持

用于 Azure SQL 和 SQL Server 的 Apache Spark 连接器是一个开源项目。 此连接器不提供任何 Microsoft 支持。 有关连接器的疑问或问题,请在此项目存储库中创建问题。 连接器社区处于活动状态,并对提交情况进行监视。

后续步骤

请访问 SQL Spark 连接器 GitHub 存储库

有关隔离级别的信息,请参阅 SET TRANSACTION ISOLATION LEVEL (Transact-SQL)