Spark Structured Streaming: enriching a Kafka stream from Cassandra

We are streaming data from Kafka with Structured Streaming:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
import com.datastax.spark.connector._
import spark.implicits._

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("startingOffsets", "earliest") // the Kafka source manages offsets and group ids itself, so enable.auto.commit / auto.offset.reset / group.id have no effect here
  .option("subscribe", "test")
  .load()
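
For reference, the schema used in from_json below is defined along these lines (the field name here is a placeholder; our real schema has more columns):

import org.apache.spark.sql.types._

// Placeholder for the real JSON schema; only the "path" field matters below.
val schema = new StructType()
  .add("path", StringType)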
After that, we try to join the stream with a Cassandra table:
val d = df
  .select(from_json(col("value").cast("string"), schema).cast("string").alias("path"))
  .rdd
  .joinWithCassandraTable[(String, String, String)]("keyspace", "table", SomeColumns("path")) // "keyspace" and "table" stand in for our real names
  .toDS()
  .writeStream
  .format("console") // <-- use ConsoleSink
  .option("truncate", false)
  .option("numRows", 15)
  .trigger(Trigger.ProcessingTime(6.seconds))
  .queryName("rate-console")
  .start()
  .awaitTermination()
but we get the error below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
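
From the stack trace it seems the .rdd call is the problem: it forces Spark to plan the streaming DataFrame as a batch query, which the UnsupportedOperationChecker rejects. One workaround we are considering is foreachBatch (available from Spark 2.4), where each micro-batch arrives as a plain Dataset. Below is only a rough sketch with placeholder keyspace/table names, assuming the table has three text columns and a single-column partition key:

import org.apache.spark.sql.Dataset

df.select(from_json(col("value").cast("string"), schema).cast("string").alias("path"))
  .as[String]
  .writeStream
  .foreachBatch { (batch: Dataset[String], batchId: Long) =>
    // Each micro-batch is a regular (non-streaming) Dataset here,
    // so .rdd and joinWithCassandraTable are allowed.
    batch.rdd
      .map(Tuple1(_)) // wrap each key so the connector can map it to the partition key column
      .joinWithCassandraTable[(String, String, String)]("keyspace", "table") // placeholders for our real names
      .collect() // fine for a console-style demo; a real sink would go here
      .foreach(println)
  }
  .trigger(Trigger.ProcessingTime(6.seconds))
  .start()
  .awaitTermination()

Would this be the right way to enrich the stream from Cassandra, or is there a better approach?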

Please help us.
