streampq
Stream results of multi-statement PostgreSQL queries from Python without server-side cursors. Has benefits over some other Python PostgreSQL libraries:
- Streams results from complex multi-statement queries even though SQL doesn't allow server-side cursors for such queries - suitable for large amounts of results that don't fit in memory.
- CTRL+C (SIGINT) by default behaves as expected even during slow queries - a KeyboardInterrupt is raised and quickly bubbles up through streampq code. Unless client code prevents it, the program will exit.
- Every effort is made to cancel queries on KeyboardInterrupt, SystemExit, or errors - the server doesn't continue needlessly using resources.
Particularly useful when temporary tables are needed to store intermediate results in multi-statement SQL scripts.
Installation
pip install streampq
The libpq binary library is also required. This is typically either already installed, or installed by:
- macOS + brew: brew install libpq
- Linux (Debian): apt install libpq5
- Linux (Red Hat): yum install postgresql-libs
The only runtime dependencies are libpq and Python itself.
Usage
from streampq import streampq_connect

# libpq connection parameters
# https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-PARAMKEYWORDS
#
# Any can be omitted and environment variables will be used instead
# https://www.postgresql.org/docs/current/libpq-envars.html
connection_params = (
    ('host', 'localhost'),
    ('port', '5432'),
    ('dbname', 'postgres'),
    ('user', 'postgres'),
    ('password', 'password'),
)

# SQL statement(s) - if more than one, separate by ;
sql = '''
    SELECT * FROM my_table;
    SELECT * FROM my_other_table;
'''

# Connection and querying is via a context manager
with streampq_connect(connection_params) as query:
    for (columns, rows) in query(sql):
        print(columns)  # Tuple of column names
        for row in rows:
            print(row)  # Tuple of row values
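Because each set of rows arrives as an iterable, results can be written out as they are read instead of being collected in a list first. The sketch below shows one way to do that. `write_results_as_csv` and `open_target` are hypothetical names, not part of streampq, and the code assumes only the `(columns, rows)` shape described above. Stand-in data is used so that it runs without a database.

```python
import csv
import io

def write_results_as_csv(results, open_target):
    """Write each (columns, rows) pair to its own CSV target, row by row.

    `results` is any iterable of (columns, rows) pairs, such as the one
    returned by query(sql). `open_target` is a caller-supplied function
    (hypothetical) that takes the statement index and returns a writable
    text file object. Returns the number of rows written per statement.
    """
    counts = []
    for index, (columns, rows) in enumerate(results):
        writer = csv.writer(open_target(index))
        writer.writerow(columns)
        count = 0
        # Handle one row at a time so memory use does not grow with result size
        for row in rows:
            writer.writerow(row)
            count += 1
        counts.append(count)
    return counts

# Stand-in data with the same shape as query(sql)
fake_results = (
    (('id', 'name'), iter(((1, 'a'), (2, 'b')))),
    (('total',), iter(((3,),))),
)

buffers = {}

def open_buffer(index):
    # In-memory targets for the demonstration; real code might open files
    buffers[index] = io.StringIO()
    return buffers[index]

row_counts = write_results_as_csv(fake_results, open_buffer)
```

With a real connection, the same helper would be called inside the `with streampq_connect(...)` block, passing `query(sql)` as `results`. The caller remains responsible for closing whatever `open_target` returns.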
PostgreSQL types to Python type decoding
There are 164 built-in PostgreSQL data types (including array types), and streampq converts them to Python types. In summary:
PostgreSQL types | Python type |
---|---|
null | None |
text (e.g. varchar), xml, network addresses, and money | str |
byte (e.g. bytea) | bytes |
integer (e.g. int4) | int |
inexact real number (e.g. float4) | float |
exact real number (e.g. numeric) | Decimal |
date | date |
timestamp | datetime (without timezone) |
timestamptz | datetime (with offset timezone) |
json and jsonb | output of json.loads |
interval | streampq.Interval |
range (e.g. daterange) | streampq.Range |
multirange (e.g. datemultirange) | tuples of streampq.Range |
arrays and vectors | tuple (of any of the above types, or of nested tuples) |
To customise these, override the default value of the get_decoders parameter of the streampq_connect function in streampq.py.
In general, built-in types are preferred over custom types, and immutable types are preferred over mutable.
streampq.Interval
The Python built-in timedelta type is not used for PostgreSQL interval since timedelta does not offer a way to store PostgreSQL intervals of years or months, other than converting to days which would be a loss of information.
Instead, a namedtuple is defined, streampq.Interval, with members:
Member | Type |
---|---|
years | int |
months | int |
days | int |
hours | int |
minutes | int |
seconds | Decimal |
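The sketch below illustrates why timedelta is not a suitable target type. It defines a stand-in namedtuple with the members listed above, so it runs without streampq installed; real code would use streampq.Interval. `to_timedelta` is a hypothetical helper, not part of the library, for the cases where a conversion is safe.

```python
from collections import namedtuple
from datetime import timedelta
from decimal import Decimal

# Stand-in with the documented members; real code would use streampq.Interval
Interval = namedtuple('Interval', ['years', 'months', 'days', 'hours', 'minutes', 'seconds'])

# timedelta has no months or years arguments at all
try:
    timedelta(months=1)
    timedelta_accepts_months = True
except TypeError:
    timedelta_accepts_months = False

one_month = Interval(years=0, months=1, days=0, hours=0, minutes=0, seconds=Decimal('0'))
thirty_days = Interval(years=0, months=0, days=30, hours=0, minutes=0, seconds=Decimal('0'))

# As separate fields, one month and thirty days remain distinct values
intervals_differ = one_month != thirty_days

def to_timedelta(interval):
    """Convert to timedelta, refusing intervals with years or months (hypothetical helper)."""
    if interval.years or interval.months:
        raise ValueError('years and months have no fixed length in days')
    return timedelta(
        days=interval.days,
        hours=interval.hours,
        minutes=interval.minutes,
        # Fractions smaller than a microsecond are truncated
        microseconds=int(interval.seconds * 1000000),
    )
```

Refusing to convert years and months, rather than guessing at 30 or 365 days, keeps the conversion from silently changing the meaning of the value.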
streampq.Range
There is no Python built-in type for a PostgreSQL range. So for these, a namedtuple is defined, streampq.Range, with members:
Member | Type |
---|---|
lower | int, date, datetime (without timezone), or datetime (with offset timezone) |
upper | int, date, datetime (without timezone), or datetime (with offset timezone) |
bounds | str - one of () , (] , [) , or [] |
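As an illustration of how the bounds member can be interpreted, the sketch below tests whether a value falls inside a range. It uses a stand-in namedtuple with the members listed above so that it runs without streampq installed; real code would use streampq.Range. `range_contains` is a hypothetical helper, not part of the library, and it assumes both lower and upper are set.

```python
from collections import namedtuple
from datetime import date

# Stand-in with the documented members; real code would use streampq.Range
Range = namedtuple('Range', ['lower', 'upper', 'bounds'])

def range_contains(range_, value):
    """Return True if value falls inside range_ (hypothetical helper).

    Assumes lower and upper are both set; unbounded and empty ranges
    are not handled in this sketch.
    """
    # '[' and ']' mean the endpoint is included, '(' and ')' mean it is excluded
    lower_inclusive = range_.bounds[0] == '['
    upper_inclusive = range_.bounds[1] == ']'
    above_lower = value >= range_.lower if lower_inclusive else value > range_.lower
    below_upper = value <= range_.upper if upper_inclusive else value < range_.upper
    return above_lower and below_upper

# January 2023 as a half-open range: includes 1 January, excludes 1 February
january = Range(lower=date(2023, 1, 1), upper=date(2023, 2, 1), bounds='[)')

first_day_included = range_contains(january, date(2023, 1, 1))
next_month_included = range_contains(january, date(2023, 2, 1))
```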
Bind parameters - literals
Dynamic SQL literals can be bound using the literals parameter of the query function. It must be an iterable of key-value pairs.
sql = '''
    SELECT * FROM my_table WHERE my_col = {my_col_value};
'''
with streampq_connect(connection_params) as query:
    for (columns, rows) in query(sql, literals=(
        ('my_col_value', 'my-value'),
    )):
        for row in rows:
            pass
Bind parameters - identifiers
Dynamic SQL identifiers, e.g. column names, can be bound using the identifiers parameter of the query function. It must be an iterable of key-value pairs.
sql = '''
    SELECT * FROM my_table WHERE {column_name} = 'my-value';
'''
with streampq_connect(connection_params) as query:
    for (columns, rows) in query(sql, identifiers=(
        ('column_name', 'my_col'),
    )):
        for row in rows:
            pass
Identifiers and literals use different escaping rules - hence the need for 2 different parameters.
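To make the difference concrete, the sketch below shows the standard SQL quoting rules for each kind of value. It is a simplified illustration only, not streampq's implementation. It assumes string values and the PostgreSQL standard_conforming_strings setting being on, and it should not be used in place of the literals and identifiers parameters. Both function names are hypothetical.

```python
def quote_literal(value):
    """Simplified literal quoting: wrap in single quotes and double any inside."""
    return "'" + value.replace("'", "''") + "'"

def quote_identifier(name):
    """Simplified identifier quoting: wrap in double quotes and double any inside."""
    return '"' + name.replace('"', '""') + '"'

# The same text is quoted differently depending on where it is used
as_literal = quote_literal("it's")
as_identifier = quote_identifier('my "col"')

print(as_literal)     # 'it''s'
print(as_identifier)  # "my ""col"""
```

Quoting a column name as a literal would turn it into a string constant, and quoting a value as an identifier would make it a column reference, so the two cannot share one escaping routine.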
Single-statement SQL queries
While this library is specialised for multi-statement queries, it works fine when there is only one. In this case the iterable returned from the query function yields only a single (columns, rows) pair.
Exceptions
Exceptions derive from streampq.StreamPQError. If there is any more information available on the error, it's added as a string in its args property. This is included in the string representation of the exception by default.
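A common pattern with a shared base class is to handle specific failures first and fall back to the base class for everything else. The sketch below shows that pattern using stand-in classes, so that it runs without a database; real code would import StreamPQError and QueryError from streampq instead of defining them. `describe_failure` is a hypothetical helper.

```python
# Stand-ins for illustration; real code would use
# `from streampq import StreamPQError, QueryError`
class StreamPQError(Exception):
    pass

class QueryError(StreamPQError):
    pass

def describe_failure(run):
    """Call run() and describe any failure as a short message (hypothetical helper)."""
    try:
        run()
    except QueryError as e:
        # The more specific handler has to come before the base class
        return 'query failed: ' + str(e)
    except StreamPQError as e:
        # Catches every other exception derived from the base class
        return 'database error: ' + str(e)
    return 'ok'

def failing_query():
    # Simulates a query failing with extra information in args
    raise QueryError('syntax error')

message = describe_failure(failing_query)
```

In real use, `run` would wrap the `with streampq_connect(...)` block and the iteration over its results.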
Exception hierarchy
- StreamPQError
  Base class for all explicitly-thrown exceptions
  - ConnectionError
    An error occurred while attempting to connect to the database.
  - QueryError
    An error occurred while attempting to run a query. Typically this is due to a syntax error or a missing column.
  - CancelError
    An error occurred while attempting to cancel a query.
  - CommunicationError
    An error occurred communicating with the database after successful connection.
-