Stream Framework is a Python library which allows you to build news feeds, activity streams and notification systems using Cassandra and/or Redis. The authors of Stream Framework also provide a cloud service for feed technology.

Overview

Stream Framework


Activity Streams & Newsfeeds

Examples of what you can build

Stream Framework is a Python library which allows you to build activity streams & newsfeeds using Cassandra and/or Redis. If you're not using Python, have a look at Stream, which supports Node, Ruby, PHP, Python, Go, Scala, Java and REST.

Examples of what you can build are:

  • Activity streams such as seen on GitHub
  • A Twitter style newsfeed
  • A feed like Instagram/Pinterest
  • Facebook style newsfeeds
  • A notification system

(Feeds are also commonly called activity streams, activity feeds, or news streams.)

Stream

Build scalable newsfeeds and activity streams using getstream.io

Stream Framework's authors also offer a web service for building scalable newsfeeds & activity streams at Stream. It allows you to create your feeds by talking to a beautiful and easy to use REST API. There are clients available for Node, Ruby, PHP, Python, Go, Scala and Java. The Get Started page explains the API & concept in a few clicks. It's a lot easier to use, free up to 3 million feed updates and saves you the hassle of maintaining Cassandra, Redis, Faye, RabbitMQ and Celery workers.

Background Articles

A lot has been written about the best approaches to building feed-based systems. Here's a collection of some of the talks:

Stream Framework

Installation

Installation through pip is recommended:

$ pip install stream-framework

By default, stream-framework does not install the dependencies required for Redis and Cassandra support:

Install stream-framework with Redis dependencies

$ pip install stream-framework[redis]

Install stream-framework with Cassandra dependencies

$ pip install stream-framework[cassandra]

Install stream-framework with both Redis and Cassandra dependencies

$ pip install stream-framework[redis,cassandra]
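The same extras syntax also works in a requirements file, e.g.:

stream-framework[redis,cassandra]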

Authors & Contributors

Resources

Example application

We've included a Pinterest-like example application based on Stream Framework.

Tutorials

Using Stream Framework

This quick example will show you how to publish a "Pin" to all your followers. So let's create an activity for the item you just pinned.

# make_naive and pytz are assumed imports here (Django's timezone helper and
# the pytz package); PinVerb is a custom Verb subclass (see the sketch below)
import pytz
from django.utils.timezone import make_naive

from stream_framework.activity import Activity


def create_activity(pin):
    activity = Activity(
        pin.user_id,          # actor
        PinVerb,              # verb describing the action
        pin.id,               # object
        pin.influencer_id,    # target
        time=make_naive(pin.created_at, pytz.utc),
        extra_context=dict(item_id=pin.item_id)
    )
    return activity
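The PinVerb used above is not part of the library itself; a minimal sketch of how such a verb could be defined with stream_framework's Verb base class (the id value is arbitrary and just needs to be unique among your verbs):

from stream_framework.verbs import register
from stream_framework.verbs.base import Verb


class PinVerb(Verb):
    id = 5
    infinitive = 'pin'
    past_tense = 'pinned'

register(PinVerb)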

Next up we want to start publishing this activity on several feeds. First of all, we want to insert it into your personal feed, and then into your followers' feeds. Let's start by defining these feeds.

from stream_framework.feeds.redis import RedisFeed


class PinFeed(RedisFeed):
    key_format = 'feed:normal:%(user_id)s'


class UserPinFeed(PinFeed):
    key_format = 'feed:user:%(user_id)s'

Writing to these feeds is very simple. For instance, to write to the feed of user 13 one would do:

feed = UserPinFeed(13)
feed.add(activity)
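Reading activities back out of a feed is simply a slice; the newest activities come first:

feed = UserPinFeed(13)
activities = list(feed[:10])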

But we don't want to publish to just one user's feed. We want to publish to the feeds of all the users who follow you. This action is called a "fanout" and is abstracted away in the manager class. We need to subclass the Manager class and tell it how to figure out which users follow us.

from stream_framework.feed_managers.base import Manager, FanoutPriority


class PinManager(Manager):
    feed_classes = dict(
        normal=PinFeed,
    )
    user_feed_class = UserPinFeed

    def add_pin(self, pin):
        activity = pin.create_activity()
        # add_user_activity adds it to the user feed and starts the fanout
        self.add_user_activity(pin.user_id, activity)

    def get_user_follower_ids(self, user_id):
        # Follow is your application's model tracking who follows whom
        ids = Follow.objects.filter(target=user_id).values_list('user_id', flat=True)
        return {FanoutPriority.HIGH: ids}

manager = PinManager()

Now that the manager class is set up, broadcasting a pin becomes as easy as:

manager.add_pin(pin)

Calling this method will insert the pin into your personal feed and into all the feeds of the users who follow you. It does so by spawning many small tasks via Celery. In Django (or any other framework) you can now show the user's feed.

# django example

@login_required
def feed(request):
    '''
    Items pinned by the people you follow
    '''
    context = RequestContext(request)
    feed = manager.get_feeds(request.user.id)['normal']
    activities = list(feed[:25])
    context['activities'] = activities
    response = render_to_response('core/feed.html', context)
    return response
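Removing a pin follows the same pattern. A minimal sketch, assuming the manager's remove_user_activity method (the counterpart of add_user_activity used above):

class PinManager(Manager):
    # ... feed_classes, user_feed_class and add_pin as defined above ...

    def remove_pin(self, pin):
        activity = pin.create_activity()
        # removes it from the user feed and fans the removal out to followers
        self.remove_user_activity(pin.user_id, activity)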

This example only briefly covers how Stream Framework works. The full explanation can be found in the documentation.

Features

Stream Framework uses Celery and Redis/Cassandra to build a system with heavy writes and extremely light reads. It features:

  • Asynchronous tasks (All the heavy lifting happens in the background, your users don't wait for it)
  • Reusable components (You will need to make tradeoffs based on your use cases; Stream Framework doesn't get in your way)
  • Full Cassandra and Redis support
  • The Cassandra storage uses the new CQL3 and Python-Driver packages, which give you access to the latest Cassandra features.
  • Built for the extremely performant Cassandra 2.1. Versions 2.2 and 3.3 also pass the test suite, but we have no production experience with them (see the Cassandra feed sketch below).
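For the Cassandra side, a hedged sketch of defining a Cassandra-backed feed; CassandraFeed ships with the library, while the column-family name and the setting names mentioned in the comments are assumptions to adapt to your setup:

from stream_framework.feeds.cassandra import CassandraFeed


class CassandraPinFeed(CassandraFeed):
    # column family storing this feed's timeline; 'timeline' is a placeholder
    timeline_cf_name = 'timeline'

# connection details (hosts, keyspace) come from stream_framework's settings
# module, e.g. STREAM_CASSANDRA_HOSTS and STREAM_DEFAULT_KEYSPACE
# (setting names assumed here)
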
Comments
  • Custom AggregatedActivity not working

    I have created a CustomAggregatedActivity that extends feedly.activity.AggregatedActivity:

    from feedly.activity import AggregatedActivity
    
    class CustomAggregatedActivity(AggregatedActivity):
        @property
        def abc(self):
            pass
    

    And then I have used this in my custom aggregator:

    from feedly.aggregators.base import BaseAggregator

    class CustomAggregator(BaseAggregator):
        aggregation_class = CustomAggregatedActivity

        def rank(self, aggregated_activities):
            # rank logic here
            pass

        def group(self, activity):
            # group logic here
            pass

    Then I have assigned the CustomAggregator in:

    class AggregatedUserFeed(RedisAggregatedFeed):
        aggregator_class = CustomAggregator
        key_format = 'feed:aggregated:%(user_id)s'
    

    and finally:

    class Newsfeed(Feedly):
        feed_classes = dict(
            normal=NormalUserFeed,
            aggregated=AggregatedUserFeed
        )
        user_feed_class = UserFeed
    newsfeed_stream = Newsfeed()
    

    Now when I get the user's aggregated feed:

    feed = newsfeed_stream.get_feeds(user_id)['aggregated']
    aggregated_activities = list(feed[:10])
    >>> aggregated_activities[0].abc
    

    It says:

    AttributeError: 'AggregatedActivity' object has no attribute 'abc'
    

    Can you tell me why this happened?

    Thanks!
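    A hedged guess at the cause: when the feed is read back, the serializer rebuilds aggregated activities using the feed's aggregated_activity_class, which still defaults to the plain AggregatedActivity. If that is what's happening, pointing the feed at the custom class should fix it (attribute name assumed from the aggregated feed base class):

    class AggregatedUserFeed(RedisAggregatedFeed):
        aggregator_class = CustomAggregator
        # assumed attribute: tells the serializer which class to rebuild
        aggregated_activity_class = CustomAggregatedActivity
        key_format = 'feed:aggregated:%(user_id)s'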

    opened by intellisense 22
  • Redis+Celery+feedly setup issue?

    Hi again! I'm working on a Django project (more info in issue #16). I've got my Celery worker up and running, my Redis database seems OK and most of my setup seems fine. The only problem is that when I try to "fanout" to new feeds, the fanout does not seem to reach the Celery worker (it seems no tasks are added to the worker queue). I know this might be a broker/worker issue, but I haven't found any good indications of what the problem might be :( Everything works fine when CELERY_ALWAYS_EAGER == True, so I don't think there is a problem with my code.

    For feedly to work properly with Celery and Redis, the things I would need are a running Celery worker, a correctly set up Redis db, and a task broker (Redis or RabbitMQ), right? I think I lack an understanding of how the broker and worker work together with feedly?

    Any tips are much appreciated :)

    Adding to feed:

    activity = Activity(wishlist.user, WishVerb, in_prod.id)
    feed = UserWishFeed(wishlist.user.id)
    feed.insert_activity(activity)
    feed.add(activity)
    feedly.add_wish(wishlist.user, activity)

    A view for fetching the friends feed:

    @csrf_exempt
    @api_view(['GET'])
    @login_required
    def friends_wish_feed(request, *args, **kwargs):
        user = GiftItEndUser.objects.get(pk=kwargs['pk'])
        if request.user.id != user.pk:
            content = {"message": "you do not have permission", "status": 0}
            json_data = json.dumps(content)
            return Response(json_data)

        feed = feedly.get_feeds(request.user.id)['normal']
        act_list = []
        json_data = []
        activities = list(feed[:25])
        for act in activities:
            act = {'user_id': act.actor_id, 'product_id': act.object_id}
            act_list.append(act)
            json_data = json.dumps(act_list)
        return Response(json_data)
    

    Some Celery settings:

    FEEDLY_NYDUS_CONFIG = {
        'CONNECTIONS': {
            'redis': {
                'engine': 'nydus.db.backends.redis.Redis',
                'router': 'nydus.db.routers.redis.PrefixPartitionRouter',
                'hosts': {
                    0: {'prefix': 'default', 'db': 0, 'host': '127.0.0.1', 'port': 6379},
                }
            },
        }
    }

    BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
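    For reference, a minimal modern-style Celery app plus worker invocation looks roughly like this; the module and project names are placeholders, and older django-celery setups wire this up differently:

    # celery_app.py -- hypothetical module
    from celery import Celery

    app = Celery('giftit', broker='redis://localhost:6379/0')
    # make sure the worker imports the module(s) that define feedly's tasks
    app.conf.update(imports=('feedly.tasks',))

    # start a worker with:
    #   celery -A celery_app worker -l info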

    opened by andrenro 18
  • Deps as extras require

    Further to the conversation in #174, I saw that there's now an option to install without Cassandra (pip install stream-framework --install-option="--no-cassandra"); however, I don't think this option makes it possible to install via a requirements file without Cassandra.

    This pull request makes it possible to install via

    $ pip install Stream-Framework[cassandra2]
    
    $ pip install Stream-Framework[cassandra3]
    
    $ pip install Stream-Framework[redis]
    

    And these options can also be used in a requirements file, e.g.

    Stream-Framework[cassandra2]==1.3.4
    
    opened by j0hnsmith 17
  • Can't find pinterest_example based on the instruction

    Hi team, based on this instruction, from the root of the feedly project run:

    vagrant up
    vagrant provision
    vagrant ssh
    cd pinterest_example
    python manage.py runserver 0:8000

    I can't seem to find the pinterest_example folder. Why?

    opened by rogeryu23 17
  • Not thread safe?

    Just for testing I have set the feed max_length=1000 and merge_max_length=1000. Why? Because duplicate aggregated activities are not an option for me in the newsfeed (on the Facebook newsfeed you will never see a duplicate activity). So I ran a program which reads existing user actions from the database and generates feeds using Celery (async tasks).

    I was expecting that max_length=merge_max_length would never create a duplicate aggregated activity, but the program issued so many Celery tasks that in the end there were some duplicate aggregated activities. Why did that happen?

    Why is there even a merge_max_length variable? I thought we were storing activities in Redis as key/value where the key is a group, so why is there a need to traverse all aggregated activities to find where the new aggregated activity belongs? A new aggregated activity already has a group defined, so why not do something like Python's dict.get(group)?

    opened by intellisense 15
  • Global pubsub channel in notification feed

    Hello there,

    I'm trying to set up count publishing via pubsub on our project. It seems to work, but I've got a question:

    https://github.com/tschellenbach/Stream-Framework/blob/master/stream_framework/feeds/aggregated_feed/notification_feed.py#L80

    Why is a global channel used there? Are there reasons like performance or something? It looks like there should just be something like:

     self.redis.publish(self.pubsub_key, count_data)
    

    Also, it seems like a remove_many() override is missing, with a self.denormalize_count() call in it (same as in add_many()).

    Thank you guys for a great lib!

    opened by alexkuz 14
  • Serialization Error / KeyError in get_hydrated (?)

    I have an issue similar to the one in issue #8.

    I'm trying to make a REST service with this; I use django-rest-framework.

    It seems adding activities to Redis works, but when I try to fetch them back I get this:

    KeyError: 13869294460000000000005005L

    It seems like the "L" is causing some sort of error?

    Function calls and stack trace (wishlists/views.py):

    feedly = UserWishFeedly()

    # function for adding activities:
    activity = Activity(wishlist.user, WishVerb, in_prod.id)
    feed = UserWishFeed(wishlist.user.id)
    feed.add(activity)

    # test function for getting the feed
    def wish_feed(request, *args, **kwargs):
        feed = feedly.get_user_feed(request.user.id)
        activities = feed[:10]
        content = {"activities": activities}
        json_data = json.dumps(content)
        return Response(json_data)

    File "/usr/local/lib/python2.7/dist-packages/django/core/handlers/base.py", line 115, in get_response response = callback(request, _callback_args, *_callback_kwargs)

    File "/usr/local/lib/python2.7/dist-packages/django/views/decorators/csrf.py", line 77, in wrapped_view return view_func(_args, *_kwargs)

    File "/usr/local/lib/python2.7/dist-packages/django/views/generic/base.py", line 68, in view return self.dispatch(request, _args, *_kwargs)

    File "/usr/local/lib/python2.7/dist-packages/django/views/decorators/csrf.py", line 77, in wrapped_view return view_func(_args, *_kwargs)

    File "/usr/local/lib/python2.7/dist-packages/rest_framework/views.py", line 327, in dispatch response = self.handle_exception(exc)

    File "/usr/local/lib/python2.7/dist-packages/rest_framework/views.py", line 324, in dispatch response = handler(request, _args, *_kwargs)

    File "/usr/local/lib/python2.7/dist-packages/rest_framework/decorators.py", line 49, in handler return func(_args, *_kwargs)

    File "/usr/local/lib/python2.7/dist-packages/django/contrib/auth/decorators.py", line 25, in _wrapped_view return view_func(request, _args, *_kwargs)

    File "/home/andrenro/dev_giftit_webapp/env/Scripts/giftit_webapp/wishlists/views.py", line 437, in wish_feed activities = feed[:10]

    File "/usr/local/lib/python2.7/dist-packages/feedly/feeds/base.py", line 261, in get item start, bound)

    File "/usr/local/lib/python2.7/dist-packages/feedly/feeds/base.py", line 304, in get_activity_slice activities = self.hydrate_activities(activities)

    File "/usr/local/lib/python2.7/dist-packages/feedly/feeds/base.py", line 285, in hydrate_activities return [activity.get_hydrated(activity_data) for activity in activities]

    File "/usr/local/lib/python2.7/dist-packages/feedly/activity.py", line 41, in get_hydrated activity = activities[int(self.serialization_id)]

    KeyError: 13869294460000000000005005L

    If I try redis-cli I get:

    127.0.0.1:6379> zrange "feed:user:6" 0 25

    1) "13869294460000000000005005"

    I'm running Python 2.7.3 and Django 1.5.4. I'm kinda stuck with this, hope someone can shed some light on it. Great project btw!!

    opened by andrenro 14
  • dynamic feed slug support

    It's not very clear how to implement getstream.io's dynamic feed slug feature. Create a dynamic feed class? Or merge feed_slug into user_id, like Feed(user_id="%(feed_slug)s:%(user_id)s")? That feels too hacky.

    opened by ilovenwd 13
  • Lost activities in Aggregated Feed

    Hi, first of all a little about what I am trying to achieve. I have built a Django wrapper on top of the stream-framework library. There are 2 feed classes: FlatFeed(RedisFeed) and AggregatedFeed(RedisAggregatedFeed). Obviously, these use Redis to store the feed data. I have also implemented my own aggregator class.

    Error: the generated aggregated feed doesn't contain all activities, while the flat feed has all the activities. The use case is: there are 3 users A, B and C. B and C perform some activities, then user A follows B and C. Users B and C keep on doing more activities. The flat feed of A contains all the activities of B and C, but the aggregated feed of A has some lost activities.

    For example:

    B likes products 1, 2, 3, 4
    C likes products 5, 6, 7, 8, 9, 10
    A follows B
    A follows C

    flat_feed(A) has all activities, but aggregated_feed(A) only has the likes for 1, 5 and 8. I repeated this use case several times and each time only these 3 activities come through.

    I have tested my aggregator class implementation in the Django shell. The output of the aggregate and merge functions contains all the activities.

    Please help !!

    Please note that flat feed has correct entries, the missing entries are only in aggregated feeds.

    opened by anunayasri 10
  • change celery task decorator to shared_task

    For reusable apps, shared_task is used. Also, for me, with the celery.task.task decorator Celery was using the default settings instead of the settings from the project.

    http://celery.readthedocs.org/en/latest/django/first-steps-with-django.html#using-the-shared-task-decorator
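    For illustration, a hedged sketch of a task defined with the shared_task decorator; the task name and body here are placeholders, not stream_framework's actual task:

    from celery import shared_task


    @shared_task
    def add_operation(feed, activities):
        # with shared_task this binds to whatever Celery app the Django
        # project configures, instead of a module-level default app
        feed.add_many(activities)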

    opened by ashwinrajeev 9
  • serialization id mismatch

    Hi,

    It seems like the serialization ids mismatch by an 'L' letter; what would be the possible cause? Is feedly hitting Redis twice? Here is the relevant step in the trace of the run:

    > get_user_feed(customer.user.id)[:10]
    [Activity(joined) 7 5]
    (Pdb) activity_list = self.activity_storage.get_many(activity_ids)
    (Pdb) activity_ids
    ['13778137560000000000005005']
    (Pdb) activity_data
    {13778173560000000000005005L: Activity(joined) 7 5}
    -> return [activity.get_hydrated(activity_data) for activity in activities]
    KeyError: (13778137560000000000005005L,)
    

    Feedly version: feedly==0.9.3

    Thanks

    opened by ghost 9
  • docs: Fix a few typos

    There are small typos in:

    • docs/conf.py
    • docs/readme.rst
    • stream_framework/tests/storage/base.py

    Fixes:

    • Should read that rather than shat.
    • Should read pollution rather than polution.
    • Should read immediately rather than immediatly.

    Semi-automated pull request generated by https://github.com/timgates42/meticulous/blob/master/docs/NOTE.md

    opened by timgates42 0
  • ImportError: cannot import name 'BasePipeline' from 'redis.client' (/home/ian/.local/lib/python3.8/site-packages/redis/client.py)

    ian@ian-HP-Stream-Laptop-11-y0XX:~/stream-framework/Stream-Framework$ python3
    Python 3.8.10 (default, Sep 28 2021, 16:10:42) [GCC 9.3.0] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>> from stream_framework.activity import Activity
    >>> def create_activity(pin):
    ...     activity = Activity(
    ...         pin.user_id,
    ...         PinVerb,
    ...         pin.id,
    ...         pin.influencer_id,
    ...         time=make_naive(pin.created_at, pytz.utc),
    ...         extra_context=dict(item_id=pin.item_id)
    ...     )
    ...     return activity
    ...
    >>> from stream_framework.feeds.redis import RedisFeed
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/ian/stream-framework/Stream-Framework/stream_framework/feeds/redis.py", line 2, in <module>
        from stream_framework.storage.redis.activity_storage import RedisActivityStorage
      File "/home/ian/stream-framework/Stream-Framework/stream_framework/storage/redis/activity_storage.py", line 2, in <module>
        from stream_framework.storage.redis.structures.hash import ShardedHashCache
      File "/home/ian/stream-framework/Stream-Framework/stream_framework/storage/redis/structures/hash.py", line 1, in <module>
        from stream_framework.storage.redis.structures.base import RedisCache
      File "/home/ian/stream-framework/Stream-Framework/stream_framework/storage/redis/structures/base.py", line 2, in <module>
        from redis.client import BasePipeline
    ImportError: cannot import name 'BasePipeline' from 'redis.client' (/home/ian/.local/lib/python3.8/site-packages/redis/client.py)
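    For context, BasePipeline existed in redis-py 2.x and was removed in redis-py 3.0, which is why this import fails; a hedged workaround until the code is updated is to pin the older client:

    $ pip install "redis<3"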

    opened by iajzenszmi 1
  • Question: Can I use stream-framework for fanout to target object followers

    It's clear that I can use stream-framework for notification/activity feeds. However, the use case I'm building involves a slightly different scenario:

    1. A per-user ranked feed of target objects. An example of target objects could be movie listings.
    2. A per-user ranked, filtered feed of users who follow a target object

    As far as 2) is concerned, it seems like I can override get_user_follower_ids to accomplish this [user -> target objects the user follows -> user_ids that follow those target objects].

    Would this be a good way to handle this scenario?
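    A hedged sketch of that override, assuming a hypothetical ObjectFollow model that records which users follow which target objects:

    from stream_framework.feed_managers.base import Manager, FanoutPriority


    class ListingManager(Manager):
        def get_user_follower_ids(self, user_id):
            # hypothetical model: the target objects this user follows ...
            object_ids = ObjectFollow.objects.filter(user_id=user_id).values_list('object_id', flat=True)
            # ... and the users following those same objects
            follower_ids = ObjectFollow.objects.filter(object_id__in=object_ids).values_list('user_id', flat=True)
            return {FanoutPriority.HIGH: list(follower_ids)}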

    opened by aammundi 0
Owner
Thierry Schellenbach
Author of Stream-Framework, Founder/CEO of getstream.io (Techstars NYC)
Py-instant-search-redis - Source code example for how to build an instant search with redis in python

py-instant-search-redis Source code example for how to build an instant search (

Giap Le 4 Feb 17, 2022
Simple API written in Python using FastAPI to store and retrieve Books and Authors.

Simple API made with Python FastAPI WIP: Deploy in AWS with Terraform Simple API written in Python using FastAPI to store and retrieve Books and Autho

Caio Delgado 9 Oct 26, 2022
Django-Audiofield is a simple app that allows Audio files upload, management and conversion to different audio format (mp3, wav & ogg), which also makes it easy to play audio files into your Django application.

Django-Audiofield Description: Django Audio Management Tools Maintainer: Areski Contributors: list of contributors Django-Audiofield is a simple app t

Areski Belaid 167 Nov 10, 2022
Django Persistent Filters is a Python package which provide a django middleware that take care to persist the querystring in the browser cookies.

Django Persistent Filters Django Persistent Filters is a Python package which provide a django middleware that take care to persist the querystring in

Lorenzo Prodon 2 Aug 5, 2022
A beginner django project and also my first Django project which involves shortening of a longer URL into a short one using a unique id.

Django-URL-Shortener A beginner django project and also my first Django project which involves shortening of a longer URL into a short one using a uni

Rohini Rao 3 Aug 8, 2021
Adding Firebase Cloud Messaging Service into a Django Project

Adding Firebase Cloud Messaging Service into a Django Project The aim of this repository is to provide a step-by-step guide and a basic project sample

Seyyed Ali Ayati 11 Jan 3, 2023
This is raw connection between redis server and django python app

Django_Redis This repository contains the code for this blogpost. Running the Application Clone the repository git clone https://github.com/xxl4tomxu9

Tom Xu 1 Sep 15, 2022
Show how the redis works with Python (Django).

Redis Leaderboard Python (Django) Show how the redis works with Python (Django). Try it out deploying on Heroku (See notes: How to run on Google Cloud

Tom Xu 4 Nov 16, 2021
An app that allows you to add recipes from the dashboard made using DJango, JQuery, JScript and HTMl.

An app that allows you to add recipes from the dashboard. Then visitors filter based on different categories also each ingredient has a unique page with their related recipes.

Pablo Sagredo 1 Jan 31, 2022
A Django chatbot that is capable of doing math and searching Chinese poet online. Developed with django, channels, celery and redis.

Django Channels Websocket Chatbot A Django chatbot that is capable of doing math and searching Chinese poet online. Developed with django, channels, c

Yunbo Shi 8 Oct 28, 2022
Indonesia's negative news detection using gaussian naive bayes with Django+Scikir Learn

Introduction Indonesia's negative news detection using gaussian naive bayes build with Django and Scikit Learn. There is also any features, are: Input

Harifzi Ham 1 Dec 30, 2021
A web app which allows user to query the weather info of any place in the world

weather-app This is a web app which allows user to get the weather info of any place in the world as soon as possible. It makes use of OpenWeatherMap

Oladipo Adesiyan 3 Sep 20, 2021
Full featured redis cache backend for Django.

Redis cache backend for Django This is a Jazzband project. By contributing you agree to abide by the Contributor Code of Conduct and follow the guidel

Jazzband 2.5k Jan 3, 2023
A simple app that provides django integration for RQ (Redis Queue)

Django-RQ Django integration with RQ, a Redis based Python queuing library. Django-RQ is a simple app that allows you to configure your queues in djan

RQ 1.6k Jan 6, 2023
A Redis cache backend for django

Redis Django Cache Backend A Redis cache backend for Django Docs can be found at http://django-redis-cache.readthedocs.org/en/latest/. Changelog 3.0.0

Sean Bleier 1k Dec 15, 2022
A simple Blog Using Django Framework and Used IBM Cloud Services for Text Analysis and Text to Speech

ElhamBlog Cloud Computing Course first assignment. A simple Blog Using Django Framework and Used IBM Cloud Services for Text Analysis and Text to Spee

Elham Razi 5 Dec 6, 2022
wagtail_tenants is a Django/Wagtail app to provide multitenancy to your wagtail project.

wagtail-tenants wagtail_tenants is a Django/Wagtail app to provide multitenancy to your wagtail project. You are able to run a main Wagtail Site and f

<bbr> 11 Nov 20, 2022
Django-environ allows you to utilize 12factor inspired environment variables to configure your Django application.

Django-environ django-environ allows you to use Twelve-factor methodology to configure your Django application with environment variables. import envi

Daniele Faraglia 2.7k Jan 7, 2023
pytest-django allows you to test your Django project/applications with the pytest testing tool.

pytest-django allows you to test your Django project/applications with the pytest testing tool.

pytest-dev 1.1k Dec 14, 2022