aiomisc - miscellaneous utils for asyncio

Overview

aiomisc - miscellaneous utils for asyncio

Coveralls Actions Latest Version

Miscellaneous utils for asyncio.

The complete documentation is available in the following languages:

Installation

Installing from pypi:

pip3 install aiomisc

With uvloop:

pip3 install "aiomisc[uvloop]"

With aiohttp:

pip3 install "aiomisc[aiohttp]"

Installing from github.com:

pip3 install git+https://github.com/aiokitchen/aiomisc.git
pip3 install \
    https://github.com/aiokitchen/aiomisc/archive/refs/heads/master.zip

Versioning

This software follows Semantic Versioning

How to develop?

Should be installed:

  • virtualenv
  • GNU Make as make
  • Python 3.5+ as python3

For setting up developer environment just type

make develop
Comments
  • Add aggregate decorator

    Add aggregate decorator

    Parametric decorator that aggregates multiple (but no more than max_count defaulting to None) single-argument executions (res1 = await func(arg1), res2 = await func(arg2), ...) of an asynchronous function with variadic positional arguments (async def func(*args, pho=1, bo=2) -> Iterable) into its single execution with multiple positional arguments (res1, res2, ... = await func(arg1, arg2, ...)) collected within a time window leeway_ms.

    opened by tzoiker 5
  • ERROR:aiomisc.periodic: Periodic task error:

    ERROR:aiomisc.periodic: Periodic task error:

    Я тут хочу запустить библиотеку aioimaplib через aiomisc.service.periodic но, при запуске получаю такой ответ:RuntimeError: Event loop is closed . Сам код нормально работает при запуске через asyncio loop. В чем могут быть проблемы? Log:

    opened by kholmatov 5
  • Can I safely use undocumented features?

    Can I safely use undocumented features?

    Can I safely use undocumented features, such as aiomisc.log.wrap.wrap_logging_handler() in production? Is it a stable API with just a poor documentation coverage, or is it just internal stuff that might change at any time?

    opened by cyber-chuvash 2
  • Decorators not preserving function signatures/typing

    Decorators not preserving function signatures/typing

    Hi,

    I found this library extremely useful, thank you for the awesome work.

    However, when I used the @threaded(_separate)? decorators in my typed codebase, the wrapped functions are typed as just Callable and the original type signatures are also lost. I get no type inference in my editor and my static type checker which will make things like refactors much more manually intensive process.

    I noticed that you use functools.wraps for your decorators and this is a long standing issue for it. It does not preserve function signatures and type information. That's why libraries like wrapt were created to try to address this issue.

    Do you think it would be possible to use wrapt instead of functools.wraps to create a better user experience for users who rely on type information? So what I'm trying to ask is, to what degree is user experience for users with typing important to this project? Because if it is important, it would make sense to use a library like wrapt.

    Thank you

    enhancement 
    opened by petoknm 2
  • Duplicated values under different keys in JSON Logger

    Duplicated values under different keys in JSON Logger

    Hi, thank you for great library!

    What is the idea of writing the same values to the log under different keys, if we have already set new names for these fields?

    1. This is mapping for keys: https://github.com/aiokitchen/aiomisc/blob/master/aiomisc/log/formatter/json.py#L31
    2. Then we write under new names this keys: https://github.com/aiokitchen/aiomisc/blob/master/aiomisc/log/formatter/json.py#L68
    3. Then later we iterate over all keys in record_dict https://github.com/aiokitchen/aiomisc/blob/master/aiomisc/log/formatter/json.py#L70, and write same values again under old key names.

    Maybe we should iterate over record_dict - FIELD_MAPPING.keys()?

    opened by kruvasyan 2
  • не работает с python 3.5.3

    не работает с python 3.5.3

    Добрый день! Нам приходится работать исключительно на debian9 и astra linux 1.6 - к обоим дистрибутивам "прибита гвоздями" версия python 3.5.3 (к астре так точно прибита). В этой версии python не выполняется:

    from typing import AsyncContextManager 
    

    (AsyncContextManager появился в typing вроде только с версии 3.5.4)

    Сами вопрос сейчас решаем патчем такого вида:

    --- a/aiomisc/pool.py
    +++ b/aiomisc/pool.py
    @@ -3,7 +3,7 @@
     from abc import ABC, abstractmethod
     from collections import defaultdict
     from random import random
    -from typing import AsyncContextManager
    +from typing_extensions import AsyncContextManager
     
     from .utils import cancel_tasks
     
    --- a/requirements.txt
    +++ b/requirements.txt
    @@ -1,2 +1,3 @@
     colorlog
     prettylog~=0.3.0
    +typing_extensions>=3.6.5
    

    Забираем AsyncContextManager из стороннего пакета typing_extensions

    С уважением, Александр.

    bug wontfix 
    opened by oldbay 2
  • Fix `SO_REUSEADDR` and `SO_REUSEPORT` in `bind_socket`

    Fix `SO_REUSEADDR` and `SO_REUSEPORT` in `bind_socket`

    Delete SO_REUSEADDR = 0 and SO_REUSEPORT = 0 set after binding, as that options also need to be set when socket.listen call is performed to have an effect.

    opened by leenr 2
  • aiomisc_pytest + entrypoint + session fixture + yield + test w fixture + test wo fixture = 💔

    aiomisc_pytest + entrypoint + session fixture + yield + test w fixture + test wo fixture = 💔

    Noticed something strange in the following setting (tests fail on teardown):

    • Session scope loop with entrypoint;
    • Session scope fixture yielding value;
    • Test function using this fixture;
    • Another test function not using this fixture.
    pytest_plugins = (
        'aiomisc',
    )
    
    @pytest.fixture(scope='session')
    def loop():
        with entrypoint() as loop:
            yield loop
    
    
    @pytest.fixture(scope='session')
    async def fixture():
        yield 123
    
    
    async def test_1(fixture):
        pass
    
    
    async def test_2():
        pass
    

    Gives

    Close <_UnixSelectorEventLoop running=False closed=False debug=True>
    
    def finalizer():  # type: ignore
            try:
    >           return event_loop.run_until_complete(gen.__anext__())
    
    /opt/miniconda3/envs/impulse/lib/python3.9/site-packages/aiomisc_pytest/pytest_plugin.py:462: 
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    /opt/miniconda3/envs/impulse/lib/python3.9/asyncio/base_events.py:617: in run_until_complete
        self._check_closed()
    _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
    
    self = <_UnixSelectorEventLoop running=False closed=True debug=True>
    
        def _check_closed(self):
            if self._closed:
    >           raise RuntimeError('Event loop is closed')
    E           RuntimeError: Event loop is closed
    
    /opt/miniconda3/envs/impulse/lib/python3.9/asyncio/base_events.py:510: RuntimeError
    

    Any of the following changes makes it work without errors:

    • Replace session with any other scope;
    • Replace yield with return in the fixture;
    • Add the fixture to test_2;
    • Replace entrypoint with yield new_event_loop() in the loop fixture;
    • Add loop to the fixture explicitly.
    opened by tzoiker 1
  • True asynchronous file operations

    True asynchronous file operations

    It would be nice to add information to the docs about the aiofile library, which supports true asynchronous file operations, in addition to your thread-based implementation.

    opened by Olegt0rr 1
  • Tests TCP Service and TCP Client fail

    Tests TCP Service and TCP Client fail

    Hi All

    i have the following client TCP

    
        class MinosBaseClient:
            __slots__ = 'reader', 'writer', 'futures', 'loop', 'reader_task', '_serial'
    
           HEADER = struct.Struct(">I")
    
           def __init__(self, reader: asyncio.StreamReader,
                     writer: asyncio.StreamWriter,
                     loop: asyncio.AbstractEventLoop = None):
               self.reader = reader
               self.writer = writer
               self.futures = {}
               self._serial = None
               self.loop = loop or asyncio.get_event_loop()
               self.reader_task = self.loop.create_task(self._response_reader())
    
           async def _response_reader(self):
               try:
                   while True:
                       log.debug("MRPC Client: Response Received")
                       body_size = self.HEADER.unpack(
                           await self.reader.readexactly(self.HEADER.size)
                       )[0]
                       log.debug(f"MRPC Client: received data size of {body_size}")
                       response: MinosRPCResponse = MinosResponse.load(
                           await self.reader.readexactly(body_size),
                           MinosRPCResponse)
    
                       future = self.futures.pop(response.id, None)
    
                       if future is None:
                           continue
    
                       future.set_result(response)
               finally:
                   while self.futures:
                       _, future = self.futures.popitem()
    
                       if future.done():
                           continue
                       log.debug("MRPC Client: Set error")
                       future.set_exception(ConnectionAbortedError)
    
           async def close(self):
               """
               close the connection with the server
               """
               log.debug("Client: Closing connection")
               self.writer.write(self.HEADER.pack(0))
               self.reader_task.cancel()
               await asyncio.gather(self.reader_task, return_exceptions=True)
    
               self.loop.call_soon(self.writer.close)
               self.writer.write_eof()
               self.writer.close()
    
           def send_headers(self, path: str):
               """
               Send headers before all
               """
               log.debug("MRPC Client: Send Headers")
               header_request: MinosRPCHeadersRequest = MinosRequest.build(MinosRPCHeadersRequest).addAction(path)
               header_bytes: bytes = header_request.binary
               log.debug("MRCP Client: Send headers with ID: %d", header_request.id)
               self._serial = header_request.id
               self._send_bytes(header_bytes)
               log.debug("MRPC Client: Headers, sent")
    
           def send_body(self, message: t.Any = None):
               log.debug("MRPC Client: Send Body")
               if message:
                   body_request: MinosRPCBodyRequest = MinosRequest.build(MinosRPCBodyRequest).addId(self._serial) \
                    .addBody(message)
               else:
                   body_request: MinosRPCBodyRequest = MinosRequest.build(MinosRPCBodyRequest).addId(self._serial)
               content_bytes: bytes = body_request.binary
               log.debug("MRCP Client: Send Body with ID: %d", body_request.id)
               self._send_bytes(content_bytes)
               log.debug("Body Sent")
    
           def send_close(self):
            ...
    
           def _send_bytes(self, data: bytes):
               with io.BytesIO() as f:
                   f.write(self.HEADER.pack(len(data)))
                   f.write(data)
                   self.writer.write(f.getvalue())
    
           def send(self, path: str, message: t.Any = None, **kwargs):
               self.send_headers(path)
               self.futures[self._serial] = self.loop.create_future()
               self.send_body(message)
               self.send_close()
               return self.futures[self._serial]
    

    Is a lot of code but mainly is the same code used for TCP client in the example folder.

    The server service is a bit more complex but i have tested the server and the client outside the pytest environment and work well.

    So, i have defined the following test

    
    @pytest.fixture
    def config():
        return {
            "controller": "tests.controllers.RootController"
        }
    
    @pytest.fixture
    def services(config):
        return [
            MinosRPCServer(address='localhost', port=8900, conf=config)
        ]
    
    
    @pytest.fixture
    async def service_client() -> MinosBaseClient:
        reader, writer = await asyncio.open_connection(
            'localhost', 8900
        )
        client_connector = MinosBaseClient(reader, writer)
        try:
            yield client_connector
        finally:
            await client_connector.close()
    
    
    async def test_microservice_mrpc_client(service_client):
        result = await service_client.send("without_arg")
        assert True == False
    

    The assert is to have a better veiw of the logs and the error.

    when i start the following code i get the following error:

    async def test_microservice_mrpc_client(service_client):
    >       result = await service_client.send("without_arg")
    E       RuntimeError: Task <Task pending name='Task-61' coro=<test_microservice_mrpc_client() running at /Users/xxxx/PycharmProjects/minos_microservice_test/venv/lib/python3.9/site-packages/aiomisc_pytest/pytest_plugin.py:520> cb=[run_until_complete.<locals>.done_cb()]> got Future <Future pending> attached to a different loop
    

    Have something that is wrong in my code, because i have reviewed everithing and seems that all is fine.

    thanks in advance for your help

    opened by ogonbat 1
  • Release mismatches

    Release mismatches

    Currently there is a release mismatch across the distribution channels:

    It would really be appreciated if the releases are in sync. Especially the source and the GitHub tags otherwise it would be tricky if distributions need to patch something.

    Thanks

    opened by fabaff 1
  • Tag the source

    Tag the source

    It would be very helpful if you could tag releases as well again. This would enable distributions who want to fetch the source from GitHub instead of PyPI.

    Thanks

    opened by fabaff 0
  • Add gather helper functions

    Add gather helper functions

    Adding several helper functions:

    async def gather(
        *tocs: Optional[ToC],
        loop: Optional[AbstractEventLoop] = None,
        return_exceptions: bool = False,
    )
    

    Same as asyncio.gather but it is possible to safely pass Nones through.

    gather_shackled(
        *tocs: Optional[ToC],
        wait_cancelled: bool = False,
    ) -> list
    

    If any of the tasks fails – others are cancelled. Allows to wait for the completion of the cancelled tasks. Returns a list of values or raises the first exception.

    async def gather_independent(
        *tocs: Optional[ToC],
        wait_cancelled: bool = False,
    ) -> list
    

    If any of the tasks fails – it is ok. Allows to wait for the completion of the cancelled tasks in the case of external cancellation. Returns a list of values or exceptions.

    async def gather_graceful(
        primary: Optional[Sequence[Optional[ToC]]] = None, *,
        secondary: Sequence[Optional[ToC]] = None,
        wait_cancelled: bool = False,
    ) -> Union[list, Tuple[list, list]]
    

    Gather tasks in two groups - primary and secondary. If any primary is somehow failed, then, other tasks are cancelled. If secondary is failed – it is ok. If any primary is failed, then, will raise the first exception. Returns two lists of results, one for the primary tasks (only values) and the other for the secondary tasks (values or exceptions). If only primary or secondary is passed, then, returns a single list.

    async def wait_graceful(
        primary: Optional[Iterable[Task]] = None,
        secondary: Optional[Iterable[Task]] = None,
        *,
        wait_cancelled: bool = False,
    )
    

    Main logic is the same as for gather_graceful.

    async def wait_first_cancelled_or_exception(
        fs: Iterable[Future], *,
        loop: Optional[AbstractEventLoop] = None,
        timeout: float = None,
    )
    

    Similar to asyncio.wait, but particularly waits for the first cancelled or failed task.

    opened by tzoiker 0
  • Extend log configuration by logging template

    Extend log configuration by logging template

    Hi, everyone!

    I need a set plain-text format with an ordered key sequence for my application. How I can properly change the default logging template or apply my own log-handler to asyncio.entrypoint?

    Thanks!

    opened by nohitmai 1
  • Add graceful service shutdown

    Add graceful service shutdown

    Sometimes it is desirable to create tasks within a service that will either be awaited or cancelled upon service exit. Added

    • GracefulMixin - creates and stores tasks for cancellation or waiting;
    • GracefulService - performs graceful shutdown on stop.
    opened by tzoiker 2
