Question

SulemanButt-6524 asked:
Azure Databricks Kafka consumer facing connection issues when trying to connect to an AWS Kafka broker

I followed this guide for the Kafka consumer setup:

https://docs.databricks.com/spark/latest/structured-streaming/kafka.html#using-ssl

 var kafka = 
   spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "kfk.awseucxx.xxx.xxx.xxx:9093")
     .option("subscribe", "dat.yyy.xxx.incoming.json")     
     .option("startingOffsets", "latest")  
     .option("minPartitions", "10")  
     .option("failOnDataLoss", "true")
     .option("kafka.ssl.truststore.location", "/mnt/devkafka/non-prod_xxxx-np-dp_kafka_client_truststore.jks")
     .option("kafka.ssl.keystore.location", "/mnt/devkafka/non-prod_xxx_kafka_client_keystore.jks")
     .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope="key-vault-kafka-dev-secrets",key="KAFKA-DEV-KEYxxx-PWD"))
     .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope="key-vault-kafka-dev-secrets",key="KAFKA-DEV-TRUSTyyy-PWD"))
     .load()
    
    
 kafka: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
 Command took 1.18 seconds -- by giinq@xxx.com at 2/19/2021, 10:52:44 AM on xxx_db_cluste


 // split lines by whitespace and explode the array as rows of `word`
 import org.apache.spark.sql.functions.{explode, split}
 val df = kafka.select(explode(split($"value".cast("string"), "\\s+")).as("word"))
   .groupBy($"word")
   .count
    
    
 df: org.apache.spark.sql.DataFrame = [word: string, count: bigint]


 // follow the word counts as it updates
 display(df.select($"word", $"count"))
    
 Running command...
 Getting offsets from KafkaV2[Subscribe[dat.yyy.xxx.incoming.json]]
 Stream initializing...


It keeps running, but there is no output!
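
To see whether the stream ever gets past initialization, I also check the active streaming queries from a separate cell while display() is running; this is just a diagnostic sketch of my own, not something from the guide:

 // Diagnostic sketch (my own idea, not from the guide): list the active streaming queries
 // and print their status/progress to see whether any offsets are ever fetched.
 spark.streams.active.foreach { q =>
   println(s"id=${q.id} status=${q.status}")
   println(q.lastProgress)   // stays null until the first micro-batch completes
 }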

I then slightly changed the configuration:

 import org.apache.spark.sql.functions.{explode, split}
 var kafka = 
   spark.readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "kfk.awseucxx.xxx.xxx.xxx:9093")
     .option("kafka.security.protocol", "SSL")
     .option("kafka.ssl.truststore.type", "JKS")
     .option("kafka.ssl.keystore.type", "JKS")
     .option("subscribe", "dat.yyy.xxx.incoming.json")     
     .option("startingOffsets", "latest")  
     .option("minPartitions", "10")  
     .option("failOnDataLoss", "true")
     .option("kafka.ssl.truststore.location", "/mnt/devkafka/non-prod_xxxx-np-dp_kafka_client_truststore.jks")
     .option("kafka.ssl.keystore.location", "/mnt/devkafka/non-prod_xxx_kafka_client_keystore.jks")
     .option("kafka.ssl.keystore.password", dbutils.secrets.get(scope="key-vault-kafka-dev-secrets",key="KAFKA-DEV-KEYxxx-PWD"))
     .option("kafka.ssl.truststore.password", dbutils.secrets.get(scope="key-vault-kafka-dev-secrets",key="KAFKA-DEV-TRUSTyyy-PWD"))
     .load()

But then, when I run this:

 // follow the word counts as it updates
 display(df.select($"word", $"count"))

I get this error:

 Stream is inactive
    
 Stream stopped...
    
 kafkashaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    
  at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
  at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
  at kafkashaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
  at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrate
 ....
    
 Caused by: kafkashaded.org.apache.kafka.common.KafkaException:
 kafkashaded.org.apache.kafka.common.KafkaException:
 kafkashaded.org.apache.kafka.common.KafkaException: Failed to load SSL keystore
 dbfs:/mnt/devkafka/non-prod_20201014152336_xxxkafka_client_keystore.jks of type JKS
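
Since the exception is about loading the keystore itself, one check I am planning (my own diagnostic idea, not from the guide) is to copy the keystore to the driver's local disk and load it directly with java.security.KeyStore, using the same password from the secret scope:

 // Diagnostic sketch (my own idea): copy the keystore to local disk on the driver and try to
 // load it directly. If this fails too, the problem is the file or the password rather than
 // the Spark/Kafka options.
 import java.io.FileInputStream
 import java.security.KeyStore

 dbutils.fs.cp("/mnt/devkafka/non-prod_xxx_kafka_client_keystore.jks", "file:/tmp/kafka_client_keystore.jks")

 val ksPass = dbutils.secrets.get(scope = "key-vault-kafka-dev-secrets", key = "KAFKA-DEV-KEYxxx-PWD")
 val ks = KeyStore.getInstance("JKS")
 val in = new FileInputStream("/tmp/kafka_client_keystore.jks")
 try ks.load(in, ksPass.toCharArray) finally in.close()
 println(s"keystore loaded, entries = ${ks.size()}")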

In my opinion, the mounted path looks correct:

 dbutils.fs.mount(
   source = "wasbs://container-xxx@xxxtalaketest.blob.core.windows.net",
   mountPoint = "/mnt/devkafka",
   extraConfigs = Map("fs.azure.account.key.xxx.blob.core.windows.net" -> dbutils.secrets.get(scope = "key-vault-kafkaxxx-dev-secrets", key = "storage-account-access-key-xxx")))
    
 // scala
 val df = spark.read.text("/mnt/devkafka/non-prod_20201014152336_xxxx_client_truststore.jks")
 val df2 = spark.read.text("/mnt/devkafka/non-prod_20201014152336_xxx_keystore.jks")
    
    
 dbutils.fs.ls("/mnt/devkafka/")
    
 val df3 = dbutils.secrets.get(scope="keyxxx-vault-kafka-dev-secrets",key="KAFKAxxxx-DEV-TRUSTxxx-PWD")
    
    
 //display(df)
 //display(df2)
 //display(df3)
    
 df: org.apache.spark.sql.DataFrame = [value: string]
 df2: org.apache.spark.sql.DataFrame = [value: string]
    
 res13: Seq[com.databricks.backend.daemon.dbutils.FileInfo] = 
 WrappedArray(FileInfo(dbfs:/mnt/devkafka/non-prod_20201014152336_xxxkafka_client_keystore.jks,
  non-prod_20201014152336_xxxx_kafka_client_keystore.jks, 3813), 
 FileInfo(dbfs:/mnt/devkafka/non-prod_20201014152336_xxx_kafka_client_truststore.jks,
  non-prod_20201014152336_xxx_kafka_client_truststore.jks, 2898))
    
 df3: String = [REDACTED] 
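
The files do show up under dbfs:/mnt/devkafka. One thing I have not tried yet (purely an assumption on my side) is pointing the Kafka options at the local /dbfs FUSE path instead of /mnt, in case the shaded Kafka client loads the keystore with plain Java file I/O and cannot resolve a dbfs: URI:

 // Sketch of something I have not tried yet (assumption on my part: the Kafka client needs a
 // plain local file path, so the /dbfs FUSE mount might work where the dbfs: URI does not).
 val trustStorePath = "/dbfs/mnt/devkafka/non-prod_xxxx-np-dp_kafka_client_truststore.jks"
 val keyStorePath   = "/dbfs/mnt/devkafka/non-prod_xxx_kafka_client_keystore.jks"

 // First confirm the files are visible as local files on the driver...
 println(new java.io.File(trustStorePath).exists())
 println(new java.io.File(keyStorePath).exists())

 // ...and if they are, pass these /dbfs/... paths to
 // kafka.ssl.truststore.location / kafka.ssl.keystore.location instead of /mnt/... .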

So I am rather confused about what is preventing the consumer from reading from the Kafka stream.

As far as data availability is concerned, the topic's partitions do contain data; I have verified this on the Kafka broker.

The driver logs in the Databricks cluster always show:

 21/02/19 10:33:11 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-5edcbbb1-6d6f-4f90-a01f-e050d90f1acf--1925148407-driver-0-4, groupId=spark-kafka-source-5edcbbb1-6d6f-4f90-a01f-e050d90f1acf--1925148407-driver-0] Bootstrap broker kfk.awseuc1.xxx.xxx.xxx:9093 (id: -1 rack: null) disconnected
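
Since the broker only ever shows up as "disconnected", I am also considering a raw connectivity check from the driver to the broker endpoint (again just a sketch of my own, with the host taken from the logs), to separate a networking problem from an SSL handshake problem:

 // Diagnostic sketch (my own idea): check plain TCP reachability and then attempt a TLS
 // handshake against the broker endpoint. A handshake failure would point at certificates;
 // a TCP failure would point at networking/security groups.
 import javax.net.ssl.{SSLSocket, SSLSocketFactory}

 val host = "kfk.awseuc1.xxx.xxx.xxx"   // as it appears in the driver logs
 val port = 9093

 val tcp = new java.net.Socket(host, port)
 println(s"TCP connect ok: ${tcp.isConnected}")
 tcp.close()

 // TLS handshake with the JVM default trust store; an untrusted-cert error is still more
 // informative than a silent disconnect.
 val ssl = SSLSocketFactory.getDefault.createSocket(host, port).asInstanceOf[SSLSocket]
 ssl.startHandshake()
 println("TLS handshake completed")
 ssl.close()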

I am running out of options for addressing this problem!

Any hints on where the problem could be, and what further adjustments or investigation I can do to fix this?

Many thanks.

azure-databricks

PRADEEPCHEEKATLA-MSFT commented:

Hello @SulemanButt-6524,

Thanks for reaching out, and for using the Microsoft Q&A forum.

For a deeper investigation and immediate assistance with this issue, you may file a support ticket if you have a support plan. If you don't have one, I can enable a one-time free support request for your subscription. Please let us know.

SulemanButt-6524 replied to PRADEEPCHEEKATLA-MSFT:

I have created a support ticket through the portal now.

Thanks.


Thanks @SulemanButt-6524

Can you share the support ticket number here? We will also keep an eye on it from our side.

Thanks
Himanshu


0 Answers