Python library for creating data pipelines with chain functional programming

Overview

PyFunctional

Features

PyFunctional makes creating data pipelines easy by using chained functional operators. Here are a few examples of what it can do:

  • Chained operators: seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y)
  • Expressive and feature-complete API
  • Read and write text, csv, json, jsonl, sqlite, gzip, bz2, and lzma/xz files
  • Parallelize "embarrassingly parallel" operations like map easily
  • Complete documentation, rigorous unit test suite, 100% test coverage, and CI, which together provide robustness

PyFunctional's API takes inspiration from Scala collections, Apache Spark RDDs, and Microsoft LINQ.

Table of Contents

  1. Installation
  2. Examples
    1. Simple Example
    2. Aggregates and Joins
    3. Reading and Writing SQLite3
    4. Data Interchange with Pandas
  3. Writing to Files
  4. Parallel Execution
  5. GitHub Shortform Documentation
    1. Streams, Transformations, and Actions
    2. Streams API
    3. Transformations and Actions APIs
    4. Lazy Execution
  6. Contributing and Bug Fixes
  7. Changelog

Installation

PyFunctional is available on PyPI and can be installed by running:

# Install from command line
$ pip install pyfunctional

Then in Python, run: from functional import seq

Examples

PyFunctional is useful for many tasks, and can natively open several common file types. Here are a few examples of what you can do.

Simple Example

from functional import seq

seq(1, 2, 3, 4)\
    .map(lambda x: x * 2)\
    .filter(lambda x: x > 4)\
    .reduce(lambda x, y: x + y)
# 14

# or if you don't like backslash continuation
(seq(1, 2, 3, 4)
    .map(lambda x: x * 2)
    .filter(lambda x: x > 4)
    .reduce(lambda x, y: x + y)
)
# 14

Streams, Transformations, and Actions

PyFunctional has three types of functions:

  1. Streams: read data for use by the collections API.
  2. Transformations: transform data from streams with functions such as map, flat_map, and filter.
  3. Actions: cause a series of transformations to evaluate to a concrete value. to_list, reduce, and to_dict are examples of actions.

In the expression seq(1, 2, 3).map(lambda x: x * 2).reduce(lambda x, y: x + y), seq is the stream, map is the transformation, and reduce is the action.

Filtering a list of account transactions

from functional import seq
from collections import namedtuple

Transaction = namedtuple('Transaction', 'reason amount')
transactions = [
    Transaction('github', 7),
    Transaction('food', 10),
    Transaction('coffee', 5),
    Transaction('digitalocean', 5),
    Transaction('food', 5),
    Transaction('riotgames', 25),
    Transaction('food', 10),
    Transaction('amazon', 200),
    Transaction('paycheck', -1000)
]

# Using the Scala/Spark inspired APIs
food_cost = seq(transactions)\
    .filter(lambda x: x.reason == 'food')\
    .map(lambda x: x.amount).sum()

# Using the LINQ inspired APIs
food_cost = seq(transactions)\
    .where(lambda x: x.reason == 'food')\
    .select(lambda x: x.amount).sum()

# Using PyFunctional with fn
from fn import _
food_cost = seq(transactions).filter(_.reason == 'food').map(_.amount).sum()

Aggregates and Joins

The account transactions example could be done easily in pure Python using list comprehensions. To show some of the things PyFunctional excels at, take a look at a couple of word count examples.

words = 'I dont want to believe I want to know'.split(' ')
seq(words).map(lambda word: (word, 1)).reduce_by_key(lambda x, y: x + y)
# [('dont', 1), ('I', 2), ('to', 2), ('know', 1), ('want', 2), ('believe', 1)]

In the next example we have chat logs formatted in json lines (jsonl), which contain messages and metadata. A typical jsonl file has one valid JSON object on each line. Below are a few lines out of examples/chat_logs.jsonl.

{"message":"hello anyone there?","date":"10/09","user":"bob"}
{"message":"need some help with a program","date":"10/09","user":"bob"}
{"message":"sure thing. What do you need help with?","date":"10/09","user":"dave"}
from operator import add
import re
messages = seq.jsonl('examples/chat_logs.jsonl')

# Split words on space and normalize before doing word count
def extract_words(message):
    return re.sub('[^0-9a-z ]+', '', message.lower()).split(' ')


word_counts = messages\
    .map(lambda log: extract_words(log['message']))\
    .flatten().map(lambda word: (word, 1))\
    .reduce_by_key(add).order_by(lambda x: x[1])

Next, let's continue that example by introducing a json database of users from examples/users.json. The previous example showed how PyFunctional can do word counts; this one shows how PyFunctional can join different data sources.

# First read the json file
users = seq.json('examples/users.json')
#[('sarah',{'date_created':'08/08','news_email':True,'email':'[email protected]'}),...]

email_domains = users.map(lambda u: u[1]['email'].split('@')[1]).distinct()
# ['yahoo.com', 'python.org', 'gmail.com']

# Join users with their messages
message_tuples = messages.group_by(lambda m: m['user'])
data = users.inner_join(message_tuples)
# [('sarah',
#    (
#      {'date_created':'08/08','news_email':True,'email':'[email protected]'},
#      [{'date':'10/10','message':'what is a...','user':'sarah'}...]
#    )
#  ),...]

# From here you can imagine doing more complex analysis

CSV, Aggregate Functions, and Set Functions

In examples/camping_purchases.csv there is a list of camping purchases. Let's do some cost analysis and compare it to the required camping gear list stored in examples/gear_list.txt.

purchases = seq.csv('examples/camping_purchases.csv')
total_cost = purchases.select(lambda row: int(row[2])).sum()
# 1275

most_expensive_item = purchases.max_by(lambda row: int(row[2]))
# ['4', 'sleeping bag', ' 350']

purchased_list = purchases.select(lambda row: row[1])
gear_list = seq.open('examples/gear_list.txt').map(lambda row: row.strip())
missing_gear = gear_list.difference(purchased_list)
# ['water bottle','gas','toilet paper','lighter','spoons','sleeping pad',...]

In addition to the aggregate functions shown above (sum and max_by), there are many more. Similarly, there are several more set-like functions in addition to difference.

Reading/Writing SQLite3

PyFunctional can read and write to SQLite3 database files. In the example below, users are read from examples/users.db which stores them as rows with columns id:Int and name:String.

db_path = 'examples/users.db'
users = seq.sqlite3(db_path, 'select * from user').to_list()
# [(1, 'Tom'), (2, 'Jack'), (3, 'Jane'), (4, 'Stephan')]

sorted_users = seq.sqlite3(db_path, 'select * from user order by name').to_list()
# [(2, 'Jack'), (3, 'Jane'), (4, 'Stephan'), (1, 'Tom')]

Writing to a SQLite3 database is similarly easy:

import sqlite3
from collections import namedtuple

with sqlite3.connect(':memory:') as conn:
    conn.execute('CREATE TABLE user (id INT, name TEXT)')
    conn.commit()
    User = namedtuple('User', 'id name')
    
    # Write using a specific query
    seq([(1, 'pedro'), (2, 'fritz')]).to_sqlite3(conn, 'INSERT INTO user (id, name) VALUES (?, ?)')
    
    # Write by inserting values positionally from a tuple/list into named table
    seq([(3, 'sam'), (4, 'stan')]).to_sqlite3(conn, 'user')
    
    # Write by inferring schema from namedtuple
    seq([User(name='tom', id=5), User(name='keiga', id=6)]).to_sqlite3(conn, 'user')
    
    # Write by inferring schema from dict
    seq([dict(name='david', id=7), dict(name='jordan', id=8)]).to_sqlite3(conn, 'user')
    
    # Read everything back to make sure it wrote correctly
    print(list(conn.execute('SELECT * FROM user')))
    
    # [(1, 'pedro'), (2, 'fritz'), (3, 'sam'), (4, 'stan'), (5, 'tom'), (6, 'keiga'), (7, 'david'), (8, 'jordan')]

Writing to Files

Just as PyFunctional can read from csv, json, jsonl, sqlite3, and text files, it can also write them. For complete API documentation see the collections API table or the official docs.

Compressed Files

PyFunctional will auto-detect files compressed with gzip, lzma/xz, and bz2. This is done by examining the first several bytes of the file to determine if it is compressed, so it requires no code changes to work.

To write compressed files, every to_ function has a compression parameter, which can be set to the default None for no compression, gzip or gz for gzip compression, lzma or xz for lzma compression, and bz2 for bz2 compression.

Parallel Execution

The only change required to enable parallelism is to use from functional import pseq instead of from functional import seq, then use pseq where you would use seq. The following operations run in parallel, with more to be implemented in a future release:

  • map/select
  • filter/filter_not/where
  • flat_map

Parallelization uses Python multiprocessing and squashes chains of embarrassingly parallel operations to reduce overhead costs. For example, a sequence of maps and filters would be executed all at once rather than in multiple loops using multiprocessing.

Documentation

Shortform documentation is below and full documentation is at docs.pyfunctional.org.

Streams API

All of PyFunctional's streams can be accessed through the seq object. The primary way to create a stream is to call seq with an iterable. The seq callable is smart and accepts multiple types of parameters, as shown in the examples below.

# Passing a list
seq([1, 1, 2, 3]).to_set()
# {1, 2, 3}

# Passing direct arguments
seq(1, 1, 2, 3).map(lambda x: x).to_list()
# [1, 1, 2, 3]

# Passing a single value
seq(1).map(lambda x: -x).to_list()
# [-1]

seq also provides entry points to other streams as attribute functions, as shown below.

# number range
seq.range(10)

# text file
seq.open('filepath')

# json file
seq.json('filepath')

# jsonl file
seq.jsonl('filepath')

# csv file
seq.csv('filepath')
seq.csv_dict_reader('filepath')

# sqlite3 db and sql query
seq.sqlite3('filepath', 'select * from data')

For more information on the parameters that these functions can take, reference the streams documentation.

Transformations and Actions APIs

Below is the complete list of functions which can be called on a stream object from seq. For complete documentation, reference the transformations and actions API.

Function Description Type
map(func)/select(func) Maps func onto elements of sequence transformation
starmap(func)/smap(func) Apply func to sequence with itertools.starmap transformation
filter(func)/where(func) Filters elements of sequence to only those where func(element) is True transformation
filter_not(func) Filters elements of sequence to only those where func(element) is False transformation
flatten() Flattens sequence of lists to a single sequence transformation
flat_map(func) func must return an iterable. Maps func to each element, then merges the result to one flat sequence transformation
group_by(func) Groups sequence into (key, value) pairs where key=func(element) and value is from the original sequence transformation
group_by_key() Groups sequence of (key, value) pairs by key transformation
reduce_by_key(func) Reduces list of (key, value) pairs using func transformation
count_by_key() Counts occurrences of each key in list of (key, value) pairs transformation
count_by_value() Counts occurrence of each value in a list transformation
union(other) Union of unique elements in sequence and other transformation
intersection(other) Intersection of unique elements in sequence and other transformation
difference(other) New sequence with unique elements present in sequence but not in other transformation
symmetric_difference(other) New sequence with unique elements present in sequence or other, but not both transformation
distinct() Returns distinct elements of sequence. Elements must be hashable transformation
distinct_by(func) Returns distinct elements of sequence using func as a key transformation
drop(n) Drop the first n elements of the sequence transformation
drop_right(n) Drop the last n elements of the sequence transformation
drop_while(func) Drop elements while func evaluates to True, then returns the rest transformation
take(n) Returns sequence of first n elements transformation
take_while(func) Take elements while func evaluates to True, then drops the rest transformation
init() Returns sequence without the last element transformation
tail() Returns sequence without the first element transformation
inits() Returns consecutive inits of sequence transformation
tails() Returns consecutive tails of sequence transformation
zip(other) Zips the sequence with other transformation
zip_with_index(start=0) Zips the sequence with the index starting at start on the right side transformation
enumerate(start=0) Zips the sequence with the index starting at start on the left side transformation
cartesian(*iterables, repeat=1) Returns cartesian product from itertools.product transformation
inner_join(other) Returns inner join of sequence with other. Must be a sequence of (key, value) pairs transformation
outer_join(other) Returns outer join of sequence with other. Must be a sequence of (key, value) pairs transformation
left_join(other) Returns left join of sequence with other. Must be a sequence of (key, value) pairs transformation
right_join(other) Returns right join of sequence with other. Must be a sequence of (key, value) pairs transformation
join(other, join_type='inner') Returns join of sequence with other as specified by join_type. Must be a sequence of (key, value) pairs transformation
partition(func) Partitions the sequence into elements which satisfy func(element) and those that don't transformation
grouped(size) Partitions the elements into groups of size size transformation
sorted(key=None, reverse=False)/order_by(func) Returns elements sorted according to python sorted transformation
reverse() Returns the reversed sequence transformation
slice(start, until) Sequence starting at start and including elements up to until transformation
head() / first() Returns first element in sequence action
head_option() Returns first element in sequence or None if it is empty action
last() Returns last element in sequence action
last_option() Returns last element in sequence or None if it is empty action
len() / size() Returns length of sequence action
count(func) Returns count of elements in sequence where func(element) is True action
empty() Returns True if the sequence has zero length action
non_empty() Returns True if sequence has non-zero length action
all() Returns True if all elements in sequence are truthy action
exists(func) Returns True if func(element) for any element in the sequence is True action
for_all(func) Returns True if func(element) is True for all elements in the sequence action
find(func) Returns the element that first evaluates func(element) to True action
any() Returns True if any element in sequence is truthy action
max() Returns maximal element in sequence action
min() Returns minimal element in sequence action
max_by(func) Returns element with maximal value func(element) action
min_by(func) Returns element with minimal value func(element) action
sum()/sum(projection) Returns the sum of elements possibly using a projection action
product()/product(projection) Returns the product of elements possibly using a projection action
average()/average(projection) Returns the average of elements possibly using a projection action
aggregate(func)/aggregate(seed, func)/aggregate(seed, func, result_map) Aggregate using func starting with seed or first element of list then apply result_map to the result action
fold_left(zero_value, func) Reduces element from left to right using func and initial value zero_value action
fold_right(zero_value, func) Reduces element from right to left using func and initial value zero_value action
make_string(separator) Returns string with separator between each str(element) action
dict(default=None) / to_dict(default=None) Converts a sequence of (Key, Value) pairs to a dictionary. If default is not None, it must be a value or zero argument callable which will be used to create a collections.defaultdict action
list() / to_list() Converts sequence to a list action
set() / to_set() Converts sequence to a set action
to_file(path) Saves the sequence to a file at path with each element on a newline action
to_csv(path) Saves the sequence to a csv file at path with each element representing a row action
to_jsonl(path) Saves the sequence to a jsonl file with each element being transformed to json and printed to a new line action
to_json(path) Saves the sequence to a json file. The contents depend on if the json root is an array or dictionary action
to_sqlite3(conn, tablename_or_query, *args, **kwargs) Save the sequence to a SQLite3 db. The target table must be created in advance. action
to_pandas(columns=None) Converts the sequence to a pandas DataFrame action
cache() Forces evaluation of sequence immediately and caches the result action
for_each(func) Executes func on each element of the sequence action
peek(func) Executes func on each element of the sequence but returns the element transformation

Lazy Execution

Whenever possible, PyFunctional computes lazily. This is accomplished by tracking the list of transformations that have been applied to the sequence and only evaluating them when an action is called; in PyFunctional this is called tracking lineage. Lineage is also what lets PyFunctional cache results of computation to prevent expensive re-computation. Caching is done to preserve sensible behavior and is used sparingly. For example, calling size() will cache the underlying sequence: if it did not, and the input was an iterator, then further calls would operate on an expired iterator since it was used to compute the length. Similarly, repr also caches, since it is most often used during interactive sessions where it is undesirable to keep recomputing the same value. Below are some examples of inspecting lineage.

def times_2(x):
    return 2 * x

elements = (
   seq(1, 1, 2, 3, 4)
      .map(times_2)
      .peek(print)
      .distinct()
)

elements._lineage
# Lineage: sequence -> map(times_2) -> peek(print) -> distinct

l_elements = elements.to_list()
# Prints: 1
# Prints: 1
# Prints: 2
# Prints: 3
# Prints: 4

elements._lineage
# Lineage: sequence -> map(times_2) -> peek(print) -> distinct -> cache

l_elements = elements.to_list()
# The cached result is returned so times_2 is not called and nothing is printed

Files are given special treatment if opened through seq.open and related APIs. functional.util.ReusableFile wraps the standard Python file to support multiple iterations over a single file object while correctly handling iteration termination and file closing.

Road Map Idea

  • SQL based query planner and interpreter
  • _ lambda operator

Contributing and Bug Fixes

Any contributions or bug reports are welcome. Thus far, there is a 100% acceptance rate for pull requests and contributors have offered valuable feedback and critique on code. It is great to hear from users of the package, especially what it is used for, what works well, and what could be improved.

To contribute, create a fork of PyFunctional, make your changes, then make sure they pass the checks below. In order to be merged, all pull requests must:

  • Pass all the unit tests
  • Pass all the pylint tests, or ignore warnings with an explanation of why it is correct to do so
  • Not significantly reduce coverage without a good reason (tracked on coveralls.io)
  • Edit the CHANGELOG.md file under the Next Release heading with the changes

Contact

Gitter for chat

Supported Python Versions

  • PyFunctional 1.4 and above supports and is tested against Python 3.6, Python 3.7, and PyPy3
  • PyFunctional 1.4 and above does not support Python 2.7
  • PyFunctional 1.4 and above works in Python 3.5, but is not tested against it
  • PyFunctional 1.4 and above partially works in Python 3.8; parallel processing currently has issues, but other features work fine
  • PyFunctional 1.3 and below supports and was tested against Python 2.7, Python 3.5, Python 3.6, PyPy2, and PyPy3

Changelog

See CHANGELOG.md for the full changelog.

About me

To learn more about me (the author) visit my webpage at pedro.ai.

I created PyFunctional while using Python extensively, and finding that I missed the ease of use for manipulating data that Spark RDDs and Scala collections have. The project takes the best ideas from these APIs as well as LINQ to provide an easy way to manipulate data when using Scala is not an option or PySpark is overkill.

Contributors

These people have generously contributed their time to improving PyFunctional.

Comments
  • Parallel Execution Engine

    Creating issue to discuss potential of implementing a parallel execution engine. From the 0.5.0 milestone this might include:

    The first possibility is to abstract the execution engine away so that ScalaFunctional can use either a sequential or parallel execution engine. This would need to be done through a combination of multiprocessing and determining where it could be used without creating massive code duplication. Additionally, this would require writing completely new tests and infrastructure since order is not guaranteed, but expected in the current sequential tests.

    feature 
    opened by EntilZha 49
  • `grouped` Forces pre-compute of sequence

    The current implementation of grouped

    def grouped_impl(wrap, size, sequence):
        """
        Implementation for grouped_t
        :param wrap: wrap children values with this
        :param size: size of groups
        :param sequence: sequence to group
        :return: grouped sequence
        """
        for i in range(0, len(sequence), size):
            yield wrap(sequence[i:i + size])
    

    seems to compute the whole sequence in order to apply len(sequence). Am I correct?

    I find that my code below doesn't print anything to the console.

    (
        seq(fp_of_very_large_file)
            .map(parse_func)
            .grouped(1)
            .for_each(print)
    )
    

    Once I dropped .grouped(1), the console showed data.
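
    A lazy alternative that never calls len(sequence) could be sketched with itertools.islice (an illustrative rewrite, not the library's actual code):

```python
import itertools

def grouped_impl_lazy(wrap, size, sequence):
    """Sketch of grouped that pulls elements on demand instead of using len."""
    iterator = iter(sequence)
    while True:
        group = list(itertools.islice(iterator, size))
        if not group:
            return
        yield wrap(group)

# Works on an unbounded generator, which the len-based version cannot handle
list(itertools.islice(grouped_impl_lazy(list, 2, itertools.count()), 3))
# [[0, 1], [2, 3], [4, 5]]
```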

    bug 
    opened by lucidfrontier45 18
  • Insert into sqlite3

    Initial implementation of writing to sqlite3. Currently it behaves like to_csv: it only accepts a tuple or structure as an element. The insertion SQL must be supplied and the target table must be created in advance.

    I'm still considering how to insert dict.

    opened by lucidfrontier45 16
  • `tail` differing from the Scala version

    From documentation: "Selects all elements except the first."

    Your version: "get last element".

    Any good reason behind it? I can change it to the former, and also implement stuff like inits and tails.

    opened by adrian17 14
  • support compression in File I/O functions

    It could be useful if stream functions like seq.open, seq.csv, etc. could read compressed files like Spark's sc.textFile.

    Also, writing a compressed file via to_file, to_csv, etc. would be great.

    feature roadmap in progress 
    opened by lucidfrontier45 10
  • Fix for the grouped() and to_csv() functions

    Closes #120 and #121 by making the following changes:

    • Fixes #120: added list() to the function "group_impl"'s yield
    • Fixes #121: added the argument "newline" to the function to_csv() and to the universal_write_open()

    I have run the tests on my Windows machine at work and all are passing. Will check on my Linux machine after reaching home.

    opened by abybaddi009 9
  • Test on pypy2/3 plus irrelevant bug on tests discovered by the build

    Tests succeed on pypy2/3 (thus added to Travis) except for some tests on sets that were accidentally written as deterministic. (I fixed them in the same PR to keep the build clean.)


    By the way, while set(seq('abc')) gives the expected result in the shell, in tests I get an empty set and can only use the .set() method. Probably assertSetEqual(result, set(expected)) doesn't work for the same reason.

    Looks to me like a bug you may want to investigate.

    opened by Digenis 9
  • Implement functions with generators

    Re-implement/change all the functions in the library to be compatible with generators. Currently, sequential calls to transformations produce a new list between each transformation even if it is only used for the next transformation, not in the result. This is wasteful and could be eliminated using generators.

    Targeting this to be the large (potentially breaking, hopefully not though) feature of 0.2.0 while 0.1.7 will be used to add more utility functions.

    feature 
    opened by EntilZha 9
  • for_each doesn't return the original sequence but returns None

    I would have expected for_each(func) to apply the func function but to also return the unmodified sequence.

    E.g. I would expect the following code not to fail (as happens in my case):

    l = (seq([1, 2, 3, 4])
        .for_each(print)
        .map(lambda x: x + 1))
    print(l)
    

    But to produce:

    1
    2
    3
    4
    [2, 3, 4, 5]
    

    My fix was to modify for_each in functional/pipeline.py to return self.

    pip freeze | grep pyfunctional
    pyfunctional==1.4.2
    
    python --version
    Python 3.7.6
    

    pyfunctional was installed today using pip install pyfunctional

    I will shortly create a pull request with this functionality & test associated with it

    opened by bmsan 8
  • Add accumulate to Sequence

    Change Summary

    • add accumulate method to Sequence
    • mirrors itertools.accumulate API

    Why

    The alternative is to use fold_left and then drop the initial value. I believe creating a running sum is a common enough operation to warrant its own implementation.

    In [1]: from functional import seq
    In [2]: seq(1, 2, 3).fold_left([0], lambda acc, curr: acc + [acc[-1] + curr]).tail()
    Out[2]: [1, 3, 6]
    

    versus

    In [1]: from functional import seq
    In [2]: seq(1, 2, 3).accumulate()
    Out[2]: [1, 3, 6]
    
    opened by ameier38 8
  • Implement data passing functions

    So far the only way to ingest data into ScalaFunctional is through Python-defined data structures. It would be helpful to be able to read directly from data formats such as json/sql/csv.

    Target milestone for everything completed will be 0.4.0.

    This issue will serve as a parent issue for implementing each specific function.

    Child issues: #34 ~~seq.open~~ #35 ~~seq.range~~ #36 ~~seq.csv~~ #37 ~~seq.jsonl~~ #29 ~~seq.json~~ #30 ~~to_json~~ #31 ~~to_csv~~ #32 ~~to_file~~ #33 ~~to_jsonl~~

    feature roadmap 
    opened by EntilZha 8
  • Add the split transformation

    Hello everyone,

    This is my first PR ever, but I'm willing to learn, so don't hesitate to be picky.

    I was doing the Advent of Code 2022 with a constraint to use only PyFunctional so that I could learn the library. I was blocked on the first problem because I felt it was missing a split function.

    The goal of such a function is to take a sequence and split it into several lists given a predicate. The predicate is mapped over the sequence; a new group starts wherever it returns True and continues while it returns False. An example is worth a million words, so here it is:

    seq([1,2,3,None,4,5,6,7,None,8,9]).split(lambda x: x is None) 
    >>> seq([ [1,2,3], [4,5,6,7], [8,9] ])
    

    I tried to respect the rules for PR, but if I missed something feel free to comment ;)

    Have a good day!

    opened by AlexandreKempf 2
  • More dictionary/tuple list helpers

    Out of curiosity, I was wondering why there aren't key/value equivalents of functions that operate on a list of tuples, e.g. filter_by_key, group_by_value, or map_values. As one use case, it'd be great to have more support for operations after calling group_by.

    I really enjoy using Pyfunctional, and I'm happy to contribute if there are no blockers here!

    stale 
    opened by foresthu2006 1
  • [Question] VSCode language server doesn't see the pyfunctional package

    Hello,

    I've noticed that in Visual Studio Code the intelligent indexer/autocomplete functionality (I think provided by Jedi) isn't able to see pyfunctional (at least on my setup).

    By this I mean it doesn't see any of the imported objects and it doesn't see "functional" as a package. Clicking on a function/module from other packages takes me directly to their source code, but for pyfunctional nothing happens.

    Running and debugging works fine, so the module is installed correctly, but something is tripping up VS Code.

    While this is more of an issue related to VS Code/Jedi, I was wondering if anybody else is experiencing this, or if it is something related to a strange combination of packages in my environment.

    I've tried this on two different machines and got the same behavior.

    Thanks!

    evergreen 
    opened by bmsan 3
  • Cache to file and auto-load from cache

    It would be awesome if there were an easy way to cache results to a file and, if the cache is found, load from it without recomputing.

    To basically support the following use case/API:

    # main.py
    
    def expensive_function(x):
       """Consider an expensive to compute function"""
       print(x, end=" ")
       return x**2
    
    s = seq([x for x in range(4)], cache_dir="/path/to/cache/dir")
    
    expensive_result = s.map(expensive_function)\
        .with_caching(name="expensive_result")
    
    expensive_result.for_each(lambda x: print(f"\n", x, end=""))
    

    First run:

    $ python main.py
    0 1 2 3
    0
    1
    4
    9
    

    Second run (the first run should have created the cache file which will now be used instead of recomputing the sequence expensive_result):

    $ python main.py
    
    0
    1
    4
    9
    

    Please let me know if you have any questions or would like some clarifications.


    Loving the project so far, thanks for your effort!

    evergreen 
    opened by hXtreme 4
  • seq() converts pandas DataFrame into Sequence

    from functional import seq
    from pandas import DataFrame
    
    df = DataFrame({'col1': [1,2,3], 'col2': [4,5,6]})
    s = seq([df])
    el = s.first()
    print(type(el))
    

    This code prints "<class 'functional.pipeline.Sequence'>", but the expected output is "<class 'pandas.core.frame.DataFrame'>".

    evergreen 
    opened by Arshaku 8
  • lazy_parallelize having trouble with function context?

    I'm using a function defined in the current file with pseq, and it seems to error out, not being able to find other referenced functions or even simple types like Dict. This works fine when using seq.

    I think the problem is with pickling the target function in lazy_parallelize:

        partitions = split_every(partition_size, iter(result))
        packed_partitions = (pack(func, (partition,)) for partition in partitions)
        for pool_result in pool.imap(unpack, packed_partitions):
            yield pool_result
        pool.terminate()
    

    I executed on my own the function with pool.imap and works fine.

    Wouldn't it be better not to use pickling, to avoid these kinds of problems?

    evergreen 
    opened by larroy 3
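    The pickling constraint behind this issue can be demonstrated with the stdlib alone: multiprocessing (which pseq uses under the hood) must pickle the target function to send it to worker processes, and only module-level functions pickle cleanly — nested functions and lambdas do not:

```python
import pickle

def top_level(x):
    return x * 2

# Module-level functions pickle by qualified name, so workers can re-import them.
clone = pickle.loads(pickle.dumps(top_level))
assert clone(3) == 6

def make_local():
    def local(x):
        return x * 2
    return local

# Functions defined inside another function (or lambdas) cannot be pickled,
# which is one way pseq's worker processes can fail to find them.
try:
    pickle.dumps(make_local())
    pickled_ok = True
except (pickle.PicklingError, AttributeError, TypeError):
    pickled_ok = False
assert not pickled_ok
```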
Releases (v1.4.3)
  • v1.0.0(Feb 2, 2017)

    Reaching 1.0 primarily means that API stability has been reached so I don't expect to run into many new breaking changes. The library is also relatively feature complete at this point almost two years after the first commit (February 5, 2015).

    This release includes several new minor features and usability improvements in Jupyter notebook environments.

    New Features

    • Added optional initial value for reduce (https://github.com/EntilZha/PyFunctional/issues/86)
    • Added table of contents to readme (https://github.com/EntilZha/PyFunctional/issues/88)
    • Added data interchange tutorial with pandas (https://github.com/EntilZha/PyFunctional/blob/master/examples/PyFunctional-pandas-tutorial.ipynb)
    • Implemented itertools.starmap as Sequence.starmap and Sequence.smap (https://github.com/EntilZha/PyFunctional/issues/90)
    • Added interface to csv.DictReader via seq.csv_dict_reader (https://github.com/EntilZha/PyFunctional/issues/92)
    • Improved _repr_html_, show, and tabulate by auto-detecting named tuples as column names (https://github.com/EntilZha/PyFunctional/issues/91)
    • Improved _repr_html_ and show to tell the user that 10 of N rows are being shown if there are more than 10 rows (https://github.com/EntilZha/PyFunctional/issues/94)
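    For readers unfamiliar with these operations, the semantics mirror the stdlib, which can stand in as a rough sketch (PyFunctional's own signatures may differ):

```python
from functools import reduce
from itertools import starmap

# reduce with an optional initial value — functools.reduce's initializer mirrors this
assert reduce(lambda acc, x: acc + x, [1, 2, 3], 10) == 16

# starmap unpacks each tuple into the function's arguments
assert list(starmap(lambda a, b: a + b, [(1, 2), (3, 4)])) == [3, 7]
```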

    Dependencies and Supported Python Versions

    • Bumped version dependencies (https://github.com/EntilZha/PyFunctional/issues/89)
    • Added Python 3.6 testing via Travis CI
    Source code(tar.gz)
    Source code(zip)
  • v0.7.1(Jun 8, 2016)

    This is a hotfix release which separates the Python 2 and 3 wheels on PyPI. This is primarily motivated by the different installation requirements of each: the Python 2 version depends on several libraries that are backports of Python 3 libraries.

    Source code(tar.gz)
    Source code(zip)
  • v0.7.0(Jun 6, 2016)

    New Features

    • Auto parallelization by using pseq instead of seq. Details at https://github.com/EntilZha/PyFunctional/issues/47
    • Parallel functions: map, select, filter, filter_not, where, flatten, and flat_map
    • Compressed file IO support for gzip/lzma/bz2 as detailed at https://github.com/EntilZha/PyFunctional/issues/54
    • Cartesian product from itertools.product implemented as Pipeline.cartesian
    • Website at pyfunctional.org and docs at docs.pyfunctional.org
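    Since Pipeline.cartesian comes from itertools.product, its behavior can be previewed with the stdlib directly:

```python
from itertools import product

# Cartesian product of two sequences: every pairing, in nested-loop order
assert list(product([1, 2], ["a", "b"])) == [(1, "a"), (1, "b"), (2, "a"), (2, "b")]
```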

    Bug Fixes

    • No option for encoding in to_json https://github.com/EntilZha/PyFunctional/issues/70

    Internal Changes

    • Pinned versions of all dependencies

    Contributors

    • Thanks to versae for implementing most of the pseq feature!
    • Thanks to ChuyuHsu for implementing large parts of the compression feature!
    Source code(tar.gz)
    Source code(zip)
  • v0.6.0(Mar 30, 2016)

    The largest changes in this release are adding SQLite support and changing the project name to PyFunctional.

    Name Change

    Details can be found in the RFC issue. On PyPI, 0.6.0 was published as both PyFunctional and ScalaFunctional to support the transition to the new name. Overall, the name change better suits the package, as it is about functional programming with Python, even if it is inspired by Scala/Spark.

    New Features

    • Added support for reading from and writing to SQLite databases with seq.sqlite3
    • Added to_pandas integration

    Internal Changes

    • Changed code quality check service
    Source code(tar.gz)
    Source code(zip)
  • v0.5.0(Jan 1, 2016)

    Release 0.5.0 groups a few new features and bug fixes into one release.

    Breaking Changes

    • Sequence.zip_with_index has modified behavior to extend usability and conform to the Scala/Spark APIs, which breaks prior compatibility. The drop-in replacement that fixes this in code bases upgrading to 0.5.0 is changing zip_with_index to enumerate.
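    Assuming the new behavior mirrors Scala's zipWithIndex pair ordering, the difference from enumerate can be sketched in plain Python (no PyFunctional required):

```python
xs = ["a", "b", "c"]

# enumerate yields (index, element) pairs:
assert list(enumerate(xs)) == [(0, "a"), (1, "b"), (2, "c")]

# Scala/Spark-style zipWithIndex yields (element, index) pairs instead:
assert [(x, i) for i, x in enumerate(xs)] == [("a", 0), ("b", 1), ("c", 2)]
```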

    New Features

    • Delimiter option on to_file
    • Sequence.sliding for sliding windows over sequence of elements
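    A rough stdlib sketch of the sliding-window semantics (the library's actual implementation and edge-case handling may differ):

```python
def sliding(xs, size, step=1):
    """Return consecutive windows of `size` elements, advancing by `step`."""
    return [xs[i:i + size] for i in range(0, len(xs) - size + 1, step)]

assert sliding([1, 2, 3, 4], 2) == [[1, 2], [2, 3], [3, 4]]
assert sliding([1, 2, 3, 4], 2, step=2) == [[1, 2], [3, 4]]
```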

    Internal Changes

    • Changed relative imports to absolute imports

    Bug Fixes

    • _wrap incorrectly converted tuples to arrays
    • to_file documentation fixed
    • Prior mentioned zip_with_index in breaking changes

    Changelog: https://github.com/EntilZha/ScalaFunctional/blob/master/CHANGELOG.md
    Milestone: https://github.com/EntilZha/ScalaFunctional/milestones/0.5.0

    Source code(tar.gz)
    Source code(zip)
  • v0.4.1(Nov 4, 2015)

    The primary goals of this release were to:

    1. Support reading and writing data from files in common formats
    2. Improve LINQ support

    Reading and Writing text, json, jsonl, and csv

    The major feature additions of this release are functions to natively read from and write to text, json, jsonl, and csv files. Details on the issue can be found at #19. The examples on the README.md page illustrate how these can be used and their usefulness. A full list of changes can be found in CHANGELOG.md or in the copy of it at the bottom of the release notes.

    LINQ

    While doing research, I found that a common use case where ScalaFunctional could be helpful is LINQ-like data manipulation. To better serve this group of users, functions like select and where were added, and documentation was improved to cover this use case.

    Breaking Changes

    The bug detailed at #44 exposed that fold_left and fold_right were using the passed function incorrectly. This was corrected, but it is a breaking change relative to all prior versions.

    0.4.1 enum34 Removed

    In the 0.4.0 release an issue was found where the wheel built with Python 2 contained enum34, which broke the Python 3 installation. If the wheel was built with Python 3, it would not include enum34, causing problems with Python 2. The solution was to remove enum34 and use vanilla Python instead.

    Changelog

    Release 0.4.0

    New Features

    • Official and tested support for python 3.5. Thus ScalaFunctional is tested on Python 2.7, 3.3, 3.4, 3.5, pypy, and pypy3
    • aggregate from LINQ
    • order_by from LINQ
    • where from LINQ
    • select from LINQ
    • average from LINQ
    • sum modified to allow LINQ projected sum
    • product modified to allow LINQ projected product
    • seq.jsonl to read jsonl files
    • seq.json to read json files
    • seq.open to read files
    • seq.csv to read csv files
    • seq.range to create range sequences
    • Sequence.to_jsonl to save jsonl files
    • Sequence.to_json to save json files
    • Sequence.to_file to save files
    • Sequence.to_csv to save csv files
    • Improved documentation with more examples and explicit mention of LINQ
    • Changed PyPI keywords to improve discoverability
    • Created Google groups mailing list

    Bug Fixes

    • fold_left and fold_right had incorrect order of arguments for passed function

    Release 0.4.1

    Fixed a Python 3 build error due to wheel installation of enum34. The package no longer depends on enum34.

    Contributors

    Thank you to adrian17 for contributing seq.range to the release.

    Source code(tar.gz)
    Source code(zip)
  • v0.4.0(Nov 4, 2015)

    Refer to the release notes for 0.4.1 for a summary of changes in 0.4.0. Both versions are nearly identical, with 0.4.1 being a hotfix for a pip install issue on Python 3.

    Source code(tar.gz)
    Source code(zip)
  • v0.3.1(Jul 22, 2015)

    This is a very minor release which adds distinct_by to the API. distinct_by takes a single identity function as an argument. The returned sequence is unique with respect to the identity function and consists of the first element found for each identity key. Code example below:

    from functional import seq
    
    seq([(1, 2), (1, 3), (2, 3), (4, 5), (0, 1), (0, 0)]).distinct_by(lambda x: x[0])
    # [(0, 1), (1, 2), (2, 3), (4, 5)]
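    A stdlib sketch of the same semantics (note that the element order may differ from the library's output above):

```python
def distinct_by(items, key):
    """Keep the first element seen for each identity key, in insertion order."""
    seen = {}
    for item in items:
        k = key(item)
        if k not in seen:
            seen[k] = item
    return list(seen.values())

assert distinct_by([(1, 2), (1, 3), (2, 3), (4, 5), (0, 1), (0, 0)],
                   lambda x: x[0]) == [(1, 2), (2, 3), (4, 5), (0, 1)]
```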
    
    Source code(tar.gz)
    Source code(zip)
  • v0.3.0(Jun 9, 2015)

    The primary goal of this release was to improve the performance of longer data pipelines. There were also several API additions and a few minor breaking changes.

    Performance Improvements

    The largest under-the-hood change is making all operations lazy by default; 0.2.0 calculated a new list at every transformation. Laziness was initially implemented using generators, but this could lead to unexpected behavior. The problem with this approach is highlighted in #20. Code sample below:

    from functional import seq
    def gen():
        for e in range(5):
            yield e
    
    nums = gen()
    s = seq(nums)
    s.map(lambda x: x * 2).sum()
    # prints 20
    s.map(lambda x: x * 2).sum()
    # prints 0
    s = seq([1, 2, 3, 4])
    a = s.map(lambda x: x * 2)
    a.sum()
    # prints 20
    a.sum()
    # prints 0
    

    Either ScalaFunctional would need to aggressively cache results, or a new approach was needed. That approach is called lineage. The basic concept is that ScalaFunctional:

    1. Tracks the most recent concrete data (e.g. a list of objects)
    2. Tracks the list of transformations that need to be applied to the list to find the answer
    3. Whenever an expression is evaluated, the result is cached for (1) and returned
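    The three rules above can be captured in a toy sketch (not the library's actual implementation, which is immutable and more general):

```python
class Lineage:
    """Toy lineage: concrete base data plus a list of pending transformations."""
    def __init__(self, base):
        self.base = list(base)   # (1) most recent concrete data
        self.transforms = []     # (2) transformations still to apply

    def map(self, f):
        self.transforms.append(lambda xs: [f(x) for x in xs])
        return self

    def evaluate(self):
        result = self.base
        for t in self.transforms:
            result = t(result)
        # (3) cache the evaluated result as the new concrete data
        self.base, self.transforms = result, []
        return result

s = Lineage([1, 2, 3, 4]).map(lambda x: x * 2)
assert s.evaluate() == [2, 4, 6, 8]
assert s.evaluate() == [2, 4, 6, 8]  # safe to re-evaluate: result was cached
```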

    The result is that the problems above are fixed. Below is an example showing how the backend calculates results:

    from functional import seq
    
    In [8]: s = seq(1, 2, 3, 4)
    
    In [9]: s._lineage
    Out[9]: Lineage: sequence
    
    In [10]: s0 = s.map(lambda x: x * 2)
    
    In [11]: s0._lineage
    Out[11]: Lineage: sequence -> map(<lambda>)
    
    In [12]: s0
    Out[12]: [2, 4, 6, 8]
    
    In [13]: s0._lineage
    Out[13]: Lineage: sequence -> map(<lambda>) -> cache
    

    Note how, initially, the expression is not evaluated, so it is not cached. Since printing s0 in the REPL calls __repr__, it is evaluated and cached so that it is not recomputed if s0 is used again. You can also call cache() directly if desired. You may also notice that seq can now take a variable number of arguments, e.g. seq(1, 2, 3, 4) (added in #27).

    Next up

    Improvements in documentation and a redo of README.md. The next release will focus on extending ScalaFunctional to work with other data inputs/outputs and on more usability improvements. This release also marks relative stability in the collections API. Everything that seemed worth porting from Scala/Spark has been completed, with a few additions (predominantly left, right, inner, and outer joins). There aren't currently any foreseeable breaking changes.

    Source code(tar.gz)
    Source code(zip)
  • v0.2.0(Mar 23, 2015)

    The primary goal of this release was to implement a number of functions which were missing from the ScalaFunctional library.

    Features

    • Comprehensive documentation for every function, with examples
    • Set-like manipulation: union, intersection, difference, symmetric_difference
    • Joins: join list of (Key, Value) pairs using inner, outer, left, and right joins
    • Added many different functions for taking/dropping elements in sequence
    • Extended min/max with min_by and max_by
    • Extended attribute access to check if attribute exists on sequence before erroring

    With these changes, the API is both stable and reasonably complete. The goal is to support all major operations from Scala arrays and Spark, and that goal now appears to be met.
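    As an illustration of the join semantics on (Key, Value) pairs, here is a hedged stdlib sketch of an inner join (the library's actual API and output shape may differ):

```python
def inner_join(left, right):
    """Inner-join two lists of (key, value) pairs on matching keys."""
    right_map = {}
    for k, v in right:
        right_map.setdefault(k, []).append(v)
    # For each left pair, emit one row per matching right value; unmatched keys drop out.
    return [(k, (lv, rv)) for k, lv in left for rv in right_map.get(k, [])]

left = [("a", 1), ("b", 2)]
right = [("a", 3), ("c", 4)]
assert inner_join(left, right) == [("a", (1, 3))]
```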

    Next Release

    The next release (minor or major) will focus on improving performance using generators and extending the API where necessary.

    Source code(tar.gz)
    Source code(zip)
Owner
Pedro Rodriguez
Research Scientist; NLP, Question Answering, Data + Eval; PhD in CS from UMD, Berkeley CS Grad; Prev: Google AI, FB, MS Research, Riot Games, Oracle, Zillow