你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
适用于 Apache Spark 的 Azure 数据资源管理器连接器
重要
此连接器可用于 Microsoft Fabric 中的 实时分析 。 请使用本文中的说明,但存在以下例外情况:
- 如果需要,请按照创建 KQL 数据库中的说明创建数据库。
- 如果需要,请使用创建空表中的说明 创建表。
- 使用复制 URI 中的说明获取查询或引入 URI。
- 在 KQL 查询集中运行查询。
Apache Spark 是用于大规模数据处理的统一分析引擎。 Azure 数据资源管理器是一个快速、完全托管的数据分析服务,可用于实时分析大量数据。
适用于 Spark 的 Azure 数据资源管理器连接器是可在任何 Spark 群集上运行的开源项目。 它实现了用于跨 Azure 数据资源管理器和 Spark 群集移动数据的数据源和数据接收器。 使用 Azure 数据资源管理器和 Apache Spark,可以构建面向数据驱动型方案的可缩放快速应用程序。 例如,机器学习 (ML)、提取-转换-加载 (ETL) 和 Log Analytics。 有了此连接器,Azure 数据资源管理器变成了标准 Spark 源和接收器操作(例如写入、读取和 writeStream)的有效数据存储。
可以通过排队引入或流式引入写入 Azure 数据资源管理器。 从 Azure 数据资源管理器中读取支持列删除和谓词下推,这可在 Azure 数据资源管理器中筛选数据,从而减少传输的数据量。
注意
有关使用适用于 Azure 数据资源管理器的 Synapse Spark 连接器的信息,请参阅使用适用于 Azure Synapse Analytics 的 Apache Spark 连接到 Azure 数据资源管理器。
本主题介绍了如何安装和配置 Azure 数据资源管理器 Spark 连接器,以及如何在 Azure 数据资源管理器与 Apache Spark 群集之间移动数据。
注意
尽管下面的某些示例提到了 Azure Databricks Spark 群集,但 Azure 数据资源管理器 Spark 连接器并不直接依赖于 Databricks 或任何其他 Spark 分发版。
先决条件
- Azure 订阅。 创建免费 Azure 帐户。
- Azure 数据资源管理器群集和数据库。 创建群集和数据库。
- Spark 群集
- 安装 Azure 数据资源管理器连接器库:
- 已安装 Maven 3.x
提示
Spark 2.3.x 版本也是受支持的,但可能需要在 pom.xml 依赖项中进行一些更改。
如何生成 Spark 连接器
从版本 2.3.0 开始,我们引入了新的项目 ID,替换 spark-kusto-connector:针对 Spark 3.x 和 Scala 2.12 的 kusto-spark_3.0_2.12,以及针对 Spark 2.4.x 和 scala 2.11 的 kusto-spark_2.4_2.11。
注意
2\.5.1 以前的版本无法再用于引入到某个现有表,请更新到更高的版本。 此步骤是可选的。 如果使用的是预生成库(例如 Maven),请参阅 Spark 群集设置。
生成先决条件
如果未使用预生成库,则需要安装依赖项中列出的库,包括以下 Kusto Java SDK 库。 若要查找要安装的正确版本,请查看相关版本的 pom:
参考此源来生成 Spark 连接器。
对于使用 Maven 项目定义的 Scala/Java 应用程序,请将应用程序链接到以下项目(在最新版本中可能不同):
<dependency> <groupId>com.microsoft.azure</groupId> <artifactId>kusto-spark_3.0_2.12</artifactId> <version>2.5.1</version> </dependency>
生成命令
生成 jar 并运行所有测试:
mvn clean package
生成 jar,运行所有测试,并将 jar 安装到本地 Maven 存储库:
mvn clean install
有关详细信息,请参阅连接器用法。
Spark 群集设置
注意
建议使用最新的 Azure 数据资源管理器 Spark 连接器版本执行以下步骤。
使用 Spark 2.4.4 和 Scala 2.11 或 Spark 3.0.1 和 Scala 2.12,基于 Azure Databricks 群集配置以下 Spark 群集设置:
从 Maven 安装最新 spark-kusto-connector 库:
验证是否已安装所有必需的库:
对于使用 JAR 文件的安装,请验证是否安装了其他依赖项:
身份验证
Azure 数据资源管理器 Spark 连接器允许使用以下方法之一使用Microsoft Entra ID 进行身份验证:
- Microsoft Entra应用程序
- Microsoft Entra访问令牌
- 设备身份验证(适用于非生产方案)
- Azure Key Vault 若要访问 Key Vault 资源,请安装 azure-keyvault 包并提供应用程序凭据。
Microsoft Entra应用程序身份验证
Microsoft Entra应用程序身份验证是最简单且最常见的身份验证方法,建议用于 Azure 数据资源管理器 Spark 连接器。
属性 | 选项字符串 | 说明 |
---|---|---|
KUSTO_AAD_APP_ID | kustoAadAppId | Microsoft Entra应用程序 (客户端) 标识符。 |
KUSTO_AAD_AUTHORITY_ID | kustoAadAuthorityID | Microsoft Entra身份验证机构。 Microsoft Entra Directory (租户) ID。 可选 - 默认为 microsoft.com。 有关详细信息,请参阅Microsoft Entra颁发机构。 |
KUSTO_AAD_APP_SECRET | kustoAadAppSecret | Microsoft Entra客户端的应用程序密钥。 |
注意
较旧的 API 版本(低于 2.0.0)有以下命名:“kustoAADClientID”、“kustoClientAADClientPassword”、“kustoAADAuthorityID”
Azure 数据资源管理器权限
必须在 Azure 数据资源管理器群集上授予以下权限:
- 若要读取 (数据源) ,Microsoft Entra标识必须对目标数据库具有查看者权限,或对目标表具有管理员权限。
- 若要写入 (数据接收器) ,Microsoft Entra标识必须对目标数据库具有引入器特权。 此外,它必须对目标数据库拥有“用户”特权,这样才能创建新表。 如果目标表已存在,则必须配置对目标表的“管理员”权限。
有关 Azure 数据资源管理器主体角色的详细信息,请参阅基于角色的访问控制。 有关如何管理安全角色,请参阅安全角色管理。
Spark 接收器:写入 Azure 数据资源管理器
设置接收器参数:
val KustoSparkTestAppId = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppId") val KustoSparkTestAppKey = dbutils.secrets.get(scope = "KustoDemos", key = "KustoSparkTestAppKey") val appId = KustoSparkTestAppId val appKey = KustoSparkTestAppKey val authorityId = "72f988bf-86f1-41af-91ab-2d7cd011db47" // Optional - defaults to microsoft.com val cluster = "Sparktest.eastus2" val database = "TestDb" val table = "StringAndIntTable"
将 Spark 数据帧分批写入 Azure 数据资源管理器群集:
import com.microsoft.kusto.spark.datasink.KustoSinkOptions import org.apache.spark.sql.{SaveMode, SparkSession} df.write .format("com.microsoft.kusto.spark.datasource") .option(KustoSinkOptions.KUSTO_CLUSTER, cluster) .option(KustoSinkOptions.KUSTO_DATABASE, database) .option(KustoSinkOptions.KUSTO_TABLE, "Demo3_spark") .option(KustoSinkOptions.KUSTO_AAD_APP_ID, appId) .option(KustoSinkOptions.KUSTO_AAD_APP_SECRET, appKey) .option(KustoSinkOptions.KUSTO_AAD_AUTHORITY_ID, authorityId) .option(KustoSinkOptions.KUSTO_TABLE_CREATE_OPTIONS, "CreateIfNotExist") .mode(SaveMode.Append) .save()
或者使用简化的语法:
import com.microsoft.kusto.spark.datasink.SparkIngestionProperties import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val sparkIngestionProperties = Some(new SparkIngestionProperties()) // Optional, use None if not needed df.write.kusto(cluster, database, table, conf, sparkIngestionProperties)
写入流数据:
import org.apache.spark.sql.streaming.Trigger import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit import org.apache.spark.sql.streaming.Trigger // Set up a checkpoint and disable codeGen. spark.conf.set("spark.sql.streaming.checkpointLocation", "/FileStore/temp/checkpoint") // Write to a Kusto table from a streaming source val kustoQ = df .writeStream .format("com.microsoft.kusto.spark.datasink.KustoSinkProvider") .options(conf) .trigger(Trigger.ProcessingTime(TimeUnit.SECONDS.toMillis(10))) // Sync this with the ingestionBatching policy of the database .start()
Spark 源:从 Azure 数据资源管理器读取数据
读取少量数据时,可以定义数据查询:
import com.microsoft.kusto.spark.datasource.KustoSourceOptions import org.apache.spark.SparkConf import org.apache.spark.sql._ import com.microsoft.azure.kusto.data.ClientRequestProperties val query = s"$table | where (ColB % 1000 == 0) | distinct ColA" val conf: Map[String, String] = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey ) val df = spark.read.format("com.microsoft.kusto.spark.datasource"). options(conf). option(KustoSourceOptions.KUSTO_QUERY, query). option(KustoSourceOptions.KUSTO_DATABASE, database). option(KustoSourceOptions.KUSTO_CLUSTER, cluster). load() // Simplified syntax flavor import com.microsoft.kusto.spark.sql.extension.SparkExtension._ val cpr: Option[ClientRequestProperties] = None // Optional val df2 = spark.read.kusto(cluster, database, query, conf, cpr) display(df2)
可选:如果你提供暂时性 Blob 存储(而不是 Azure 数据资源管理器),则由调用方负责创建 Blob。 这包括预配存储、轮换访问密钥以及删除暂时性项目。 KustoBlobStorageUtils 模块包含用于以下用途的帮助程序函数:基于帐户和容器坐标以及帐户凭据(或基于具有写入、读取和列出权限的完整 SAS URL)删除 Blob。 当不再需要相应的 RDD 时,每个事务会将暂时性 blob 项目存储在一个单独的目录中。 此目录是作为 Spark 驱动程序节点上报告的读取事务信息日志的一部分捕获的。
// Use either container/account-key/account name, or container SaS val container = dbutils.secrets.get(scope = "KustoDemos", key = "blobContainer") val storageAccountKey = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountKey") val storageAccountName = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageAccountName") // val storageSas = dbutils.secrets.get(scope = "KustoDemos", key = "blobStorageSasUrl")
在上面的示例中,无法使用连接器接口访问 Key Vault;使用了一种更简单的方法,即使用 Databricks 机密。
从 Azure 数据资源管理器进行读取。
如果你提供暂时性 blob 存储,请如下所示从 Azure 数据资源管理器进行读取:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_APP_ID -> appId, KustoSourceOptions.KUSTO_AAD_APP_SECRET -> appKey KustoSourceOptions.KUSTO_BLOB_STORAGE_SAS_URL -> storageSas) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
如果 Azure 数据资源管理器提供暂时性 blob 存储,请如下所示从 Azure 数据资源管理器进行读取:
val conf3 = Map( KustoSourceOptions.KUSTO_AAD_CLIENT_ID -> appId, KustoSourceOptions.KUSTO_AAD_CLIENT_PASSWORD -> appKey) val df2 = spark.read.kusto(cluster, database, "ReallyBigTable", conf3) val dfFiltered = df2 .where(df2.col("ColA").startsWith("row-2")) .filter("ColB > 12") .filter("ColB <= 21") .select("ColA") display(dfFiltered)
相关内容
反馈
https://aka.ms/ContentUserFeedback。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:提交和查看相关反馈