Owner
aiokitchen
Cookbook with asyncio recipes and libraries.
aiokitchen
Utils for fastapi based services.

Installation pip install fastapi-serviceutils Usage For more details and usage see: readthedocs Development Getting started After cloning the repo

Simon Kallfass 31 Nov 25, 2022
Utils for fastapi based services.

Installation pip install fastapi-serviceutils Usage For more details and usage see: readthedocs Development Getting started After cloning the repo

Simon Kallfass 20 Dec 18, 2020
Tez is a super-simple and lightweight Trainer for PyTorch. It also comes with many utils that you can use to tackle over 90% of deep learning projects in PyTorch.

Tez: a simple pytorch trainer NOTE: Currently, we are not accepting any pull requests! All PRs will be closed. If you want a feature or something does

abhishek thakur 1.1k Jan 4, 2023
Devkit for 3D -- Some utils for 3D object detection based on Numpy and Pytorch

D3D Devkit for 3D: Some utils for 3D object detection and tracking based on Numpy and Pytorch Please consider siting my work if you find this library

Jacob Zhong 27 Jul 7, 2022
Json utils is a python module that you can use when working with json files.

Json-utils Json utils is a python module that you can use when working with json files. it comes packed with a lot of featrues Features Converting jso

Advik 4 Apr 24, 2022
A library for Deep Learning Implementations and utils

