Real-time stream processing for Python

Overview

Streamz


Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on.

Optionally, Streamz can also work with both Pandas and cuDF dataframes, to provide sensible streaming operations on continuous tabular data.

To learn more about how to use Streamz, see the documentation at streamz.readthedocs.org.
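As a quick taste, a minimal pipeline using only the core Stream API might look like this (the increment function is purely illustrative):

    from streamz import Stream

    def increment(x):
        # an illustrative transformation
        return x + 1

    source = Stream()
    source.map(increment).sink(print)   # build: source -> map -> sink

    for i in range(3):
        source.emit(i)                  # prints 1, 2, 3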

LICENSE

BSD-3 Clause

Comments
  • Improve kafka source

    Fixes #143

    The idea is that dynamic polling should always be marked as async and should latch on to whatever IOLoop is already running, or the one passed in.

    I hope this fixes the Jupyter display problem; it certainly works with print, although print has the problem that the actual output appears wherever the latest code was executed.

    opened by martindurant 80
  • Proposal to add Checkpointing to streamz.

    I closed out the earlier checkpointing PR #272, since it was an incorrect implementation. Before I raise a new PR, I wanted to put out a proposal and get as many reviews as possible, so as to have a sure footing to implement the feature. So here goes:

    The motivation for checkpointing is simple: we need to track how much data has been processed in our streaming pipeline so that, if the stream has to restart, it can pick up from where it left off. Please note that 'processed' here is different from data merely being read/consumed in the pipeline.

    Spark uses the following heuristics for checkpointing:

    1. Checkpointing is mandatory for all streaming pipelines, as it would be in any production environment. However, we can make checkpointing optional in the streamz API if need be.
    2. Whenever a batch of data is processed completely, and the results are written to a sink, Spark creates a new checkpoint for this batch with the latest offsets for all partitions for the topic(s) involved in the batch.
    3. Spark does not consider printing to console or the like as a sink. In the case of streamz, the only valid sink for now is Kafka.
    4. The directory structure of checkpointing is as follows:
    checkpoint_path/              # this is provided by the user
        topic1/
            1.txt                 # checkpoint for the first batch
            2.txt                 # checkpoint for the second batch
            ...
        topic2/
            1.txt
            ...
    
    5. Now, if a stream is killed for some reason and restarts, it picks up reading the Kafka topic from the last checkpoint file created.
    6. Only the last few checkpoints are maintained, and the older ones are deleted. We can hard-code this number, for example, to 100.
    7. If multiple streams join/union, Spark also takes care of the checkpoints for all topics that may be involved.

    Implementation in streamz: The following is one of the cleanest ways, IMO, to implement checkpointing.

    1. Reading a batch from Kafka starts off with from_kafka_batched().
    from_kafka_batched([topics], consumer_conf, checkpoint_path, ...):
        consumer_conf['enable.auto.commit'] = False
        for topic in topics:
            retrieve the last created checkpoint for the topic from checkpoint_path
            for partition in partitions:
                emit (partition, low, high) tuples into the stream for get_message_batch()
    

    Note that the [topics] parameter is a list of topics that the stream can now subscribe to and consume from. The checkpoint_path parameter has to be specified by the user. If the user does not want to use checkpointing, we can default it to None to use streamz as is.
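    As a rough sketch, the "retrieve last created checkpoint" step could look like the following under the directory layout described above (the helper name _latest_checkpoint is hypothetical, not existing streamz API):

    import json
    import os

    def _latest_checkpoint(checkpoint_path, topic):
        """Return the {partition: offset} dict from the newest checkpoint file, or {}."""
        topic_dir = os.path.join(checkpoint_path, topic)
        if not os.path.isdir(topic_dir):
            return {}
        batches = sorted(
            (f for f in os.listdir(topic_dir) if f.endswith('.txt')),
            key=lambda f: int(f.split('.')[0]),
        )
        if not batches:
            return {}
        with open(os.path.join(topic_dir, batches[-1])) as fh:
            return {int(k): int(v) for k, v in json.load(fh).items()}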

    2. get_message_batch() consumes the messages from the specified partition between the low and high offsets given in the tuples emitted into the stream.
    get_message_batch(kafka_params, topic, partition, low, high, checkpoint_path, ...):
        consume messages
        create a dirty-bit file in checkpoint_path   # an indication that some data is yet to be checkpointed
        create/update a write-ahead log (WAL) text file for this topic partition with the latest
            offsets that have been read and are ready to be checkpointed if processing is successful
    

    The purpose of the dirty bit is explained later on. The WAL would be a text file, #partition.txt, in a topic-level directory in the checkpoint path. It would contain the offset last consumed for this topic partition; if processing goes through to a final write back to a sink, this is the offset to be checkpointed.

    The directory structure would now look like this:

    checkpoint_path/
        dirty_bit
        write-ahead-logs/
            topic1/
                1.txt             # log for the first partition of topic 1
                2.txt             # log for the second partition of topic 1
                ...
            topic2/
                1.txt
                ...
        topic1/
            1.txt                 # checkpoint for the first batch
            2.txt                 # checkpoint for the second batch
            ...
        topic2/
            1.txt
            ...
    
    3. There would now be some stream-specific processing functions, e.g., map().
    4. Finally, if we write the results back to Kafka using to_kafka(), this is where the checkpoint creation would take place.
    to_kafka(checkpoint_path, output_topic, producer_conf):
        ...                                   # the current operations within to_kafka() stay as is

        # everything below will be done from a separate function add_checkpoint()
        if there is no dirty-bit file, exit
        delete the dirty-bit file here
        new_checkpoint = {}
        for topic in [topics from WAL]:       # we can make this multi-threaded
            for partition in WAL/topic path:
                new_checkpoint[partition] = offset
            create the checkpoint file for this batch for this topic at checkpoint_path/topic
    

    A checkpoint file would look like a JSON {partition:offset} dictionary:

    checkpoint_path/topic1/1.txt
    {0: 5, 1: 4, 2: 3}
    

    Let's say we decide to keep only the last 100 checkpoints for each topic. Then, as soon as the 101st checkpoint is created, the first checkpoint (1.txt) is deleted and 101.txt is created.

    This finishes the implementation.
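    A minimal sketch of what add_checkpoint() could look like under this proposal (the file layout is the one above; the function and its arguments are assumptions, not existing streamz API):

    import json
    import os

    def add_checkpoint(checkpoint_path, max_checkpoints=100):
        dirty_bit = os.path.join(checkpoint_path, 'dirty_bit')
        if not os.path.exists(dirty_bit):   # nothing new was consumed since the last checkpoint
            return
        os.remove(dirty_bit)                # clear the dirty bit before checkpointing

        wal_root = os.path.join(checkpoint_path, 'write-ahead-logs')
        if not os.path.isdir(wal_root):
            return
        for topic in os.listdir(wal_root):
            # collect the latest consumed offset per partition from the WAL files
            new_checkpoint = {}
            topic_wal = os.path.join(wal_root, topic)
            for fname in os.listdir(topic_wal):          # one WAL file per partition
                partition = int(fname.split('.')[0])
                with open(os.path.join(topic_wal, fname)) as fh:
                    new_checkpoint[partition] = int(fh.read().strip())

            # write the next numbered checkpoint file for this topic
            topic_dir = os.path.join(checkpoint_path, topic)
            os.makedirs(topic_dir, exist_ok=True)
            existing = sorted(
                int(f.split('.')[0]) for f in os.listdir(topic_dir) if f.endswith('.txt')
            )
            batch_id = (existing[-1] + 1) if existing else 1
            with open(os.path.join(topic_dir, str(batch_id) + '.txt'), 'w') as fh:
                json.dump(new_checkpoint, fh)

            # retain only the last max_checkpoints files
            for old in existing[:max(0, len(existing) + 1 - max_checkpoints)]:
                os.remove(os.path.join(topic_dir, str(old) + '.txt'))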

    Now, a few questions and their answers:

    1. to_kafka() is called on every element of the stream, and if we are using Dask, the WALs for all partitions are created in parallel. After we have gather().to_kafka(), the checkpoint is created on the first call to to_kafka() itself. In this case, how do we avoid duplicate checkpoints being created? A maximum of #partitions duplicate checkpoints could be created. Answer: This is where the dirty-bit file comes into play. If new data is consumed, the dirty bit is set. to_kafka() does not attempt to create a new checkpoint if the dirty bit is not set, and after a checkpoint is created, the dirty bit is deleted by to_kafka().

    2. The WAL and the checkpoints would be stored on the local file system for now? Answer: Yes, we can support S3 in the future, like Spark.

    3. In the future, if we have different sinks apart from Kafka, like SQS, Google Pub-Sub, ElasticSearch, do we need to implement checkpointing all over again? Answer: No. When writing to a sink, one would just call the add_checkpoint() function that we would create as part of this PR. The source would always be Kafka, though, for the foreseeable future.

    4. We need to pass the checkpoint_path to both from_kafka_batched() and to_kafka()? Answer: Yes, that's unfortunate. I'd love to hear another way to do this for a cleaner API. But we need the path in from_kafka_batched() to retrieve the last checkpoint, and in to_kafka() to create a new checkpoint.

    5. Would this work with Dask? Answer: Absolutely. We would definitely want this to work with Dask!

    opened by chinmaychandak 48
  • Added the to_kafka stream

    I have added the to_kafka class.

    The options were to either:

    1. Call the .flush() method after each .produce(...) call. This would seriously hurt performance, as messages would be sent serially.
    2. Use atexit to call flush at the end. This requires the use of atexit, whose impact I'm unsure of. It also means that the result of the .emit(...) call by the client does not yield an immediate call to the downstream. Note that the Confluent library will flush when the batch size reaches a threshold. This is the method implemented here (a rough sketch of the idea follows below).
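    Roughly, the batching pattern described in option 2 looks like this with confluent-kafka (a sketch, not the PR code; the broker address and the 10,000-message threshold are arbitrary assumptions):

    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def emit_to_kafka(value, topic='output-topic', flush_threshold=10000):
        producer.produce(topic, value)
        producer.poll(0)                       # serve delivery callbacks without blocking
        if len(producer) >= flush_threshold:   # messages still queued in the producer
            producer.flush()                   # only block once the internal queue is large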
    opened by jsmaupin 47
  • Add HoloViews based plotting API

    Adds handling for bar plots on Series and Seriess objects. Also fixes Seriess.plot.line, Seriess.plot.scatter and Seriess.plot.area, ensuring the index is reset and therefore made visible to HoloViews.

    opened by philippjfr 39
  • A potential approach for checking if a datum is done

    One way to think about checkpointing is knowing whether a piece of data has completely exited the pipeline, i.e., that it is no longer being processed or held in a cache.

    Backpressure can help out here, since we can know if the pipeline has returned control up the call stack.

    I think there are three ways that control is returned:

    1. All computations have finished, the data has left through a sink
    2. Some computations have finished, the data was dropped by a filter
    3. Some computations have finished, the data was stored in a cache waiting for something

    1 and 2 are handled by backpressure nicely, since if all the computations have finished then the data is done.

    3 is a bit more tricky since we need to track the cached values.

    Potential implementation

    def _emit(..., metadata={'reference': RefCounter}):
        RefCounter += len(self.downstreams)
        for downstream in self.downstreams:
            downstream.update(x)
            RefCounter -= 1
    

    Caching nodes

    def update(...):
        cache.append(x)
        RefCounter += 1
    

    When the RefCounter hits zero the data is done.

    Note that this would need to merge the refcounting metadata when joining nodes. For instance combine_latest would need to propagate the ref counters for all incoming data sources, and all downstream data consumers would need to increment/decrement for all the ref counters.

    We'd also need to create the ref counters at data-ingestion time.
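    A minimal sketch of such a reference counter (a hypothetical class, not existing streamz API), with a callback fired when the count drops to zero:

    class RefCounter:
        """Track how many nodes still hold a datum; fire a callback when it is done."""

        def __init__(self, on_done):
            self.count = 0
            self._on_done = on_done

        def retain(self, n=1):
            self.count += n

        def release(self, n=1):
            self.count -= n
            if self.count <= 0:
                self._on_done()      # e.g. tell the source the datum has fully exited

    # usage: the source creates one counter per datum and passes it along as metadata
    counter = RefCounter(on_done=lambda: print("datum fully processed"))
    counter.retain()     # a downstream node takes ownership (e.g. appends to a cache)
    counter.release()    # the node is done; the callback fires and prints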

    opened by CJ-Wright 35
  • A simple plugin system

    Right now, if I need to add my own custom stream nodes, I have to do this:

    import mypackage.streamz_extras  # noqa: F401
    

    It would be nice to have a way to distribute additional functionality as separate packages that can just be installed via pip. This can be done with entry_points, similar to the way it works in airflow. This is a super bare-bones implementation of this mechanism. Check out https://github.com/roveo/streamz_example_plugin for an example of a plugin.
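    For illustration, a plugin package might register its node through entry_points in its setup.py; the group name 'streamz.nodes' is an assumption about what this mechanism could use:

    from setuptools import setup

    setup(
        name='streamz_example_plugin',
        py_modules=['streamz_example_plugin'],
        # streamz would scan this entry-point group and attach the registered
        # classes to Stream, so users only need to pip-install the plugin
        entry_points={
            'streamz.nodes': [
                'my_node = streamz_example_plugin:MyNode',
            ],
        },
    )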

    Problems:

    • this should be tested, but I have no idea how, short of shipping the code for an example plugin with its tests
    • plugged-in classes should be checked for validity in some way. I can add a simple check, e.g. isinstance(plugin, Stream), but there is probably something else I haven't thought of
    opened by roveo 29
  • Add refresh_cycles parameter to from_kafka_batched to accommodate adding Kafka topic partitions on the fly.

    Streamz should be able to handle the addition of Kafka topic partitions on the fly. The refresh_cycles parameter makes the source poll Kafka every N cycles for the latest partition metadata, so any partitions added on the fly are picked up.
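    Usage would presumably look something like this (the value 100 and the broker settings are arbitrary):

    from streamz import Stream

    # re-check partition metadata every 100 polling cycles (illustrative value)
    source = Stream.from_kafka_batched(
        'my-topic',
        {'bootstrap.servers': 'localhost:9092', 'group.id': 'streamz-group'},
        poll_interval='10s',
        refresh_cycles=100,
    )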

    [Some more discussion in #358]

    opened by chinmaychandak 29
  • combine_latest emit_on behavior

    Should we buffer the combine_latest emit_on stream?

    Context

    Consider the following situation: we have two streams, a and b, and we use combine_latest to combine them, emitting only on a. Five entries come down from a, then one from b, then one more from a. Due to the lack of data from b, none of the five initial a entries have been emitted. Now that we have b data, do we expect it to emit six times (which would require buffering the emit_on stream), or only once (which may violate the idea of the emit_on stream always emitting)?
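    For concreteness, the scenario in streamz terms (combine_latest and emit_on are existing API; the emit ordering mirrors the description above):

    from streamz import Stream

    a = Stream()
    b = Stream()
    out = a.combine_latest(b, emit_on=a).sink_to_list()

    for i in range(5):
        a.emit(i)        # nothing emitted yet: b has no value
    b.emit('b0')         # still nothing: we only emit on a
    a.emit(5)            # emits (5, 'b0'); with buffering it would also emit the five earlier a values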

    @mrocklin @ordirules @danielballan

    opened by CJ-Wright 26
  • Added checkpointing

    This works by creating a future at the source for each batch. The futures are passed down the pipeline with the data. Each type of function does something different depending on that function's operation. Some functions will mark a future as done with 'drop' to tell the source not to commit the offsets. Others, such as map, will pass the future downstream without affecting it.

    opened by jsmaupin 24
  • Connect streams

    Suggested resolution to #44. It makes a big difference when visualizing graphs and I'm using it now, so I thought I'd go ahead and send it already. Please let me know if there are better ways.

    From the issue, changing from s2.map(s1.emit) to s2.connect(s1) leads to the difference between the two figures:

    s2.map(s1.emit) : http://imgur.com/a/0Z0JA

    s2.connect(s1) : http://imgur.com/a/rcrX0
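    In code, the two forms look like this (both map and connect are existing Stream API):

    from streamz import Stream

    s1 = Stream()
    s2 = Stream()
    s1.sink(print)

    # old style: push s2's elements into s1 via a map node
    # s2.map(s1.emit)

    # new style: wire the two streams together directly, which keeps the
    # visualized graph as a single connected pipeline
    s2.connect(s1)

    s2.emit('hello')     # flows into s1 and prints 'hello'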

    opened by jrmlhermitte 24
  • WIP - Batches, dispatch, and nesting

    OK, this is a work-in-progress PR that shows some ability to nest different kinds of elements in a stream. This PR does a few things to enable this:

    1. We make a Batch type that is a tuple of many elements as a single batch in a stream
    2. We create a singledispatch system to replace the monkeypatching that we used to do on Futures
    3. We do a bit of magic so that the functions we pass down to elements again check whatever they run on and possibly nest further.

    As a result, we can do the following example, which maps a function over remote batches:

    In [1]: from dask.distributed import Client
    
    In [2]: client = Client()
    
    In [3]: from streams import Stream, Batch
    
    In [4]: import streams.dask
    
    In [5]: source = Stream()
    
    In [6]: batches = source.partition(3).map(Batch)
    
    In [7]: futures = streams.dask.scatter(batches)
    
    In [8]: L = futures.map(lambda x: x + 1).sink_to_list()
    
    In [9]: for i in range(9):
       ...:     source.emit(i)
       ...:     
    
    In [10]: L
    Out[10]: 
    [<Future: status: finished, type: Batch, key: _stream_map-06488d54c44f3a3db136e6c34468eb13>,
     <Future: status: finished, type: Batch, key: _stream_map-cad3989e05761099bf5b22db5992eed3>,
     <Future: status: finished, type: Batch, key: _stream_map-ecd902c4b2cd3a96cb3c73694d64cfe6>]
    
    In [11]: client.gather(L)
    Out[11]: [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
    

    Note that things like accumulate (which will have to be changed) and merge (which will have to be split) don't work yet. I wanted to get feedback from @ordirules and @danielballan.

    opened by mrocklin 23
  • visualizing streams and changing variables during runtime

    Does anyone have any suggestions or experience generating a real-time visualization of streams and the events flowing through the pipeline?

    After I get a real-time viz going, I want to be able to select a stream/operator/edge between operators and modify its state: for example, temporarily disconnect one stream from another, or change a variable that is currently static, such as the condition in a filter operator.

    opened by OpenCoderX 2
  • Dropping `pkg_resources`

    Currently this uses pkg_resources, which is planned to go away:

    https://github.com/python-streamz/streamz/blob/b73a8c4c5be35ff1dae220daaefbfd2bfa58e0a1/streamz/plugins.py#L3

    The code using it mainly uses the iter_entry_points function:

    https://github.com/python-streamz/streamz/blob/b73a8c4c5be35ff1dae220daaefbfd2bfa58e0a1/streamz/plugins.py#L17-L22

    This could be replaced by importlib.metadata.entry_points, which has been around since Python 3.8.
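    A sketch of the replacement, assuming Python 3.10+ for the group keyword (the group name below is illustrative):

    from importlib.metadata import entry_points

    # Python 3.10+: select a single entry-point group directly
    for ep in entry_points(group='streamz.sources'):
        cls = ep.load()
        print(ep.name, cls)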

    opened by jakirkham 0
  • How to parametrize stream/pipeline creation?

    Hi, thanks for the great project!

    What I'm trying to achieve is to create a custom builder interface/wrapper around streamz to have predefined building blocks of preconfigured streams which can be used together. Example

    
    def _custom_stream_builder_1(input_stream):
        return input_stream.accumulate(_custom_acc, returns_state=True, start=_custom_state)
    
    
    def _custom_stream_builder_2(input_stream):
        return input_stream.filter(lambda event: _custom_condition)
    
    
    def _custom_stream_builder_3(input_stream):
        return input_stream.map(lambda event: _custom_map)
    
    stream = Stream()
    stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream)))
    stream.sink(print)
    
    for event in events:
        stream.emit(event)
    

    However, it looks like the code inside those functions does not alter the initial stream object, and all emitted events go straight to the sink. What am I doing wrong? Can you please point me in the right direction?
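    For reference, a minimal sketch of the usual pattern keeps a separate handle on the source and emits into it rather than into the node returned by the last builder (the map/filter steps below are stand-ins for the custom builders):

    from streamz import Stream

    source = Stream()                          # keep a handle on the source
    pipeline = source.map(lambda x: x + 1).filter(lambda x: x % 2 == 0)
    pipeline.sink(print)

    for event in range(5):
        source.emit(event)                     # emit into the source, not the tail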

    Another question: what is the difference between

    
    stream = Stream()
    stream.sink(print)
    
    for event in events:
        stream.emit(event)
    
    

    and

    
    stream = Stream()
    stream = stream.sink(print) # any other function map/filter/etc. here
    
    for event in events:
        stream.emit(event)
    
    

    I feel like the answer is somewhere in this example but I don't understand where. Thanks!

    opened by dirtyValera 1
  • How to store information from PeriodicDataFrame

    I am new to coding, so apologies for asking this. I am looking to store the data that comes in through a PeriodicDataFrame. My project is to take in barometric pressure every 30 seconds and then later tell me if the pressure changes by a certain amount. To do the second part of this, I would need to store the dataframe history instead of keeping just the latest values. How can this be done with streamz, or is it not possible?
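    One possible pattern, assuming the streaming dataframe's underlying .stream attribute and a hypothetical read_pressure callback (a sketch, not a tested recipe):

    import pandas as pd
    from streamz.dataframe import PeriodicDataFrame

    def read_pressure(**kwargs):
        # replace with a real sensor read; the value here is a placeholder
        return pd.DataFrame({'pressure_hpa': [1013.2]}, index=[pd.Timestamp.now()])

    sdf = PeriodicDataFrame(read_pressure, interval='30s')

    # keep every emitted chunk instead of only the latest view
    history = sdf.stream.sink_to_list()

    # later: combine the stored chunks and look at the change over time
    # full = pd.concat(history)
    # delta = full['pressure_hpa'].iloc[-1] - full['pressure_hpa'].iloc[0]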

    opened by rebeckaflynn1 1
  • Gracefully exit python script using Streams

    I have a use case associated with pulling data from a Kafka topic. I need the streamz operator to exit gracefully and the Python script to exit once it hits an exception. It looks something like this:

    source = Stream.from_kafka_batched(TOPIC, kafka_confs, poll_interval='20s', max_batch_size=10000)

    def process_messages(messages):
        try:
            ...  # process messages
        except Exception as e:
            print(e)
            disconnect_gracefully()

    def disconnect_gracefully():
        logging.info("Exit gracefully")
        source.stop()
        source.destroy()

    source.map(process_messages)
    

    While this seems to work for the streamz operator, I feel like it doesn't disconnect from the Kafka broker, and I get logs like this:

    %6|1651194599.149|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: my-kafka-server:9093: Disconnected (after 80522ms in state UP)
    

    So the script doesn't exit. Any pointers on how this can be done effectively?

    opened by arjun180 1
Owner

Python Streamz: a small real-time streaming library for Python