Thread-safe Python RabbitMQ Client & Management library

Overview

AMQPStorm

Thread-safe Python RabbitMQ Client & Management library.

Version

Introduction

AMQPStorm is a library designed to be consistent, stable and thread-safe.

  • 100% Test Coverage!
  • Supports Python 2.7 and Python 3.3+.
  • Fully tested against Python Implementations; CPython and PyPy.

Documentation

Additional documentation is available on amqpstorm.io.

Changelog

Version 2.10.4

  • Fixed issue with a forcefully closed channel not sending the appropriate response [#114] - Thanks Bernd Höhl.

Version 2.10.3

  • Fixed install bug with cp1250 encoding on Windows [#112] - Thanks ZygusPatryk.

Version 2.10.2

  • Fixed bad socket fd causing high cpu usage [#110] - Thanks aiden0z.

Version 2.10.1

  • Fixed bug with UriConnection not handling amqps:// properly.
  • Improved documentation.

Version 2.10.0

  • Added Pagination support to Management list calls (e.g. queues list).
  • Added Filtering support to Management list calls.
  • Re-use the requests sessions for Management calls.
  • Updated to use pytest framework instead of nose for testing.

Version 2.9.0

  • Added support for custom Message implementations - Thanks Jay Hogg.
  • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
  • Re-worked the channel re-use code.

Version 2.8.5

  • Fixed a potential deadlock when opening a channel with a broken connection [#97] - Thanks mehdigmira.

Version 2.8.4

  • Fixed a bug in Message.create where it would mutate the properties dict [#92] - Thanks Killerama.

Version 2.8.3

  • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
  • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.

Version 2.8.2

  • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
  • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.

Version 2.8.1

  • Cleaned up documentation.

Version 2.8.0

  • Introduced a new channel function called check_for_exceptions.
  • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskon.
  • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
  • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.

Version 2.7.2

  • Added ability to override client_properties [#77] - Thanks tkram01.

Version 2.7.1

  • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
  • Fixed an issue with closing Channels taking too long after the server initiated it.

Version 2.7.0

  • Added support for passing your own ssl context [#71] - Thanks troglas.
  • Improved logging verbosity on connection failures [#72] - Thanks troglas.
  • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.

Version 2.6.2

  • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
  • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.

Version 2.6.1

  • Fixed minor issue with the last channel id not being available.

Version 2.6.0

  • Re-use closed channel ids [#55] - Thanks mikemrm.
  • Changed Poller Timeout to be a constant.
  • Improved Connection Close performance.
  • Channels is now a publicly available variable in Connections.

Version 2.5.0

  • Upgraded pamqp to v2.0.0.
  • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
  • Fixed issue with Management queue/exchange declare when the passive flag was set to True.

Credits

Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.

Comments
  • Out of order ack-ing?

    Out of order ack-ing?

    Whoooo, more wierdness!

    .Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 5
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
    Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 6
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 7
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -       Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.interface.process_rx_events()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.check_for_errors()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     raise exception
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
    Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
    Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Disconnecting!
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
    Running state: True, thread alive: True, thread id:140037440014080
    Joining _inbound_thread. Runstate: %s False
    
    

    Context:

    I have a connection with a thread processing the receiving messages. I have instrumented Message.ack() with a print statement that prints the delivery tag that it's acking.

    It appears I'm calling [delivery tag 6].ack(), [delivery tag 7].ack(), and somehow the ack for delivery tag 7 is getting received by the rabbitmq server /first/, resulting in a PRECONDITION_FAILED error because acking 7 implicitly acks previous tags, and therefore 6 is not a valid delivery tag anymore.

    I'm working on pulling out a testable example, but it's certainly odd.


    Incidentally, the new docs pages are fancypants!

    wontfix 
    opened by fake-name 49
  • Add the ability to forcefully close a connection.

    Add the ability to forcefully close a connection.

    Basically, I'm dealing with a context where I have a high volume traffic AMQP connection, across a high-latency, unreliable link (intercontinental).

    If there is a way for a connection to possibly wedge, I'll encounter it.

    Anyways, The issue I ran into here is that it's possible for the "close" operation on a connection to wedge indefinitely, preventing a connection from actually closing (I /think/ if the shutdown RPC request gets garbled/eaten somewhere).

    The question of /how/ this happens aside, I therefore need to be able to kill a open connection in a dirty manner. This adds the ability to kill() a connection which will force the worker thread to exit immediately, without bothering to do any proper cleanup.

    Because this is a operation that's generally done with a additional watcher process, outside of the normal program flow, I used multiprocessing primitives to manage the _die flag (at one point, the watcher was a separate /process/, not thread).

    Anyways, I'm not sure if this is inline with the ideas of the library, but I haven't been able to wedge the amqp connection with this patch set, so that's something.


    Only tested on Py3.5x64, Ubuntu 14.04.

    Things I haven't done: Additional unit tests.

    opened by fake-name 17
  • SSL Retry

    SSL Retry

    I regularly get [SSL: BAD_WRITE_RETRY] bad write retry (_ssl.c:1647) errors when running AMQP-Storm using SSL.

    Looking at the code I think its because it's not handling SSL Retry exceptions from the socket

    http://stackoverflow.com/questions/2997218/why-am-i-getting-error1409f07fssl-routinesssl3-write-pending-bad-write-retr

    i'll get a stack trace to see if its on write or on do_handshake

    bug 
    opened by thejuan 17
  • reached the maximum number of channels raised with closed channels

    reached the maximum number of channels raised with closed channels

    Hello,

    I've run into an issue with creating new channels and receiving the reached the maximum number of channels 65535 when attempting to create a new channel.

    After some digging, I noticed Connection._get_next_available_channel_id() accounts for all channels, both open and closed. I believe filtering the count for just opened should resolve this issue.

    I tested with a quick fix

        def _get_next_available_channel_id(self):
            channel_id = len(self._channels) + 1
            active_channels = [
                ch for ch in list(self._channels.values()) if ch and ch.is_open
            ]
            if len(active_channels) >= self.max_allowed_channels:
                raise AMQPConnectionError(
                    'reached the maximum number of channels %d' %
                    self.max_allowed_channels)
            return channel_id
    

    However it may be better to just keep an active count

    bug 
    opened by mikemrm 12
  • Unhandled Frames in 3.6.2

    Unhandled Frames in 3.6.2

    Upgraded to RabbitMq 3.6.2 starting to see the following errors which I think lead to using all the memory on rabbitmq server.

    [Channel%d] Unhandled Frame: %s -- %s

    No code changes, just a server upgrade. Not sure if this storm or rabbitmq.

    opened by thejuan 12
  • Exception: 'Deliver' object has no attribute 'body_size'

    Exception: 'Deliver' object has no attribute 'body_size'

    I have been experiencing a strange error, just for some messages I got this error:

    'Deliver' object has no attribute 'body_size' channel.py (line: 253)

    I looked at self._inbound and I saw some Deliver objects in sequence instead of Deliver/ContentHeader/ContentBody

    amqp-storm

    The messages are being sent to the queue by another project in another language, so I have no clue why it is happening.

    bug 
    opened by viniciuschiele 12
  • Cant publish multiple messages

    Cant publish multiple messages

    This is a weird one, I've upgraded to 2.2 and trying to do the following script

    import os
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    from amqpstorm import Message
    from amqpstorm import UriConnection
    
    keys = ["1","2","3"]
    #signer = Signer()
    bus = UriConnection("***")
    with bus.channel(rpc_timeout=10) as channel:
        channel.confirm_deliveries()
        for key in keys:
            print key
            msg = "My Message"
            #properties = {"headers": {"md5-signature": signer.sign(msg)}}
            Message.create(channel, msg, properties).publish(key, exchange="amq.topic")
    
    

    I get this error on the second publish.

    DEBUG:amqpstorm.connection:Connection Opening DEBUG:amqpstorm.channel0:Frame Received: Connection.Start DEBUG:amqpstorm.channel0:Frame Sent: Connection.StartOk DEBUG:amqpstorm.channel0:Frame Received: Connection.Tune DEBUG:amqpstorm.channel0:Frame Sent: Connection.TuneOk DEBUG:amqpstorm.channel0:Frame Sent: Connection.Open DEBUG:amqpstorm.channel0:Frame Received: Connection.OpenOk DEBUG:amqpstorm.heartbeat:Heartbeat Checker Started DEBUG:amqpstorm.connection:Connection Opened DEBUG:amqpstorm.connection:Opening a new Channel DEBUG:amqpstorm.connection:Channel #1 Opened 1 2 DEBUG:amqpstorm.channel:Channel #1 Closing DEBUG:amqpstorm.channel:Channel #1 Closed Exception in thread amqpstorm.io: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 219, in _process_incoming_data if self.poller.is_ready: File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 48, in is_ready except select.error as why: AttributeError: 'NoneType' object has no attribute 'error'

    bug 
    opened by thejuan 11
  • Testing with pamqp 3.0.0a4

    Testing with pamqp 3.0.0a4

    I plan on releasing pamqp 3.0 soon and wanted to make sure you were aware of the changes that are making it into 3.0 and give you an opportunity to report any issues prior to its release.

    Please see the version history @ https://pamqp.readthedocs.io/en/latest/history.html

    opened by gmr 10
  • potential infinite loop ?

    potential infinite loop ?

    Hello,

    I am using your library in conjunction with celery and i regularly encounter a problem: When asking celery to restart its worker (emitting a SIGTERM), the process is blocked and celery doesn't want to restart. Celery maintains a parent thread that runs child threads where tasks are executed. Amqpstorm's thread is also run from this parent thread. The processes in htop look like this:

    screenshot from 2017-04-11 10-34-26

    Here process 13880 & 13803 are related to amqpstorm. 13803 is the inbound_thread while 13880 is the heartbeat timer.

    After some investigation, i found out that killing the thread responsible for the heartbeat timer allows celery to gracefuly restart... This lead me to think that the timer could possibly create a deadlock.

    I have created a pull request based on this assumption and will try this branch on my setup: Can you tell me what you think of it ? Do you see any other possible explanations for the error i see ?

    opened by cp2587 10
  • Connection was closed by remote server: CONNECTION_FORCED

    Connection was closed by remote server: CONNECTION_FORCED

    Hello,

    We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:

        self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
        self._channel.write_frames(frames_out)
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
        self.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
        self._connection.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
        raise self.exceptions[0]
    AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
    

    On the rabbitmq server, we have the following logs: client unexpectedly closed TCP connection

    Finally here is the code i use to create an amqp socket (this socket is then used by logging to send log in the queue) :

    class AMQPStormSocket(object):
    
        def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                     queue_is_durable, exchange_type, fallback_call):
    
            # create connection & channel
            self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
            self.channel = self.connection.channel()
    
            # create an exchange, if needed
            self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
            # create a queue, if needed
            self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
            # bind it
            self.channel.queue.bind(queue=queue, exchange=exchange)
    
            # needed when publishing
            self.exchange = exchange
    
            self.fallback_call = fallback_call
    
        def sendall(self, data):
            try:
                self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
            except Exception as e:
                self.fallback_call(e)
    
        def close(self):
            try:
                self.channel.close()
                self.connection.close()
            except Exception:
                pass
    

    Do you have an idea on how to fix these errors ?

    opened by cp2587 10
  • Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    I use RabbitMQ, all the excahnges and consumers were preconfigured with differnet options. Unfortunately when I try to consume from a durable queue I get

    mqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: NOT_FOUND - no previously declared queue

    If I try to declare a queue without specifying any params I get

    Channel 1 was closed by remote server: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue' in vhost 'lizziebot': received 'false' but current is 'true'

    The only way is to specify all the params including including ttl, autodelete, etc. I cannot use this library because I need rabbitmq structure to be configured not on the client side ;-(

    bug 
    opened by blackalegator 9
  • Is there any support for AMQP 1-0-0 ?

    Is there any support for AMQP 1-0-0 ?

    Does the module supports AMQP-1-0-0.

    I do not see anything regarding supported version in the Doc.

    If not, is there any plan to support it in the future?

    opened by ruffp 1
  • Installation errors with Python 3.10

    Installation errors with Python 3.10

    I am using Python3 version 3.10.4 on Ubuntu 18.04 LTS and encountering the following error when attempting to install amqpstorm:

    $ pip install amqpstorm
    Keyring is skipped due to an exception: module 'collections' has no attribute 'MutableMapping'
    Defaulting to user installation because normal site-packages is not writeable
    Collecting amqpstorm
      Using cached AMQPStorm-2.10.4.tar.gz (71 kB)
      Preparing metadata (setup.py) ... error
      error: subprocess-exited-with-error
    
      × python setup.py egg_info did not run successfully.
      │ exit code: 1
      ╰─> [20 lines of output]
          Traceback (most recent call last):
            File "<string>", line 2, in <module>
            File "<pip-setuptools-caller>", line 14, in <module>
            File "/usr/lib/python3/dist-packages/setuptools/__init__.py", line 12, in <module>
              import setuptools.version
            File "/usr/lib/python3/dist-packages/setuptools/version.py", line 1, in <module>
              import pkg_resources
            File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 77, in <module>
              __import__('pkg_resources.extern.packaging.requirements')
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/packaging/requirements.py", line 9, in <module>
              from pkg_resources.extern.pyparsing import stringStart, stringEnd, originalTextFor, ParseException
            File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
            File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
            File "<frozen importlib._bootstrap>", line 672, in _load_unlocked
            File "<frozen importlib._bootstrap>", line 632, in _load_backward_compatible
            File "/usr/lib/python3/dist-packages/pkg_resources/extern/__init__.py", line 43, in load_module
              __import__(extant)
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/pyparsing.py", line 943, in <module>
              collections.MutableMapping.register(ParseResults)
          AttributeError: module 'collections' has no attribute 'MutableMapping'
          [end of output]
    
      note: This error originates from a subprocess, and is likely not a problem with pip.
    error: metadata-generation-failed
    
    × Encountered error while generating package metadata.
    ╰─> See above for output.
    
    note: This is an issue with the package mentioned above, not pip.
    hint: See above for details.
    

    I have seen recommendations to upgrade to the latest version of requests when this happens, but I already have the latest version of requests installed, here is the contents of my requirements.txt file:

    boto3==1.23.0
    botocore==1.26.0
    jmespath==1.0.0
    python-dateutil==2.8.2
    s3transfer==0.5.2
    six==1.16.0
    urllib3==1.26.9
    redis==4.3.1
    requests==2.27.1
    amqpstorm==2.10.4
    pyyaml==6.0
    
    opened by ashleykleynhans 3
  • How does amqpstorm handle Rabbitmq memory alarms

    How does amqpstorm handle Rabbitmq memory alarms

    Hello,

    Thanks for your work on this library 👍 I have a question regarding Rabbitmq memory watermarks and the way amqpstorm handles them.

    Imagine the following:

    1. I have a process that keeps publishing messages (basically an infinite loop for this example)
    2. At a certain point memory alarm is triggered on RMQ, and further publishes are blocked (Writes on a blocked connection will time out or fail with an I/O write exception.), also the publisher get notified that a connexion is blocked (Compatible AMQP 0-9-1 clients will be [notified] when they are blocked and unblocked.).
    3. Then Rabbitmq flushes out some of the data to disk, and unblocks publisher

    What happens on AMQPstorm side ? Does it raise ? do we lose some messages ? does the block/unblock logic trigger some logic in AMQPStorm ?

    Thanks for the help

    Links: RMQ doc

    opened by mehdigmira 4
  • Asyncronous delivery confirmation

    Asyncronous delivery confirmation

    Hi,

    First, thanks for the work.

    We are experiencing some slow-down while enqueuing message with confirm-delivery. Some time before we were not using this feature and enqueuing would be quite fast, and after enabling this feature, enqueuing would become quite slower.

    I checked and it seems that the lib wait for the confirmation on each message while on the following blog post (https://blog.rabbitmq.com/posts/2011/02/introducing-publisher-confirms) they seems to tell us to enqueue all the message and then wait for all the confirmations, do you think there would be a way for amqpstorm to handle that ? If you have some indications, I could try an implementation.

    Thank you.

    enhancement 
    opened by antoinerabany 4
  • Robust Connection example

    Robust Connection example

    Hi,

    This is more of a question than an issue - is it possible to be able to manage a long-lived connection such that a reconnection can be immediately attempted if the channel/connection is closed?

    I see the robust consumer example, but nothing for a connection that maybe used just for publishing. I don't see the ability to add any callbacks for a closed connection or a way to block the current thread until there's an error on the connection/channel, or maybe I'm just missing something.

    Thanks for any help in advance.

    opened by jmcarter 4
Releases(2.10.5)
  • 2.10.5(Aug 14, 2022)

    • Added support for bulk removing users with the Management Api.
    • Added support to get the Cluster Name using the Management Api.
    • Fixed ConnectionUri to default to port 5761 when using ssl [#119] - Thanks s-at-ik.
    Source code(tar.gz)
    Source code(zip)
  • 2.10.4(Nov 20, 2021)

  • 2.10.3(Nov 4, 2021)

  • 2.10.2(Oct 22, 2021)

  • 2.10.1(Sep 29, 2021)

  • 2.10.0(Sep 7, 2021)

    • Added Pagination support to Management list calls (e.g. queues list).
    • Added Filtering support to Management list calls.
    • Re-use the requests sessions for Management calls.
    • Updated to use pytest framework instead of nose for testing.
    Source code(tar.gz)
    Source code(zip)
  • 2.9.0(Jun 11, 2021)

    • Added support for custom Message implementations - Thanks Jay Hogg.
    • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
    • Re-worked the channel re-use code.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.5(May 26, 2021)

  • 2.8.4(Mar 16, 2021)

  • 2.8.3(Mar 16, 2021)

    • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
    • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.2(Oct 8, 2020)

    • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
    • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.1(Jun 27, 2020)

  • 2.8.0(Jun 9, 2020)

    • Introduced a new channel function called check_for_exceptions.
    • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskoň.
    • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
    • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.2(Dec 2, 2019)

  • 2.7.1(Jun 16, 2019)

    • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
    • Fixed an issue with closing Channels taking too long after the server initiated it.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.0(Apr 13, 2019)

    • Added support for passing your own ssl context [#71] - Thanks troglas.
    • Improved logging verbosity on connection failures [#72] - Thanks troglas.
    • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.2(Feb 2, 2019)

    • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
    • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.1(Dec 28, 2018)

  • 2.6.0(Dec 28, 2018)

    • Re-use closed channel ids [#55] - Thanks mikemrm.
    • Changed Poller Timeout to be a constant.
    • Improved Connection Close performance.
    • Channels is now a publicly available variable in Connections.
    Source code(tar.gz)
    Source code(zip)
  • 2.5.0(Nov 25, 2018)

    • Upgraded pamqp to v2.0.0.
      • Python 3 keys will now always be of type str.
      • For more information see https://pamqp.readthedocs.io/en/latest/history.html
    • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
    • Fixed issue with Management queue/exchange declare when the passive flag was set to True.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.2(Sep 1, 2018)

    • Added support for External Authentication - Thanks Bernd Höhl.
    • Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
    • LICENSE file now included in package - Thanks Tomáš Chvátal.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.1(Aug 29, 2018)

    • Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
    • We now raise an exception if the maximum allowed channel count is reached.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.0(Jan 17, 2018)

  • 2.3.0(Nov 8, 2017)

    • Added delivery_tag property to message.
    • Added redelivered property to message [#41] - Thanks tkram01.
    • Added support for Management Api Healthchecks [#39] - Thanks Julien Carpentier.
    • Fixed incompatibility with Sun Solaris 10 [#46] - Thanks Giuliox.
    • Fixed delivery_tag being set to None by default [#47] - tkram01.
    • Exposed requests verify and certs flags to Management Api [#40] - Thanks Julien Carpentier.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.2(Apr 23, 2017)

  • 2.2.1(Feb 22, 2017)

    • Fixed potential Channel leak [#36] - Thanks Adam Mills.
    • Fixed threading losing select module during python shutdown [#37] - Thanks Adam Mills.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.0(Nov 18, 2016)

    • Connection.close should now be more responsive.
    • Channels are now reset when re-opening an existing connection.
    • Re-wrote large portions of the Test suit.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.4(Nov 3, 2016)

    • Added parameter to override auto-decode on incoming Messages - Thanks Travis Griggs.
    • Fixed a rare bug that could cause the consumer to get stuck if the connection unexpectedly dies - Thanks Connor Wolf.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.3(Sep 29, 2016)

  • 2.1.2(Sep 23, 2016)

Owner
Erik Olof Gunnar Andersson
The views expressed here are 100% mine and in no way reflect those of my employer.
Erik Olof Gunnar Andersson
gnosis safe tx builder

Ape Safe: Gnosis Safe tx builder Ape Safe allows you to iteratively build complex multi-step Gnosis Safe transactions and safely preview their side ef

null 228 Dec 22, 2022
An Anime Themed Fast And Safe Group Managing Bot.

Ξ L I N Λ ?? A Powerful, Smart And Simple Group Manager bot Avaiilable a latest version as Ξ L I N Λ ?? on Telegram Self-hosting (For Devs) vps # Inst

null 7 Nov 12, 2022
Project glow is an open source bot worked on by many people to create a good and safe moderation bot for all

Project Glow Greetings, I see you have stumbled upon project glow. Project glow is an open source bot worked on by many people to create a good and sa

Glowstikk 24 Sep 29, 2022
an API to check if a url or IP address is safe or phishing

an API to check if a url or IP address is safe or phishing. Using a ML model. The API created using FastAPI.

Adel Dahani 1 Feb 16, 2022
Dns-Client-Server - Dns Client Server For Python

Dns-client-server DNS Server: supporting all types of queries and replies. Shoul

Nishant Badgujar 1 Feb 15, 2022
Raphtory-client - The python client for the Raphtory project

Raphtory Client This is the python client for the Raphtory project Install via p

Raphtory 5 Apr 28, 2022
Drcom-pt-client - Drcom Pt version client with refresh timer

drcom-pt-client Drcom Pt version client with refresh timer Dr.com Pt版本客户端 可用于网页认

null 4 Nov 16, 2022
Discord bot ( discord.py ), uses pandas library from python for data-management.

Discord_bot A Best and the most easy-to-use Discord bot !! Some simple basic auto moderations, Chat functions. It includes a game similar to Casino, g

Jaitej 4 Aug 30, 2022
A group management bot written in python3 using the python-telegram-bot library.

Chika Fujiwara A modular telegram Python bot running on python3 with an sqlalchemy database. Originally a Marie fork, Chika was created for personal u

Wahyusaputra 3 Feb 12, 2022
🐍 The official Python client library for Google's discovery based APIs.

Google API Client This is the Python client library for Google's discovery based APIs. To get started, please see the docs folder. These client librar

Google APIs 6.2k Jan 8, 2023
The official Python client library for the Kite Connect trading APIs

The Kite Connect API Python client - v3 The official Python client for communicating with the Kite Connect API. Kite Connect is a set of REST-like API

Zerodha Technology 756 Jan 6, 2023
Unirest in Python: Simplified, lightweight HTTP client library.

Unirest for Python Unirest is a set of lightweight HTTP libraries available in multiple languages, built and maintained by Mashape, who also maintain

Kong 432 Dec 21, 2022
🐍 The official Python client library for Google's discovery based APIs.

Google API Client This is the Python client library for Google's discovery based APIs. To get started, please see the docs folder. These client librar

Google APIs 6.2k Dec 31, 2022
Pure Python 3 MTProto API Telegram client library, for bots too!

Telethon ⭐️ Thanks everyone who has starred the project, it means a lot! Telethon is an asyncio Python 3 MTProto library to interact with Telegram's A

LonamiWebs 7.3k Jan 1, 2023
Python client library for Google Maps API Web Services

Python Client for Google Maps Services Description Use Python? Want to geocode something? Looking for directions? Maybe matrices of directions? This l

Google Maps 3.8k Jan 1, 2023
Beyonic API Python official client library simplified examples using Flask, Django and Fast API.

Beyonic API Python official client library simplified examples using Flask, Django and Fast API.

Harun Mbaabu Mwenda 46 Sep 1, 2022
Python Client Library to interface with the Phoenix Realtime Server

supabase-realtime-client Python Client Library to interface with the Phoenix Realtime Server This is a fork of the supabase community realtime client

Anand 2 May 24, 2022
The Python client library for the Tuneup Technology App.

Tuneup Technology App Python Client Library The Python client library for the Tuneup Technology App. This library allows you to interact with the cust

Tuneup Technology 0 Jun 29, 2022
Python client library for Bigcommerce API

Bigcommerce API Python Client Wrapper over the requests library for communicating with the Bigcommerce v2 API. Install with pip install bigcommerce or

BigCommerce 81 Dec 26, 2022