Streamz helps you build pipelines to manage continuous streams of data

Overview

Streamz


Streamz helps you build pipelines to manage continuous streams of data. It is simple to use in simple cases, but also supports complex pipelines that involve branching, joining, flow control, feedback, back pressure, and so on.
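
A minimal example of the kind of simple pipeline described above, using only the public Stream API:

    from streamz import Stream

    source = Stream()
    source.map(lambda x: x * 2).filter(lambda x: x > 2).sink(print)

    for i in range(4):
        source.emit(i)   # prints 4 and 6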

Optionally, Streamz can also work with both Pandas and cuDF dataframes, to provide sensible streaming operations on continuous tabular data.

To learn more about how to use Streamz, see the documentation at streamz.readthedocs.org.

LICENSE

BSD 3-Clause

Comments
  • Improve kafka source

    Improve kafka source

    Fixes #143

    The idea is that dynamic polling should always be marked as async and latch on to whatever IOLoop is already running, or the one passed in.

    I hope this fixes the Jupyter display problem. It certainly works with print, although that has the problem of the actual output appearing wherever the latest code was executed.
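
    A minimal sketch of the idea (illustrative names, not the actual streamz source): a polling coroutine that latches on to whichever IOLoop is current, or one passed in explicitly.

    from tornado import gen
    from tornado.ioloop import IOLoop

    class PollingSource:
        def __init__(self, poll_interval=1.0, loop=None):
            self.poll_interval = poll_interval
            self.loop = loop or IOLoop.current()   # latch on to any loop going
            self.stopped = False

        @gen.coroutine
        def _poll(self):
            while not self.stopped:
                # ... consume a batch from Kafka and emit it downstream ...
                yield gen.sleep(self.poll_interval)

        def start(self):
            # schedule the async polling loop on the chosen IOLoop
            self.loop.add_callback(self._poll)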

    opened by martindurant 80
  • Proposal to add Checkpointing to streamz.

    Proposal to add Checkpointing to streamz.

    I closed out the earlier checkpointing PR #272, since it was an incorrect implementation. Before I raise a new PR, I wanted to put out a proposal and get as many reviews as possible, so as to have a sure footing to implement the feature. So here goes:

    The motivation for checkpointing is simple: we need to track the amount of data that has been processed in our streaming pipeline so that, in case the stream has to restart, it can pick up from where it left off. Please note that processed here means more than the data merely being read/consumed by the pipeline.

    Spark follows the following heuristics for checkpointing:

    1. Checkpointing is mandatory for all streaming pipelines, as it would be in any production environment. However, we can make checkpointing optional in the streamz API if need be.
    2. Whenever a batch of data is processed completely and the results are written to a sink, Spark creates a new checkpoint for this batch with the latest offsets for all partitions of the topic(s) involved in the batch.
    3. Spark does not consider printing to the console or the like as a sink. In the case of streamz, the only valid sink for now is Kafka.
    4. The directory structure of checkpointing is as follows:
    checkpoint_path/          # provided by the user
        topic1/
            1.txt             # checkpoint for the first batch
            2.txt             # checkpoint for the second batch
            ...
        topic2/
            1.txt
            ...
    
    5. Now, if a stream is killed for some reason, and restarts, it would pick up reading the Kafka topic from the last checkpoint file created.
    6. Only the last few checkpoints are maintained, and the older ones are deleted. We can hard-code this number to something, for example, 100.
    7. If multiple streams join/union, Spark also takes care of the checkpoints for all topics that may be involved.

    Implementation in streamz: The following is one of the cleanest ways, IMO, to implement checkpointing.

    1. Reading a batch from Kafka starts off with from_kafka_batched().
    from_kafka_batched([topics], consumer_conf, checkpoint_path, ...):
        consumer_conf['enable.auto.commit'] = False
        for topic in topics:
            # Retrieve the last created checkpoint for the topic from checkpoint_path.
            for partition in partitions:
                # Emit (partition, low, high) tuples into the stream for get_message_batch().
    

    Note that the [topics] parameter is a list of topics that the stream can subscribe to and consume from. The checkpoint_path parameter has to be specified by the user. If the user does not want to use checkpointing, we can set the default to None to use streamz as is.
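
    As a concrete, hypothetical sketch of the "retrieve last created checkpoint" step, assuming the numbered JSON checkpoint files described later in this proposal:

    import json
    import os

    def latest_checkpoint(checkpoint_path, topic):
        """Return the {partition: offset} dict from the newest checkpoint file,
        or an empty dict if the topic has never been checkpointed."""
        topic_dir = os.path.join(checkpoint_path, topic)
        if not os.path.isdir(topic_dir):
            return {}
        files = [f for f in os.listdir(topic_dir) if f.endswith('.txt')]
        if not files:
            return {}
        newest = max(files, key=lambda f: int(os.path.splitext(f)[0]))
        with open(os.path.join(topic_dir, newest)) as fh:
            return json.load(fh)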

    2. get_message_batch() consumes the messages from the specific partition between the low and high offsets specified in the tuples emitted in the stream.
    get_message_batch(kafka_params, topic, partition, low, high, checkpoint_path, ...):
        # Consume the messages between the low and high offsets.
        # Create a dirty-bit file in checkpoint_path: an indication that some data is yet to be checkpointed.
        # Create/update a write-ahead log (WAL) text file for this topic partition with the latest
        # offsets that have been read and are ready to be checkpointed if processing is successful.
    

    The purpose of the dirty bit is explained later on. The WAL would be a text file, #partition.txt, in a topic-level directory in the checkpoint path. It would contain the offset for this topic partition that has last been consumed; if processing goes through to finally write back to a sink, this is the offset to be checkpointed.

    The directory structure would now look like this:

    checkpoint_path/
        dirty_bit
        write-ahead-logs/
            topic1/
                1.txt         # log for the first partition of topic 1
                2.txt         # log for the second partition of topic 1
                ...
            topic2/
                1.txt
                ...
        topic1/
            1.txt             # checkpoint for the first batch
            2.txt             # checkpoint for the second batch
            ...
        topic2/
            1.txt
            ...
    
    3. There would now be some stream-specific processing functions, e.g., map(), etc.
    4. Finally, if we write the results back to Kafka using to_kafka(), this is where the checkpoint creation would take place.
    to_kafka(checkpoint_path, output_topic, producer_conf):
        ...  # the current operations within to_kafka() stay as is

        # Everything below is done from a separate function, add_checkpoint():
        # if there is no dirty-bit file, exit; otherwise delete the dirty-bit file here
        new_checkpoint = {}
        for topic in [topics from WAL]:        # this could be made multi-threaded
            for partition in WAL/topic path:
                new_checkpoint[partition] = offset
            # create the checkpoint file for this batch for this topic at `checkpoint_path/topic`
    

    A checkpoint file would look like a JSON {partition:offset} dictionary:

    checkpoint_path/topic1/1.txt
    {0: 5, 1: 4, 2: 3}
    

    Let's say we decide to keep only the last 100 checkpoints for each topic. Then, as soon as the 101st checkpoint is created, the 1st checkpoint 1.txt is deleted, and the 101.txt gets created.
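
    A hypothetical sketch of add_checkpoint() with the keep-only-the-last-N retention described above (names and layout follow this proposal, not any existing streamz API):

    import json
    import os

    def add_checkpoint(checkpoint_path, topic, new_checkpoint, keep_last=100):
        """Write the next numbered checkpoint for `topic` and prune old ones."""
        topic_dir = os.path.join(checkpoint_path, topic)
        os.makedirs(topic_dir, exist_ok=True)
        existing = sorted(int(os.path.splitext(f)[0])
                          for f in os.listdir(topic_dir) if f.endswith('.txt'))
        next_id = existing[-1] + 1 if existing else 1
        with open(os.path.join(topic_dir, '%d.txt' % next_id), 'w') as fh:
            json.dump(new_checkpoint, fh)   # {partition: offset} dictionary
        # retention: keep only the newest `keep_last` checkpoints
        for old in (existing + [next_id])[:-keep_last]:
            os.remove(os.path.join(topic_dir, '%d.txt' % old))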

    This finishes the implementation.

    Now, a few questions and their answers:

    1. to_kafka() is called on every element of the stream, and if we are using Dask, the WALs for all partitions are created in parallel. After gather().to_kafka(), the checkpoint is created on the first call to to_kafka() itself. In this case, how do we avoid duplicate checkpoints being created? At most #partitions duplicate checkpoints could be created. Answer: This is where the dirty-bit file comes into play. The dirty bit is set whenever new data is consumed, and to_kafka() does not attempt to create a new checkpoint unless the dirty bit is set. After a checkpoint is created, to_kafka() deletes the dirty bit.

    2. The WAL and the checkpoints would be stored on the local file system for now? Answer: Yes, we can support S3 in the future, like Spark.

    3. In the future, if we have different sinks apart from Kafka, like SQS, Google Pub-Sub, ElasticSearch, do we need to implement checkpointing all over again? Answer: No. When writing to a sink, one would just call the add_checkpoint() function that we would create as part of this PR. The source would always be Kafka, though, for the foreseeable future.

    4. We need to pass the checkpoint_path to both from_kafka_batched() and to_kafka()? Answer: Yes, that's unfortunate. I'd love to hear another way to do this for a cleaner API. But we need the path in from_kafka_batched() to retrieve the last checkpoint, and in to_kafka() to create a new checkpoint.

    5. Would this work with Dask? Answer: Absolutely. We would definitely want this to work with Dask!

    opened by chinmaychandak 48
  • Added the to_kafka stream

    Added the to_kafka stream

    I have added the to_kafka class.

    The options were to either:

    1. Call the .flush() method after each .produce(...) call. This will seriously hurt performance, as messages will be sent serially.
    2. Use atexit to call flush at the end. This requires the use of atexit, of which I'm unsure of the impact. It also means that the .emit(...) call by the client does not yield an immediate call to the downstream. Note that the Confluent library will flush on its own when the batch size reaches a threshold; this is the method implemented here. A short sketch of the trade-off follows.
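
    A minimal confluent-kafka sketch of the trade-off (illustrative only, not the streamz to_kafka code):

    from confluent_kafka import Producer

    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def on_delivery(err, msg):
        # runs from poll()/flush() once the broker acknowledges the message
        if err is not None:
            print('delivery failed:', err)

    for value in [b'a', b'b', b'c']:
        producer.produce('my-topic', value, callback=on_delivery)
        producer.poll(0)   # serve delivery callbacks without blocking
        # option 1 would call producer.flush() here, serializing every send

    producer.flush()       # option 2: block once at the end (e.g. via atexit)
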
    opened by jsmaupin 47
  • Add HoloViews based plotting API

    Add HoloViews based plotting API

    Adds handling for bar plots on Series and Seriess objects. Also fixes Seriess.plot.line, Seriess.plot.scatter and Seriess.plot.area, ensuring the index is reset and therefore made visible to HoloViews.

    opened by philippjfr 39
  • A potential approach for checking if a datum is done

    A potential approach for checking if a datum is done

    One way to think about checkpointing is knowing whether a piece of data has completely exited the pipeline: it is no longer being processed or being held in a cache.

    Backpressure can help out here, since we can know if the pipeline has returned control up the call stack.

    I think there are three ways that control is returned:

    1. All computations have finished, the data has left through a sink
    2. Some computations have finished, the data was dropped by a filter
    3. Some computations have finished, the data was stored in a cache waiting for something

    1 and 2 are handled by backpressure nicely, since if all the computations have finished then the data is done.

    3 is a bit more tricky since we need to track the cached values.

    Potential implementation

    def _emit(self, x, metadata={'reference': ref_counter}):
        ref_counter += len(self.downstreams)   # one reference per downstream consumer
        for downstream in self.downstreams:
            downstream.update(x)
            ref_counter -= 1                   # released once the downstream returns
    

    Caching nodes

    def update(self, x, ...):
        self.cache.append(x)
        ref_counter += 1   # hold a reference while the value sits in the cache
    

    When the RefCounter hits zero the data is done.

    Note that this would need to merge the refcounting metadata when joining nodes. For instance combine_latest would need to propagate the ref counters for all incoming data sources, and all downstream data consumers would need to increment/decrement for all the ref counters.

    We'd also need to create the refcounters at data ingestion time.
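
    A bare-bones sketch of what such a reference counter could look like (names here are illustrative, not streamz's actual implementation):

    class RefCounter:
        """Counts outstanding references to a datum; fires a callback at zero."""
        def __init__(self, initial=1, on_done=None):
            self.count = initial
            self.on_done = on_done

        def retain(self, n=1):
            self.count += n

        def release(self, n=1):
            self.count -= n
            if self.count <= 0 and self.on_done is not None:
                self.on_done()   # e.g. commit the Kafka offset for this datum

    # The source would create one counter per datum at ingestion time and pass it
    # downstream in the metadata, e.g. metadata={'reference': RefCounter(on_done=commit)}.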

    opened by CJ-Wright 35
  • A simple plugin system

    A simple plugin system

    Right now, if I need to add my own custom stream nodes, I have to do this:

    import mypackage.streamz_extras  # noqa: F401
    

    It would be nice to have a way to distribute additional functionality as separate packages that can just be installed via pip. This can be done with entry_points, similar to the way it works in airflow. This is a super bare-bones implementation of this mechanism. Check out https://github.com/roveo/streamz_example_plugin for an example of a plugin.
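
    For illustration, a plugin package could declare its nodes via an entry point roughly like this (the group name "streamz.nodes" is an assumption based on the linked example repo, not a confirmed API):

    # setup.py of the hypothetical plugin package
    from setuptools import setup

    setup(
        name="streamz_example_plugin",
        version="0.1",
        py_modules=["streamz_example_plugin"],
        entry_points={
            "streamz.nodes": [
                "my_node = streamz_example_plugin:MyNode",
            ],
        },
    )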

    Problems:

    • this should be tested, but I have no idea how, short of shipping the code for the example plugin with the tests
    • plugged-in classes should be checked for validity in some way. I can add a simple check, e.g. isinstance(plugin, Stream), but there is probably something else I haven't thought of
    opened by roveo 29
  • Add refresh_cycles parameter to from_kafka_batched to accommodate adding Kafka topic partitions on the fly.

    Add refresh_cycles parameter to from_kafka_batched to accommodate adding Kafka topic partitions on the fly.

    Streamz should be able to handle the addition of Kafka topic partitions on the fly. The refresh_cycles parameter polls Kafka every N cycles to fetch the latest partition metadata, so any partitions added while the stream is running are picked up automatically.

    [Some more discussion in #358]
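
    A hedged usage sketch (the parameter name comes from this PR; the exact signature and defaults may differ):

    from streamz import Stream

    consumer_conf = {'bootstrap.servers': 'localhost:9092',
                     'group.id': 'my-group'}

    source = Stream.from_kafka_batched(
        'my-topic', consumer_conf,
        poll_interval='5s',
        refresh_cycles=10,   # re-fetch partition metadata every 10 poll cycles
    )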

    opened by chinmaychandak 29
  • combine_latest emit_on behavior

    combine_latest emit_on behavior

    Should we buffer the combine_latest emit_on stream?

    Context

    Consider the following situation: we have two streams, a and b. We are going to use combine_latest to combine them, emitting only on a. Five entries come down from a, then one comes down from b, then one more from a. Due to the lack of data from b, none of the 5 initial a entries have been emitted. Now that we have b data, do we expect it to emit 6 times (which would require buffering the emit_on stream) or only once (which may violate the idea of the emit_on stream always emitting)?
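
    For concreteness, the scenario with today's unbuffered behavior looks roughly like this:

    from streamz import Stream

    a = Stream()
    b = Stream()
    out = a.combine_latest(b, emit_on=a).sink_to_list()

    for i in range(5):
        a.emit(i)        # nothing emitted yet: b has no value
    b.emit('b0')         # still nothing: only a triggers emission
    a.emit(5)            # emits once, (5, 'b0')
    # with buffering of the emit_on stream, the pipeline would emit 6 times instead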

    @mrocklin @ordirules @danielballan

    opened by CJ-Wright 26
  • Added checkpointing

    Added checkpointing

    This works by creating a future at the source for each batch. The futures are passed down the pipeline with the data. Each type of node does something different depending on its operation: some will mark a future as done with 'drop' to tell the source not to commit the offsets, while others, such as map, will pass the future downstream without affecting it.

    opened by jsmaupin 24
  • Connect streams

    Connect streams

    Suggested resolution to #44. It makes a big difference when visualizing graphs, and I'm using it now, so I thought I'd go ahead and send it already. Please let me know if there are better ways.

    From issue, changing from s2.map(s1.emit) to s2.connect(s1) leads to difference of the two figures:

    s2.map(s1.emit) : http://imgur.com/a/0Z0JA

    s2.connect(s1) : http://imgur.com/a/rcrX0

    opened by jrmlhermitte 24
  • WIP - Batches, dispatch, and nesting

    WIP - Batches, dispatch, and nesting

    OK, this is a work-in-progress PR that shows some ability to nest different kinds of elements in a stream. This PR does a few things to enable this:

    1. We make a Batch type, a tuple of many elements treated as a single batch in a stream
    2. We create a singledispatch system to replace the monkeypatching that we used to do on Futures
    3. We do a bit of magic so that the functions we pass down to elements again check whatever they run on and possibly nest further.

    As a result, we can do the following example, which maps a function over remote batches:

    In [1]: from dask.distributed import Client
    
    In [2]: client = Client()
    
    In [3]: from streams import Stream, Batch
    
    In [4]: import streams.dask
    
    In [5]: source = Stream()
    
    In [6]: batches = source.partition(3).map(Batch)
    
    In [7]: futures = streams.dask.scatter(batches)
    
    In [8]: L = futures.map(lambda x: x + 1).sink_to_list()
    
    In [9]: for i in range(9):
       ...:     source.emit(i)
       ...:     
    
    In [10]: L
    Out[10]: 
    [<Future: status: finished, type: Batch, key: _stream_map-06488d54c44f3a3db136e6c34468eb13>,
     <Future: status: finished, type: Batch, key: _stream_map-cad3989e05761099bf5b22db5992eed3>,
     <Future: status: finished, type: Batch, key: _stream_map-ecd902c4b2cd3a96cb3c73694d64cfe6>]
    
    In [11]: client.gather(L)
    Out[11]: [(1, 2, 3), (4, 5, 6), (7, 8, 9)]
    

    Note that things like accumulate (which will have to be changed) and merge (which will have to be split) don't work yet. I wanted to get feedback from @ordirules and @danielballan .

    opened by mrocklin 23
  • visualizing streams and changing variables during runtime

    visualizing streams and changing variables during runtime

    Does anyone have any suggestions or experience generating a real-time visualization of streams and the events flowing through the pipeline?

    After I get a real-time viz going, I want to be able to select a stream/operator/edge between operators and modify the state, for example temporarily disconnecting one stream from another, or changing a variable that is currently static, such as the condition in a filter operator.

    opened by OpenCoderX 0
  • Dropping `pkg_resources`

    Dropping `pkg_resources`

    Currently this uses pkg_resources, which is planned to go away:

    https://github.com/python-streamz/streamz/blob/b73a8c4c5be35ff1dae220daaefbfd2bfa58e0a1/streamz/plugins.py#L3

    The code using it mainly uses the iter_entry_points function:

    https://github.com/python-streamz/streamz/blob/b73a8c4c5be35ff1dae220daaefbfd2bfa58e0a1/streamz/plugins.py#L17-L22

    This could be replaced by importlib.metadata.entry_points, which has been around since Python 3.8.
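
    A minimal sketch of the suggested replacement (the group name below is a stand-in for whatever groups streamz actually registers; the keyword-based selection requires Python 3.10, with a dict-style fallback before that):

    import sys
    from importlib.metadata import entry_points

    def iter_group(group):
        """Roughly equivalent to pkg_resources.iter_entry_points(group)."""
        if sys.version_info >= (3, 10):
            return entry_points(group=group)
        return entry_points().get(group, [])

    for ep in iter_group("streamz.sources"):   # group name is an assumption
        plugin_class = ep.load()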

    opened by jakirkham 0
  • How to parametrize stream/pipeline creation?

    How to parametrize stream/pipeline creation?

    Hi, thanks for the great project!

    What I'm trying to achieve is to create a custom builder interface/wrapper around streamz to have predefined building blocks of preconfigured streams which can be used together. Example:

    
    def _custom_stream_builder_1(input_stream):
        return input_stream.accumulate(_custom_acc, returns_state=True, start=_custom_state)
    
    
    def _custom_stream_builder_2(input_stream):
        return input_stream.filter(lambda event: _custom_condition)
    
    
    def _custom_stream_builder_3(input_stream):
        return input_stream.map(lambda event: _custom_map)
    
    stream = Stream()
    stream = _custom_stream_builder_1(_custom_stream_builder_2(_custom_stream_builder_3(stream)))
    stream.sink(print)
    
    for event in events:
        stream.emit(event)
    

    However, it looks like the code inside those functions does not alter the initial stream object and all emitted events go straight to the sink. What am I doing wrong? Can you please point me in the right direction?

    Another question: what is the difference between

    
    stream = Stream()
    stream.sink(print)
    
    for event in events:
        stream.emit(event)
    
    

    and

    
    stream = Stream()
    stream = stream.sink(print) # any other function map/filter/etc. here
    
    for event in events:
        stream.emit(event)
    
    

    I feel like the answer is somewhere in this example, but I don't understand where. Thanks!

    opened by dirtyValera 1
  • How to store information from PeriodicDataFrame

    How to store information from PeriodicDataFrame

    I am new to coding, so apologies for asking this. I am looking to store the data that comes in through PeriodicDataFrame. My project is to take in barometric pressure every 30 seconds and then later tell me if the pressure changes by a certain amount. In order to do the second part of this, I would need to store the whole dataframe instead of keeping just the latest values. How can this be done with streamz, or is it not possible?

    opened by rebeckaflynn1 1
  • Gracefully exit python script using Streams

    Gracefully exit python script using Streams

    I have a use case associated with pulling data from a Kafka topic. I need the streamz operator to exit gracefully and the Python script to exit once it hits an exception. It looks something like this:

    source = Stream.from_kafka_batched(TOPIC, kafka_confs, poll_interval='20s', max_batch_size=10000)

    def process_messages(messages):
        try:
            ...  # process the batch of messages
        except Exception as e:
            print(e)
            disconnect_gracefully()

    def disconnect_gracefully():
        logging.info("Exit gracefully")
        source.stop()
        source.destroy()

    source.map(process_messages)
    

    While this seems to work for the streamz operator, I feel like it doesn't disconnect from the Kafka broker, and I get logs like this:

    %6|1651194599.149|FAIL|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator: my-kafka-server:9093: Disconnected (after 80522ms in state UP)
    

    So, the script doesn't exit. Any pointers to how this can be done effectively?

    opened by arjun180 1
Owner
Python Streamz
A small real-time streaming library for Python