Examples
- Cassandra Scala example
- Azure SQL Data Warehouse Python example
- Stream-stream join Python and Scala notebooks
Write to Cassandra using foreachBatch() in Scala
streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Cassandra. The following example shows this by using the Spark Cassandra connector from Scala to write the key-value output of an aggregation query to Cassandra. See the foreachBatch documentation for details.
To run this example, install the Cassandra Spark connector that matches your Spark version as a Maven library.
The example writes to an existing table in the given keyspace. That table needs a key column and a value column. With key as the table's primary key, each write replaces the previously stored count for that key. The code starts a Structured Streaming query and uses foreachBatch() to write each micro-batch with the batch DataFrame connector.
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.rdd.ReadConf
import com.datastax.spark.connector._
// Replace the placeholders with your Cassandra connection details.
val host = "<ip address>"
val clusterName = "<cluster name>"
val keyspace = "<keyspace>"
val tableName = "<tableName>"
// Register the Cassandra host under a named cluster configuration.
spark.setCassandraConf(clusterName, CassandraConnectorConf.ConnectionHostParam.option(host))
spark.readStream.format("rate").load()
  .selectExpr("value % 10 as key")
  .groupBy("key")
  .count()
  .toDF("key", "value")
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write // Use the Cassandra batch data source to write the streaming output
      .cassandraFormat(tableName, keyspace)
      .option("cluster", clusterName)
      .mode("append")
      .save()
  }
  .outputMode("update")
  .start()
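The Scala query pairs update output mode with append writes. This works for two reasons:

- In update mode, each micro-batch contains only the keys whose counts changed.
- A Cassandra write for an existing primary key replaces the stored row instead of adding a second one.

The plain-Python sketch below mimics both behaviors without Spark or Cassandra. It assumes key is the table's primary key. The helper names update_mode_batches and upsert are hypothetical.

```python
from collections import Counter


def update_mode_batches(micro_batches):
    """Yield what an update-mode count aggregation emits per micro-batch.

    Each micro-batch is a list of integers, like the rate source's value
    column. Only keys whose running count changed in that batch are emitted.
    """
    running = Counter()
    for batch in micro_batches:
        changed = set()
        for value in batch:
            key = value % 10  # same grouping as "value % 10 as key"
            running[key] += 1
            changed.add(key)
        yield {key: running[key] for key in sorted(changed)}


def upsert(table, rows):
    """Mimic a Cassandra write: a row with an existing primary key replaces it."""
    for key, count in rows.items():
        table[key] = count


table = {}
batches = [[0, 1, 2], [3, 10, 11], [20]]
for rows in update_mode_batches(batches):
    upsert(table, rows)
    # Re-applying the same batch, as a retry might, leaves the table unchanged.
    upsert(table, rows)

print(table)  # {0: 3, 1: 2, 2: 1, 3: 1}
```

Every write is an upsert, so the table ends up holding the latest count for every key seen so far. This holds even though no single batch contained all of them.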
Write to Azure SQL Data Warehouse using foreachBatch() in Python
streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure SQL Data Warehouse. See the foreachBatch documentation for details.
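By default, foreachBatch provides only at-least-once write guarantees. After a failure, the same batch can be passed to your function again with the same batch ID (the epochId argument in the Python example). For a sink that appends rows, the Structured Streaming guide suggests using that ID to deduplicate output.

The sketch below is plain Python with a hypothetical IdempotentSink class. A real sink would also have to record the committed IDs durably, in the same transaction as the data.

```python
class IdempotentSink:
    """Hypothetical append-only sink that skips batch IDs it already wrote."""

    def __init__(self):
        self.rows = []
        # Assumption: in a real sink this set is stored durably, in the same
        # transaction as the rows, so it survives a query restart.
        self.committed_batch_ids = set()

    def write_batch(self, rows, batch_id):
        """Append rows unless batch_id was already committed; return True if written."""
        if batch_id in self.committed_batch_ids:
            return False  # a retried batch: its rows are already in the sink
        self.rows.extend(rows)
        self.committed_batch_ids.add(batch_id)
        return True


sink = IdempotentSink()
sink.write_batch([("a", 1)], batch_id=0)
sink.write_batch([("b", 2)], batch_id=1)
sink.write_batch([("b", 2)], batch_id=1)  # simulated retry of batch 1
print(sink.rows)  # [('a', 1), ('b', 2)]
```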
To run the following example, you need the Azure SQL Data Warehouse connector. For details on the connector, see Azure SQL Data Warehouse.
# spark is the SparkSession that Databricks notebooks provide.
def writeToSQLWarehouse(df, epochId):
    # Write one micro-batch with the Azure SQL Data Warehouse batch connector.
    # mode("overwrite") replaces the table's contents with this batch's rows.
    df.write \
        .format("com.databricks.spark.sqldw") \
        .mode("overwrite") \
        .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
        .option("forward_spark_azure_storage_credentials", "true") \
        .option("dbtable", "my_table_in_dw_copy") \
        .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
        .save()
# A single shuffle partition is enough for this small (10-key) aggregation.
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
    spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    # Complete mode emits every key in each batch, so the overwrite above
    # always leaves the full set of counts in the table.
    .outputMode("complete")
    .start()
)
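When the batch function writes with mode('overwrite'), each micro-batch replaces the whole table. The output mode therefore decides what survives:

- In update mode, a batch holds only the keys whose counts changed, so the table can end up missing keys.
- In complete mode, every batch holds all keys.

The plain-Python sketch below compares the two modes on the same input. It uses a hypothetical simulate helper and no Spark.

```python
from collections import Counter


def simulate(micro_batches, output_mode):
    """Return the target table after every micro-batch has overwritten it.

    Hypothetical helper: counts value % 10 per key, like the streaming query,
    and emits only changed keys ("update") or all keys ("complete").
    """
    running = Counter()
    table = {}
    for batch in micro_batches:
        changed = set()
        for value in batch:
            key = value % 10
            running[key] += 1
            changed.add(key)
        if output_mode == "complete":
            emitted = dict(running)
        else:
            emitted = {key: running[key] for key in sorted(changed)}
        # mode("overwrite"): the batch's rows replace the whole table.
        table = emitted
    return table


batches = [[0, 1, 2], [10]]
print(simulate(batches, "update"))    # {0: 2}
print(simulate(batches, "complete"))  # {0: 2, 1: 1, 2: 1}
```

Complete mode keeps every aggregate in state and rewrites all of them on each trigger. That is cheap here because the query has only ten keys.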
Stream-Stream Joins
These two notebooks show how to use stream-stream joins in Python and Scala.
Stream-Stream joins Python notebook
Stream-Stream joins Scala notebook
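The notebooks themselves are not reproduced here. As background: a join between two unbounded streams cannot wait for either input to be complete. Structured Streaming therefore buffers past rows from both sides as state and matches each new row against the other side's buffer.

The plain-Python sketch below illustrates that buffering idea for an inner equi-join, using a hypothetical SymmetricHashJoin class. It is not Spark's implementation. It also omits watermarks, which Spark uses to bound how long buffered rows are kept.

```python
from collections import defaultdict


class SymmetricHashJoin:
    """Hypothetical inner equi-join of two unbounded streams.

    Every row is kept in its side's buffer (the "state"). Each new row is
    matched against the rows buffered so far on the other side. Nothing is
    ever evicted here, so state grows without bound.
    """

    def __init__(self):
        self.left_state = defaultdict(list)
        self.right_state = defaultdict(list)

    def on_left(self, key, value):
        """Buffer a left row and return its matches with buffered right rows."""
        self.left_state[key].append(value)
        return [(key, value, right) for right in self.right_state.get(key, [])]

    def on_right(self, key, value):
        """Buffer a right row and return its matches with buffered left rows."""
        self.right_state[key].append(value)
        return [(key, left, value) for left in self.left_state.get(key, [])]


join = SymmetricHashJoin()
out = []
out += join.on_left("ad1", "impression@10:00")  # no clicks yet: no output
out += join.on_right("ad1", "click@10:05")      # matches the buffered impression
out += join.on_left("ad1", "impression@10:02")  # matches the buffered click
print(out)
```

Each matching pair is emitted exactly once, whichever side arrives first. This is why both sides must be buffered rather than just one.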