I closed out the earlier checkpointing PR #272, since it was an incorrect implementation. Before I raise a new PR, I want to put out a proposal and gather as many reviews as possible, so that the feature can be implemented on a sure footing. So here goes:
The motivation for checkpointing is simple: we need to track how much data has been processed in our streaming pipeline so that, if the stream has to restart, it can pick up from where it left off. Please note that processed here is different from data merely being read/consumed in the pipeline.
Spark uses the following heuristics for checkpointing:
- Checkpointing is mandatory for all streaming pipelines, as it would be in any production environment. However, we can make checkpointing optional in the streamz API if need be.
- Whenever a batch of data is processed completely and the results are written to a sink, Spark creates a new checkpoint for this batch with the latest offsets for all partitions of the topic(s) involved in the batch.
- Spark does not consider printing to console or the like as a sink. In the case of streamz, the only valid sink for now is Kafka.
- The directory structure for checkpointing is as follows:

```
checkpoint_path/           # this is provided by the user
    topic1/
        1.txt              # Checkpoint for the first batch
        2.txt              # Checkpoint for the second batch
        ...
    topic2/
        1.txt
        ...
```
- Now, if a stream is killed for some reason, and restarts, it would pick up reading the Kafka topic from the last checkpoint file created.
- Only the last few checkpoints are maintained, and the older ones are deleted. We can hard-code this number to something, for example, 100.
- If multiple streams join/union, Spark also takes care of the checkpoints for all topics that may be involved.
Implementation in streamz:
The following is one of the cleanest ways, IMO, to implement checkpointing.
- Reading a batch from Kafka starts off with from_kafka_batched():

```
from_kafka_batched([topics], consumer_conf, checkpoint_path, ...):
    consumer_conf['enable.auto.commit'] = False
    for topic in topics:
        Retrieve the last created checkpoint for the topic from checkpoint_path.
        for partition in partitions:
            emit (partition, low, high) tuples into the stream for get_message_batch()
```
Note that the [topics] parameter is a list of topics that the stream can now subscribe/consume to/from. The checkpoint_path parameter has to be specified by the user; if the user does not want to use checkpointing, we can default it to None so that streamz works as is.
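To make this concrete, here is a minimal Python sketch of the consumption side under this proposal. It assumes confluent_kafka and uses two hypothetical helpers, _load_last_checkpoint() and _emit_partition_ranges(); the names and signatures are illustrative only, not the final API:

```python
import json
import os
from glob import glob

from confluent_kafka import Consumer, TopicPartition


def _load_last_checkpoint(checkpoint_path, topic):
    # Hypothetical helper: return the {partition: offset} dict from the newest
    # checkpoint file for this topic, or {} if no checkpoint exists yet.
    files = sorted(glob(os.path.join(checkpoint_path, topic, "*.txt")),
                   key=lambda p: int(os.path.splitext(os.path.basename(p))[0]))
    if not files:
        return {}
    with open(files[-1]) as f:
        return {int(k): v for k, v in json.load(f).items()}


def _emit_partition_ranges(source, topics, consumer_conf, checkpoint_path):
    # Rough sketch of one poll cycle inside from_kafka_batched().
    consumer_conf['enable.auto.commit'] = False        # streamz manages offsets itself
    consumer = Consumer(consumer_conf)
    for topic in topics:
        last = _load_last_checkpoint(checkpoint_path, topic) if checkpoint_path else {}
        partitions = consumer.list_topics(topic).topics[topic].partitions
        for partition in partitions:
            low, high = consumer.get_watermark_offsets(TopicPartition(topic, partition))
            start = last.get(partition, low)            # resume from the checkpointed offset
            if high > start:
                source.emit((partition, start, high))   # the (partition, low, high) tuples above
    consumer.close()
```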
get_message_batch() consumes the messages from the specific partition between the low and high offsets specified in the tuples emitted into the stream:

```
get_message_batch(kafka_params, topic, partition, low, high, checkpoint_path, ...):
    Consume messages
    Create a dirty-bit file in the checkpoint_path: an indication that some data is yet to be checkpointed.
    Create/update a write-ahead log (WAL) text file for this topic partition with the latest offsets
    that have been read and are ready to be checkpointed if processing is successful.
```
The purpose of the dirty-bit is explained later on.
The WAL would be a text file, #partition.txt, in a topic-level directory under the checkpoint path. It would contain the offset for this topic partition that was last consumed; if processing goes through and the results are finally written back to a sink, this is the offset to be checkpointed.
The directory structure would now look like this:

```
checkpoint_path/
    dirty_bit
    write-ahead-logs/
        topic1/
            1.txt          # Log for first partition of topic 1
            2.txt          # Log for second partition of topic 1
            ...
        topic2/
            1.txt
            ...
    topic1/
        1.txt              # Checkpoint for the first batch
        2.txt              # Checkpoint for the second batch
        ...
    topic2/
        1.txt
        ...
```
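A minimal sketch of the dirty-bit and WAL bookkeeping inside get_message_batch() might look as follows; _update_wal() is a hypothetical helper, and the paths simply mirror the layout above:

```python
import os


def _update_wal(checkpoint_path, topic, partition, high_offset):
    # Hypothetical helper, called after a batch has been consumed successfully.
    # 1. Set the dirty bit: an empty marker file meaning "data has been consumed
    #    but not yet checkpointed".
    open(os.path.join(checkpoint_path, "dirty_bit"), "w").close()

    # 2. Record the latest offset read for this partition in its WAL file; this is
    #    the offset that will be promoted to a checkpoint once processing succeeds.
    wal_dir = os.path.join(checkpoint_path, "write-ahead-logs", topic)
    os.makedirs(wal_dir, exist_ok=True)
    with open(os.path.join(wal_dir, f"{partition}.txt"), "w") as f:
        f.write(str(high_offset))
```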
- There would now be some stream-specific processing functions, e.g., map(), etc.
- Finally, if we write back the results to Kafka using to_kafka(), this is where the checkpoint creation would take place:
```
to_kafka(checkpoint_path, output_topic, producer_conf):
    ...                                  # The current operations within to_kafka() stay as is.
    # Everything below will be done from a separate function add_checkpoint()
    If no dirty-bit file, exit.
    Delete dirty-bit file here.
    new_checkpoint = {}
    for topic in [topics from WAL]:      # We can make this multi-threaded
        for partition in WAL/topic path:
            new_checkpoint[partition] = offset
        Create a checkpoint file for this batch for this topic at checkpoint_path/topic.
```
A checkpoint file would look like a JSON {partition: offset} dictionary. For example, checkpoint_path/topic1/1.txt might contain:

```
{0: 5, 1: 4, 2: 3}
```
Let's say we decide to keep only the last 100 checkpoints for each topic. Then, as soon as the 101st checkpoint is created, the 1st checkpoint (1.txt) is deleted and 101.txt is created.
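For concreteness, here is a rough sketch of the add_checkpoint() step described above, assuming the WAL files written earlier. The function name comes from this proposal; everything else (file handling, retention logic) is illustrative only:

```python
import json
import os
from glob import glob

MAX_CHECKPOINTS = 100  # illustrative retention limit, as discussed above


def add_checkpoint(checkpoint_path):
    # Called from to_kafka(): promote the WAL offsets to a new checkpoint file
    # per topic, then clear the dirty bit.
    dirty_bit = os.path.join(checkpoint_path, "dirty_bit")
    if not os.path.exists(dirty_bit):
        return                           # nothing new was consumed; avoids duplicate checkpoints
    os.remove(dirty_bit)

    wal_root = os.path.join(checkpoint_path, "write-ahead-logs")
    for topic in os.listdir(wal_root):
        # Collect {partition: offset} from every WAL file for this topic.
        new_checkpoint = {}
        for wal_file in glob(os.path.join(wal_root, topic, "*.txt")):
            partition = int(os.path.splitext(os.path.basename(wal_file))[0])
            with open(wal_file) as f:
                new_checkpoint[partition] = int(f.read().strip())

        # Write the next numbered checkpoint file for this topic.
        topic_dir = os.path.join(checkpoint_path, topic)
        os.makedirs(topic_dir, exist_ok=True)
        existing = sorted(glob(os.path.join(topic_dir, "*.txt")),
                          key=lambda p: int(os.path.splitext(os.path.basename(p))[0]))
        next_id = 1 if not existing else int(
            os.path.splitext(os.path.basename(existing[-1]))[0]) + 1
        with open(os.path.join(topic_dir, f"{next_id}.txt"), "w") as f:
            json.dump(new_checkpoint, f)

        # Keep only the most recent MAX_CHECKPOINTS checkpoints.
        for old in existing[:max(0, len(existing) + 1 - MAX_CHECKPOINTS)]:
            os.remove(old)
```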
This finishes the implementation.
Now, a few questions and their answers:
- to_kafka() is called on every element of the stream, and if we are using Dask, the WALs for all partitions are created in parallel. After gather().to_kafka(), the checkpoint is created on the first call to to_kafka() itself. How do we avoid duplicate checkpoints being created? (Up to #partitions duplicate checkpoints could be created.)
Answer: This is where the dirty-bit file comes into play. If new data is consumed, the dirty bit is set. to_kafka() does not attempt to create a new checkpoint if the dirty bit is not set, and after a checkpoint is created, the dirty bit is deleted by to_kafka().
- The WAL and the checkpoints would be stored on the local file system for now?
Answer: Yes, we can support S3 in the future, like Spark.
- In the future, if we have different sinks apart from Kafka, like SQS, Google Pub/Sub, or Elasticsearch, do we need to implement checkpointing all over again?
Answer: No. When writing to a sink, one would just call the add_checkpoint() function that we would create as part of this PR. The source would always be Kafka, though, for the foreseeable future.
- We need to pass the checkpoint_path to both from_kafka_batched() and to_kafka()?
Answer: Yes, that's unfortunate. I'd love to hear another way to do this for a cleaner API, but we need the path in from_kafka_batched() to retrieve the last checkpoint and in to_kafka() to create a new checkpoint. (A usage sketch of the resulting API follows after this list.)
- Would this work with Dask?
Answer: Absolutely. We would definitely want this to work with Dask!
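Finally, to show how the pieces would fit together from the user's perspective, here is a hypothetical end-to-end pipeline using the proposed API. The checkpoint_path keyword, the list-of-topics argument, and the argument ordering all follow this proposal and are provisional, not the current streamz API:

```python
from streamz import Stream

consumer_conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'demo-group'}
producer_conf = {'bootstrap.servers': 'localhost:9092'}

# Checkpoints (and WALs) for this pipeline live under one user-supplied path.
source = Stream.from_kafka_batched(['topic1', 'topic2'], consumer_conf,
                                   checkpoint_path='/tmp/checkpoints')

(source.map(lambda batch: batch)                       # stream-specific processing, e.g. map()
       .to_kafka('output_topic', producer_conf,
                 checkpoint_path='/tmp/checkpoints'))  # checkpoint created here

source.start()   # on restart, consumption resumes from the last checkpoint
```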