您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn.

将 Apache Spark 应用程序与 Azure 事件中心连接

本教程详细介绍如何将 Spark 应用程序连接到事件中心进行实时流式处理。 通过此集成,可以进行流式处理,而无需更改协议客户端,也无需运行你自己的 Kafka 或 Zookeeper 群集。 本教程需要 Apache Spark v2.4 及更高版本和 Apache Kafka v2.0 及更高版本。

备注

GitHub 上提供了此示例

本教程介绍如何执行下列操作:

  • 创建事件中心命名空间
  • 克隆示例项目
  • 运行 Spark
  • 从用于 Kafka 的事件中心读取
  • 写入到用于 Kafka 的事件中心

先决条件

开始本教程前,请确保具备:

备注

Spark-Kafka 适配器已更新,可以支持自 Spark v2.4 以来发布的 Kafka v2.0。 在以前版本的 Spark 中,此适配器支持 Kafka v0.10 及更高版本,但特别依赖于 Kafka v0.10 API。 由于适用于 Kafka 的事件中心不支持 Kafka v0.10,因此 v2.4 之前的 Spark 版本提供的 Spark-Kafka 适配器不受适用于 Kafka 生态系统的事件中心的支持。

创建事件中心命名空间

要从事件中心服务进行发送和接收,需要使用事件中心命名空间。 有关创建命名空间和事件中心的说明,请参阅创建事件中心。 获取事件中心连接字符串和完全限定域名 (FQDN) 供以后使用。 有关说明,请参阅获取事件中心连接字符串

克隆示例项目

克隆 Azure 事件中心存储库并导航到 tutorials/spark 子文件夹:

git clone https://github.com/Azure/azure-event-hubs-for-kafka.git
cd azure-event-hubs-for-kafka/tutorials/spark

从用于 Kafka 的事件中心读取

进行一些配置更改以后,即可从适用于 Kafka 的事件中心读取数据。 根据命名空间提供的详细信息更新 BOOTSTRAP_SERVERSEH_SASL 以后,即可使用事件中心进行流式处理,就像使用 Kafka 一样。 如需完整的示例代码,请查看 GitHub 上的 sparkConsumer.scala 文件。

//Read from your Event Hub!
val df = spark.readStream
    .format("kafka")
    .option("subscribe", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("kafka.request.timeout.ms", "60000")
    .option("kafka.session.timeout.ms", "30000")
    .option("kafka.group.id", GROUP_ID)
    .option("failOnDataLoss", "true")
    .load()

//Use dataframe like normal (in this example, write to console)
val df_write = df.writeStream
    .outputMode("append")
    .format("console")
    .start()

写入到用于 Kafka 的事件中心

也可向事件中心写入数据,所用方式与向 Kafka 写入数据一样。 请勿忘记更新配置,也就是使用事件中心命名空间的信息更改 BOOTSTRAP_SERVERSEH_SASL。 如需完整的示例代码,请查看 GitHub 上的 sparkProducer.scala 文件。

df = /**Dataframe**/

//Write to your Event Hub!
df.writeStream
    .format("kafka")
    .option("topic", TOPIC)
    .option("kafka.bootstrap.servers", BOOTSTRAP_SERVERS)
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", EH_SASL)
    .option("checkpointLocation", "./checkpoint")
    .start()

后续步骤

若要详细了解事件中心和适用于 Kafka 的事件中心,请参阅以下文章: