PipeChain

PipeChain is a utility library for creating functional pipelines.

Motivation

Let's start with a motivating example. We have a list of Australian phone numbers from our users. We need to clean this data before we insert it into the database. With PipeChain, you can do this whole process in one neat pipeline:

from pipechain import PipeChain, PLACEHOLDER as _

nums = [
    "493225813",
    "0491 570 156",
    "55505488",
    "Barry",
    "02 5550 7491",
    "491570156",
    "",
    "1800 975 707"
]

PipeChain(
    nums
).pipe(
    # Remove spaces
    map, lambda x: x.replace(" ", ""), _
).pipe(
    # Remove non-numeric entries
    filter, lambda x: x.isnumeric(), _
).pipe(
    # Add the mobile code to the start of 8-digit numbers
    map, lambda x: "04" + x if len(x) == 8 else x, _
).pipe(
    # Add the 0 to the start of 9-digit numbers
    map, lambda x: "0" + x if len(x) == 9 else x, _
).pipe(
    # Convert to a set to remove duplicates
    set
).eval()
{'0255507491', '0455505488', '0491570156', '0493225813', '1800975707'}

Without PipeChain, we would have to horrifically nest our code, or else use a lot of temporary variables:

set(
    map(
        lambda x: "0" + x if len(x) == 9 else x,
        map(
            lambda x: "04" + x if len(x) == 8 else x,
            filter(
                lambda x: x.isnumeric(),
                map(
                    lambda x: x.replace(" ", ""),
                    nums
                )
            )
        )
    )
)
{'0255507491', '0455505488', '0491570156', '0493225813', '1800975707'}
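
The temporary-variable alternative is hardly better. A sketch of the same logic written step by step (the intermediate names are just illustrative):

no_spaces = map(lambda x: x.replace(" ", ""), nums)
numeric = filter(lambda x: x.isnumeric(), no_spaces)
with_mobile = map(lambda x: "04" + x if len(x) == 8 else x, numeric)
with_zero = map(lambda x: "0" + x if len(x) == 9 else x, with_mobile)
unique = set(with_zero)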

Installation

pip install pipechain

Usage

Basic Usage

PipeChain has only two exports: PipeChain and PLACEHOLDER.

PipeChain is a class that defines a pipeline. You create an instance of the class, and then call .pipe() to add a function to the pipeline:

from pipechain import PipeChain, PLACEHOLDER
PipeChain(1).pipe(str)
PipeChain(arg=1, pipes=[functools.partial(<class 'str'>)])

Finally, you call .eval() to run the pipeline and return the result:

PipeChain(1).pipe(str).eval()
'1'

You can "feed" the pipe at either end, either during construction (PipeChain("foo")), or during evaluation .eval("foo"):

PipeChain().pipe(str).eval(1)
'1'
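
Feeding at evaluation time means a pipeline can presumably be defined once and reused. A minimal sketch, assuming .eval() can safely be called more than once on the same PipeChain:

stringify = PipeChain().pipe(str)
stringify.eval(1)
'1'
stringify.eval(2.5)
'2.5'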

Each call to .pipe() takes a function, and any additional arguments you provide, both positional and keyword, will be forwarded to the function:

PipeChain(["b", "a", "c"]).pipe(sorted, reverse=True).eval()
['c', 'b', 'a']

Argument Position

By default, the previous value is passed as the first positional argument to the function:

PipeChain(2).pipe(pow, 3).eval()
8

The only magic here is that if you use the PLACEHOLDER variable as an argument to .pipe(), then the pipeline will replace it with the output of the previous pipe at runtime:

PipeChain(2).pipe(pow, 3, PLACEHOLDER).eval()
9

Note that you can rename PLACEHOLDER to something shorter using Python's import statement, e.g.

from pipechain import PLACEHOLDER as _
PipeChain(2).pipe(pow, 3, _).eval()
9

Methods

It might not seem like methods will play well with this pipe convention, but after all, they are just functions. You can access any object's method as a plain function by looking it up on the object's class. In the example below, str is the class of "":

"".join(["a", "b", "c"])
'abc'
PipeChain(["a", "b", "c"]).pipe(str.join, "", _).eval()
'abc'
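
Methods that take extra arguments work the same way. A small sketch using str.replace, relying on the default behaviour of passing the previous value as the first positional argument:

PipeChain("a-b-c").pipe(str.replace, "-", "+").eval()
'a+b+c'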

Operators

The same goes for operators such as +, *, and []. We just have to use the operator module from the standard library:

from operator import add, mul, getitem

PipeChain(5).pipe(mul, 3).eval()
15
PipeChain(5).pipe(add, 3).eval()
8
PipeChain(["a", "b", "c"]).pipe(getitem, 1).eval()
'b'
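
The operator module also provides methodcaller, which can stand in for the class-lookup trick from the previous section. A brief sketch:

from operator import methodcaller

PipeChain("hello").pipe(methodcaller("upper")).eval()
'HELLO'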

Test Suite

Note: you will need Poetry installed.

To run the test suite, use:

git clone https://github.com/multimeric/PipeChain.git
cd PipeChain
poetry install
poetry run pytest test/test.py