Continuous Processing is Apache Spark’s new execution engine that allows very low latency (in milliseconds), event-at-a-time stream processing. In this blog, we are going to take an early peek at this still experimental feature that is going to be available in Apache Spark 2.3. I am going to assume that you are already familiar with Spark’s micro-batch based execution engine. If you are not, do read my previous blog post here. The code used in this blog post is available in my Github repo.

From MicroBatch to ContinuousProcessing

Apache Spark has been providing stream processing capabilities via micro-batching all this while. The main disadvantage of this approach is that each task/micro-batch has to be collected and scheduled at regular intervals, so the best (minimum) latency that Spark could provide is around 1 second. There was no concept of processing a single event/message at a time. Continuous processing is Spark’s attempt to overcome these limitations and provide stream processing with very low latencies.

To enable this feature, Spark had to make two major changes in its underlying code.

  1. Create new sources and sinks that can read messages continuously (instead of in micro-batches) - called DataSourceV2.
  2. Create a new execution engine - called ContinuousProcessing - which uses a ContinuousTrigger and launches long-running tasks using DataSourceV2.

DataSourceV2

DataSourceV2 has the ability to read/write one record at a time. For example, the KafkaSource has get() and next() methods to read each record, instead of the getBatch() method in V1. (Note: even though the records are read one at a time, there is still some buffering done at the KafkaConsumer.)
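To get a feel for the contract, here is how the engine conceptually drives such a reader. The next()/get() pair is the record-at-a-time contract mentioned above; the reader variable and the process() call are illustrative placeholders of mine, not Spark API.

    // Conceptual read loop only - the real engine adds offset tracking,
    // checkpointing, and failure handling around this.
    while (reader.next()) {      // blocks until the next record is available
      val record = reader.get()  // returns the record just read
      process(record)            // hypothetical downstream handler
    }
    reader.close()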

The KafkaSink runs continuously, waiting for new records, and writes/commits them to the topic one record at a time.

Available Sources

Readers supported right now are:

  • KafkaSource (short name kafka)
  • RateSource (short name rate) - for testing purposes only

Writers supported right now are:

  • KafkaSink
  • ConsoleSink - for testing purposes only
  • MemorySink - for testing purposes only
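As a quick way to try the engine end to end, the two testing endpoints above (rate and console) can be wired together. This is a minimal sketch; the rows-per-second value, checkpoint path, and trigger interval are arbitrary choices of mine.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder
      .appName("continuous-smoke-test")
      .master("local[*]")
      .getOrCreate()

    // RateSource continuously generates (timestamp, value) rows.
    val rateStream = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    // ConsoleSink prints each record as it arrives.
    rateStream.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/continuous-smoke-test/")
      .trigger(Trigger.Continuous("1 second")) // checkpoint interval
      .start()
      .awaitTermination()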

Custom Source/Sink

It is not very difficult to create your own reader/writer. I have an example of a source - NetcatContinuousReader - and an application that uses this source in my Github.
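To give an idea of what such a reader involves, here is a rough skeleton of the record-reading part of a netcat-style source. It is modelled on the Spark 2.3 DataSourceV2 interfaces, but treat it as a sketch - the offset class and the reader class are my own illustrative names, and the surrounding ContinuousReader/factory plumbing is omitted; the full working source is in the Github repo linked above.

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.Socket

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}

    // Illustrative offset type: just the count of lines read so far.
    case class LineOffset(line: Long) extends PartitionOffset

    // Simplified sketch of a record-at-a-time reader over a socket.
    class SimpleNetcatDataReader(host: String, port: Int)
        extends ContinuousDataReader[Row] {

      private val socket = new Socket(host, port)
      private val in =
        new BufferedReader(new InputStreamReader(socket.getInputStream))
      private var currentLine: String = _
      private var linesRead = 0L

      // Blocks until a new line arrives; false when the stream ends.
      override def next(): Boolean = {
        currentLine = in.readLine()
        if (currentLine == null) false else { linesRead += 1; true }
      }

      // Returns the record read by the last successful next() call.
      override def get(): Row = Row(currentLine)

      override def getOffset: PartitionOffset = LineOffset(linesRead)

      override def close(): Unit = socket.close()
    }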

ContinuousExecution Engine

This is the second major change that allows low latency processing in Spark. A ContinuousExecution engine is chosen as the StreamExecution when the trigger set is ContinuousTrigger (also, the source and sink should be of the type DataSourceV2). The operations supported by this engine are limited for now: it mainly supports Map, Filter, and Project. Aggregation operations, joins, windowing, etc. are not supported. The idea behind this is that such operations need to wait some time to collect data, and for those use cases the micro-batch based engine should suffice. The use cases that require very low latency (in milliseconds) are the ones that fit this model.

Example

If you are already familiar with Spark’s Structured Streaming API, the only change that needs to be made is in the trigger - set it to ContinuousTrigger. I will be trying to convert the code written as part of my previous blog to use ContinuousProcessing. As a first step I will set the trigger to Trigger.Continuous; the rest of the code will remain the same.

    import org.apache.spark.sql.streaming.Trigger

    // aggregates is the streaming DataFrame built in the previous post
    val writeToKafka = aggregates
      .selectExpr("CAST(carId AS STRING) AS key", "CAST(speed AS STRING) AS value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "fastcars")
      .option("checkpointLocation", "/tmp/sparkcheckpoint/")
      .queryName("kafka spark streaming kafka")
      .outputMode("update")
      .trigger(Trigger.Continuous("10 seconds")) // 10 seconds is the checkpoint interval
      .start()

This caused an exception, org.apache.spark.sql.AnalysisException - Continuous processing does not support EventTimeWatermark operations. Watermarks are not supported in ContinuousProcessing since they involve collecting data over time. So we will remove withWatermark("timestamp", "3 seconds") from the code.

Now the application threw another exception: Continuous processing does not support Aggregate operations. As I mentioned earlier, Spark expects you to use micro-batch based processing if you need to do aggregations, since they involve waiting for data to arrive. Removing the code related to avg, groupBy and window fixes the problem and the application runs. The modified application code is here.
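For reference, what survives is just projection and filtering - exactly the operations the continuous engine supports. Here is a minimal sketch of that pipeline, assuming the car schema from my previous post; the input topic name and the speed threshold are illustrative placeholders.

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._

    // Schema of the car events, following the previous post.
    val carSchema = new StructType()
      .add("carId", StringType)
      .add("speed", IntegerType)
      .add("timestamp", TimestampType)

    // Only projections and a filter remain - no watermark, no aggregation.
    // The name aggregates is kept so the sink snippet above applies unchanged.
    val aggregates = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "cars") // illustrative input topic
      .load()
      .select(from_json(col("value").cast("string"), carSchema).as("car"))
      .select("car.*")
      .filter(col("speed") > 70) // illustrative threshold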

Note: ContinuousTrigger internally uses a ProcessingTimeExecutor (same as the ProcessingTime trigger). But this does not have any effect on how often the data is processed, since the tasks are already launched and are continuously processing the data.

Conclusion

ContinuousExecution provides us the ability to do very low latency processing but is limited in what we can do. This can change in the near future since this is a new feature and is in the experimental stage. Hope you liked the post and, as always, thanks for reading.