deeply A Deep Learning library Table of Contents Features Quick Start Usage License Features Python 2.7+ and Python 3.4+ compatible. Quick Start $ pip

Achilles Rasquinha 1 Dec 12, 2022
Airspy-Utils is a small software collection to help with firmware related operations on Airspy HF+ devices.

Airspy-Utils Airspy-Utils is a small software collection to help with firmware related operations on Airspy HF+ devices on Linux (and other free syste

Dhiru Kholia 11 Oct 4, 2022
Utils to quickly evaluate many 🤗 models on the GLUE tasks

Utils to quickly evaluate many ?? models on the GLUE tasks

Przemyslaw K. Joniak 1 Dec 22, 2021
go-cqhttp API typing annoations, return data models and utils for nonebot

go-cqhttp API typing annoations, return data models and utils for nonebot

风屿 6 Jan 4, 2023
Cleaning-utils - a collection of small Python functions and classes which make cleaning pipelines shorter and easier

cleaning-utils [] [] [] cleaning-utils is a collection of small Python functions

null 4 Aug 31, 2022
Snowfall - helpful image handling utils - abstracts various file and opencv and pil features into result oriented functions

snowfall helpful image handling utils - abstracts various file and opencv and pil features into result oriented functions usage examples: from image_h

Less Wright 2 Jan 9, 2022
Bounding Boxes Python Utils

Bounding Boxes Python Utils

Vadim 4 May 1, 2022
Some utils for auto speech recognition

About Some utils for auto speech recognition. Utils Util Description Script Reset audio Reset sample rate, sample width, etc of audios.

null 1 Jan 24, 2022
ZX Spectrum Utilities: (zx-spectrum-utils)

Here are a few utility programs that can be used with the zx spectrum. The ZX Spectrum is one of the first home computers from the early 1980s.

Graham Oakes 4 Mar 7, 2022
Ultra fast asyncio event loop.

uvloop is a fast, drop-in replacement of the built-in asyncio event loop. uvloop is implemented in Cython and uses libuv under the hood. The project d

magicstack 9.1k Jan 7, 2023
A curated list of awesome Python asyncio frameworks, libraries, software and resources

Awesome asyncio A carefully curated list of awesome Python asyncio frameworks, libraries, software and resources. The Python asyncio module introduced

Timo Furrer 3.8k Jan 8, 2023
Asynchronous HTTP client/server framework for asyncio and Python

Async http client/server framework Key Features Supports both client and server side of HTTP protocol. Supports both client and server Web-Sockets out

aio-libs 13.2k Jan 5, 2023
WebSocket and WAMP in Python for Twisted and asyncio

Autobahn|Python WebSocket & WAMP for Python on Twisted and asyncio. Quick Links: Source Code - Documentation - WebSocket Examples - WAMP Examples Comm

Crossbar.io 2.4k Jan 6, 2023
Motor - the async Python driver for MongoDB and Tornado or asyncio

Motor Info: Motor is a full-featured, non-blocking MongoDB driver for Python Tornado and asyncio applications. Documentation: Available at motor.readt

mongodb 2.1k Dec 26, 2022
WebSocket and WAMP in Python for Twisted and asyncio

Autobahn|Python WebSocket & WAMP for Python on Twisted and asyncio. Quick Links: Source Code - Documentation - WebSocket Examples - WAMP Examples Comm

Crossbar.io 2.4k Jan 4, 2023