Thread-safe Python RabbitMQ Client & Management library

Overview

AMQPStorm

Thread-safe Python RabbitMQ Client & Management library.

Version

Introduction

AMQPStorm is a library designed to be consistent, stable and thread-safe.

  • 100% Test Coverage!
  • Supports Python 2.7 and Python 3.3+.
  • Fully tested against Python Implementations; CPython and PyPy.

Documentation

Additional documentation is available on amqpstorm.io.

Changelog

Version 2.10.4

  • Fixed issue with a forcefully closed channel not sending the appropriate response [#114] - Thanks Bernd Höhl.

Version 2.10.3

  • Fixed install bug with cp1250 encoding on Windows [#112] - Thanks ZygusPatryk.

Version 2.10.2

  • Fixed bad socket fd causing high cpu usage [#110] - Thanks aiden0z.

Version 2.10.1

  • Fixed bug with UriConnection not handling amqps:// properly.
  • Improved documentation.

Version 2.10.0

  • Added Pagination support to Management list calls (e.g. queues list).
  • Added Filtering support to Management list calls.
  • Re-use the requests sessions for Management calls.
  • Updated to use pytest framework instead of nose for testing.

Version 2.9.0

  • Added support for custom Message implementations - Thanks Jay Hogg.
  • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
  • Re-worked the channel re-use code.

Version 2.8.5

  • Fixed a potential deadlock when opening a channel with a broken connection [#97] - Thanks mehdigmira.

Version 2.8.4

  • Fixed a bug in Message.create where it would mutate the properties dict [#92] - Thanks Killerama.

Version 2.8.3

  • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
  • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.

Version 2.8.2

  • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
  • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.

Version 2.8.1

  • Cleaned up documentation.

Version 2.8.0

  • Introduced a new channel function called check_for_exceptions.
  • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskon.
  • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
  • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.

Version 2.7.2

  • Added ability to override client_properties [#77] - Thanks tkram01.

Version 2.7.1

  • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
  • Fixed an issue with closing Channels taking too long after the server initiated it.

Version 2.7.0

  • Added support for passing your own ssl context [#71] - Thanks troglas.
  • Improved logging verbosity on connection failures [#72] - Thanks troglas.
  • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.

Version 2.6.2

  • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
  • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.

Version 2.6.1

  • Fixed minor issue with the last channel id not being available.

Version 2.6.0

  • Re-use closed channel ids [#55] - Thanks mikemrm.
  • Changed Poller Timeout to be a constant.
  • Improved Connection Close performance.
  • Channels is now a publicly available variable in Connections.

Version 2.5.0

  • Upgraded pamqp to v2.0.0.
  • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
  • Fixed issue with Management queue/exchange declare when the passive flag was set to True.

Credits

Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.

Comments
  • Out of order ack-ing?

    Out of order ack-ing?

    Whoooo, more wierdness!

    .Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 5, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 5
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-23 - INFO - Timeout watcher loop. Current message counts: 0 (out: 0, in: 0)
    Main.Connector.Container(/rpcsys).Thread-23 - INFO - Interface timeout thread. Ages: heartbeat -> 4.83, last message -> 32.51.
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': False, 'delivery_tag': 6, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 6
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Received message!
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message properties: {'headers': None, 'reply_to': '', 'user_id': '', 'cluster_id': '', 'app_id': '', 'delivery_mode': None, 'content_type': '', 'correlation_id': 'keepalive', 'expiration': '', 'message_id': '', 'message_type': '', 'priority': None, 'content_encoding': 'utf-8', 'timestamp': None}
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _channel: <amqpstorm.channel.Channel object at 0x7f5d08cdbcc8>
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Message _method: {'routing_key': b'nak', 'redelivered': True, 'delivery_tag': 7, 'exchange': b'keepalive_exchange140037604406920', 'consumer_tag': b'amq.ctag-DCdsyFR66UJsU7sgkNp9oQ'}
    ACK For delivery tag: 7
    Main.Connector.Container(/rpcsys).Thread-21 - INFO - Heartbeat packet received! wat
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Error while in rx runloop!
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -       Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - Traceback (most recent call last):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 546, in _rx_poll
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.interface.process_rx_events()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/FetchAgent/AmqpConnector/__init__.py", line 171, in process_rx_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.storm_channel.process_data_events(to_tuple=False)
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 266, in process_data_events
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     for message in self.build_inbound_messages(break_on_empty=True):
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 113, in build_inbound_messages
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     self.check_for_errors()
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -   File "/media/Storage/Scripts/ReadableWebProxy/amqpstorm/channel.py", line 188, in check_for_errors
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -     raise exception
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR - amqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: PRECONDITION_FAILED - unknown delivery tag 6
    Main.Connector.Internal(/rpcsys).Thread-21 - ERROR -
    Main.Connector.Internal(/rpcsys).Thread-21 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-21 - INFO - RX Poll process dying. Threads_live: 1, had exception 1, should_die True
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Main.Connector.Internal(/rpcsys).Thread-22 - INFO - TX Poll process dying (should die: True). Threads_live: 1, runstate: 1, resp queue size: 0, had exception 1.
    Main.Connector.Internal(/rpcsys).Thread-23 - WARNING - Should die flag! Runstate: running, threads live: threads alive, had exception: yes.
    Disconnecting!
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing channel
    Main.Connector.Container(/rpcsys).Thread-2 - INFO - Closing connection
    Running state: True, thread alive: True, thread id:140037440014080
    Joining _inbound_thread. Runstate: %s False
    
    

    Context:

    I have a connection with a thread processing the receiving messages. I have instrumented Message.ack() with a print statement that prints the delivery tag that it's acking.

    It appears I'm calling [delivery tag 6].ack(), [delivery tag 7].ack(), and somehow the ack for delivery tag 7 is getting received by the rabbitmq server /first/, resulting in a PRECONDITION_FAILED error because acking 7 implicitly acks previous tags, and therefore 6 is not a valid delivery tag anymore.

    I'm working on pulling out a testable example, but it's certainly odd.


    Incidentally, the new docs pages are fancypants!

    wontfix 
    opened by fake-name 49
  • Add the ability to forcefully close a connection.

    Add the ability to forcefully close a connection.

    Basically, I'm dealing with a context where I have a high volume traffic AMQP connection, across a high-latency, unreliable link (intercontinental).

    If there is a way for a connection to possibly wedge, I'll encounter it.

    Anyways, The issue I ran into here is that it's possible for the "close" operation on a connection to wedge indefinitely, preventing a connection from actually closing (I /think/ if the shutdown RPC request gets garbled/eaten somewhere).

    The question of /how/ this happens aside, I therefore need to be able to kill a open connection in a dirty manner. This adds the ability to kill() a connection which will force the worker thread to exit immediately, without bothering to do any proper cleanup.

    Because this is a operation that's generally done with a additional watcher process, outside of the normal program flow, I used multiprocessing primitives to manage the _die flag (at one point, the watcher was a separate /process/, not thread).

    Anyways, I'm not sure if this is inline with the ideas of the library, but I haven't been able to wedge the amqp connection with this patch set, so that's something.


    Only tested on Py3.5x64, Ubuntu 14.04.

    Things I haven't done: Additional unit tests.

    opened by fake-name 17
  • SSL Retry

    SSL Retry

    I regularly get [SSL: BAD_WRITE_RETRY] bad write retry (_ssl.c:1647) errors when running AMQP-Storm using SSL.

    Looking at the code I think its because it's not handling SSL Retry exceptions from the socket

    http://stackoverflow.com/questions/2997218/why-am-i-getting-error1409f07fssl-routinesssl3-write-pending-bad-write-retr

    i'll get a stack trace to see if its on write or on do_handshake

    bug 
    opened by thejuan 17
  • reached the maximum number of channels raised with closed channels

    reached the maximum number of channels raised with closed channels

    Hello,

    I've run into an issue with creating new channels and receiving the reached the maximum number of channels 65535 when attempting to create a new channel.

    After some digging, I noticed Connection._get_next_available_channel_id() accounts for all channels, both open and closed. I believe filtering the count for just opened should resolve this issue.

    I tested with a quick fix

        def _get_next_available_channel_id(self):
            channel_id = len(self._channels) + 1
            active_channels = [
                ch for ch in list(self._channels.values()) if ch and ch.is_open
            ]
            if len(active_channels) >= self.max_allowed_channels:
                raise AMQPConnectionError(
                    'reached the maximum number of channels %d' %
                    self.max_allowed_channels)
            return channel_id
    

    However it may be better to just keep an active count

    bug 
    opened by mikemrm 12
  • Unhandled Frames in 3.6.2

    Unhandled Frames in 3.6.2

    Upgraded to RabbitMq 3.6.2 starting to see the following errors which I think lead to using all the memory on rabbitmq server.

    [Channel%d] Unhandled Frame: %s -- %s

    No code changes, just a server upgrade. Not sure if this storm or rabbitmq.

    opened by thejuan 12
  • Exception: 'Deliver' object has no attribute 'body_size'

    Exception: 'Deliver' object has no attribute 'body_size'

    I have been experiencing a strange error, just for some messages I got this error:

    'Deliver' object has no attribute 'body_size' channel.py (line: 253)

    I looked at self._inbound and I saw some Deliver objects in sequence instead of Deliver/ContentHeader/ContentBody

    amqp-storm

    The messages are being sent to the queue by another project in another language, so I have no clue why it is happening.

    bug 
    opened by viniciuschiele 12
  • Cant publish multiple messages

    Cant publish multiple messages

    This is a weird one, I've upgraded to 2.2 and trying to do the following script

    import os
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    from amqpstorm import Message
    from amqpstorm import UriConnection
    
    keys = ["1","2","3"]
    #signer = Signer()
    bus = UriConnection("***")
    with bus.channel(rpc_timeout=10) as channel:
        channel.confirm_deliveries()
        for key in keys:
            print key
            msg = "My Message"
            #properties = {"headers": {"md5-signature": signer.sign(msg)}}
            Message.create(channel, msg, properties).publish(key, exchange="amq.topic")
    
    

    I get this error on the second publish.

    DEBUG:amqpstorm.connection:Connection Opening DEBUG:amqpstorm.channel0:Frame Received: Connection.Start DEBUG:amqpstorm.channel0:Frame Sent: Connection.StartOk DEBUG:amqpstorm.channel0:Frame Received: Connection.Tune DEBUG:amqpstorm.channel0:Frame Sent: Connection.TuneOk DEBUG:amqpstorm.channel0:Frame Sent: Connection.Open DEBUG:amqpstorm.channel0:Frame Received: Connection.OpenOk DEBUG:amqpstorm.heartbeat:Heartbeat Checker Started DEBUG:amqpstorm.connection:Connection Opened DEBUG:amqpstorm.connection:Opening a new Channel DEBUG:amqpstorm.connection:Channel #1 Opened 1 2 DEBUG:amqpstorm.channel:Channel #1 Closing DEBUG:amqpstorm.channel:Channel #1 Closed Exception in thread amqpstorm.io: Traceback (most recent call last): File "/usr/lib/python2.7/threading.py", line 801, in __bootstrap_inner self.run() File "/usr/lib/python2.7/threading.py", line 754, in run self.__target(*self.__args, **self.__kwargs) File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 219, in _process_incoming_data if self.poller.is_ready: File "/home/adam/.venv/local/lib/python2.7/site-packages/amqpstorm/io.py", line 48, in is_ready except select.error as why: AttributeError: 'NoneType' object has no attribute 'error'

    bug 
    opened by thejuan 11
  • Testing with pamqp 3.0.0a4

    Testing with pamqp 3.0.0a4

    I plan on releasing pamqp 3.0 soon and wanted to make sure you were aware of the changes that are making it into 3.0 and give you an opportunity to report any issues prior to its release.

    Please see the version history @ https://pamqp.readthedocs.io/en/latest/history.html

    opened by gmr 10
  • potential infinite loop ?

    potential infinite loop ?

    Hello,

    I am using your library in conjunction with celery and i regularly encounter a problem: When asking celery to restart its worker (emitting a SIGTERM), the process is blocked and celery doesn't want to restart. Celery maintains a parent thread that runs child threads where tasks are executed. Amqpstorm's thread is also run from this parent thread. The processes in htop look like this:

    screenshot from 2017-04-11 10-34-26

    Here process 13880 & 13803 are related to amqpstorm. 13803 is the inbound_thread while 13880 is the heartbeat timer.

    After some investigation, i found out that killing the thread responsible for the heartbeat timer allows celery to gracefuly restart... This lead me to think that the timer could possibly create a deadlock.

    I have created a pull request based on this assumption and will try this branch on my setup: Can you tell me what you think of it ? Do you see any other possible explanations for the error i see ?

    opened by cp2587 10
  • Connection was closed by remote server: CONNECTION_FORCED

    Connection was closed by remote server: CONNECTION_FORCED

    Hello,

    We recently updated the library version to 2.1.3 (from 1.1.7) and we now face several errors we did not have previously. One of them is the following:

        self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/basic.py", line 194, in publish
        self._channel.write_frames(frames_out)
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 326, in write_frames
        self.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/channel.py", line 169, in check_for_errors
        self._connection.check_for_errors()
      File "/home/wirebot/.virtualenvs/cayzn_yield.etl/local/lib/python2.7/site-packages/amqpstorm/connection.py", line 155, in check_for_errors
        raise self.exceptions[0]
    AMQPConnectionError: Connection was closed by remote server: CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'
    

    On the rabbitmq server, we have the following logs: client unexpectedly closed TCP connection

    Finally here is the code i use to create an amqp socket (this socket is then used by logging to send log in the queue) :

    class AMQPStormSocket(object):
    
        def __init__(self, host, port, username, password, virtual_host, exchange, queue, exchange_is_durable,
                     queue_is_durable, exchange_type, fallback_call):
    
            # create connection & channel
            self.connection = amqpstorm.Connection(host, username, password, port, virtual_host=virtual_host, timeout=1)
            self.channel = self.connection.channel()
    
            # create an exchange, if needed
            self.channel.exchange.declare(exchange=exchange, exchange_type=exchange_type, durable=exchange_is_durable)
            # create a queue, if needed
            self.channel.queue.declare(queue=queue, durable=queue_is_durable, passive=False, auto_delete=False)
            # bind it
            self.channel.queue.bind(queue=queue, exchange=exchange)
    
            # needed when publishing
            self.exchange = exchange
    
            self.fallback_call = fallback_call
    
        def sendall(self, data):
            try:
                self.channel.basic.publish(body=data, exchange=self.exchange, routing_key='')
            except Exception as e:
                self.fallback_call(e)
    
        def close(self):
            try:
                self.channel.close()
                self.connection.close()
            except Exception:
                pass
    

    Do you have an idea on how to fix these errors ?

    opened by cp2587 10
  • Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    Queue declaration must not be obligatory, AMQPChannelError on trying to consume from predeclared queue

    I use RabbitMQ, all the excahnges and consumers were preconfigured with differnet options. Unfortunately when I try to consume from a durable queue I get

    mqpstorm.exception.AMQPChannelError: Channel 1 was closed by remote server: NOT_FOUND - no previously declared queue

    If I try to declare a queue without specifying any params I get

    Channel 1 was closed by remote server: PRECONDITION_FAILED - inequivalent arg 'durable' for queue 'test_queue' in vhost 'lizziebot': received 'false' but current is 'true'

    The only way is to specify all the params including including ttl, autodelete, etc. I cannot use this library because I need rabbitmq structure to be configured not on the client side ;-(

    bug 
    opened by blackalegator 9
  • Is there any support for AMQP 1-0-0 ?

    Is there any support for AMQP 1-0-0 ?

    Does the module supports AMQP-1-0-0.

    I do not see anything regarding supported version in the Doc.

    If not, is there any plan to support it in the future?

    opened by ruffp 1
  • Installation errors with Python 3.10

    Installation errors with Python 3.10

    I am using Python3 version 3.10.4 on Ubuntu 18.04 LTS and encountering the following error when attempting to install amqpstorm:

    $ pip install amqpstorm
    Keyring is skipped due to an exception: module 'collections' has no attribute 'MutableMapping'
    Defaulting to user installation because normal site-packages is not writeable
    Collecting amqpstorm
      Using cached AMQPStorm-2.10.4.tar.gz (71 kB)
      Preparing metadata (setup.py) ... error
      error: subprocess-exited-with-error
    
      × python setup.py egg_info did not run successfully.
      │ exit code: 1
      ╰─> [20 lines of output]
          Traceback (most recent call last):
            File "<string>", line 2, in <module>
            File "<pip-setuptools-caller>", line 14, in <module>
            File "/usr/lib/python3/dist-packages/setuptools/__init__.py", line 12, in <module>
              import setuptools.version
            File "/usr/lib/python3/dist-packages/setuptools/version.py", line 1, in <module>
              import pkg_resources
            File "/usr/lib/python3/dist-packages/pkg_resources/__init__.py", line 77, in <module>
              __import__('pkg_resources.extern.packaging.requirements')
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/packaging/requirements.py", line 9, in <module>
              from pkg_resources.extern.pyparsing import stringStart, stringEnd, originalTextFor, ParseException
            File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
            File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
            File "<frozen importlib._bootstrap>", line 672, in _load_unlocked
            File "<frozen importlib._bootstrap>", line 632, in _load_backward_compatible
            File "/usr/lib/python3/dist-packages/pkg_resources/extern/__init__.py", line 43, in load_module
              __import__(extant)
            File "/usr/lib/python3/dist-packages/pkg_resources/_vendor/pyparsing.py", line 943, in <module>
              collections.MutableMapping.register(ParseResults)
          AttributeError: module 'collections' has no attribute 'MutableMapping'
          [end of output]
    
      note: This error originates from a subprocess, and is likely not a problem with pip.
    error: metadata-generation-failed
    
    × Encountered error while generating package metadata.
    ╰─> See above for output.
    
    note: This is an issue with the package mentioned above, not pip.
    hint: See above for details.
    

    I have seen recommendations to upgrade to the latest version of requests when this happens, but I already have the latest version of requests installed, here is the contents of my requirements.txt file:

    boto3==1.23.0
    botocore==1.26.0
    jmespath==1.0.0
    python-dateutil==2.8.2
    s3transfer==0.5.2
    six==1.16.0
    urllib3==1.26.9
    redis==4.3.1
    requests==2.27.1
    amqpstorm==2.10.4
    pyyaml==6.0
    
    opened by ashleykleynhans 3
  • How does amqpstorm handle Rabbitmq memory alarms

    How does amqpstorm handle Rabbitmq memory alarms

    Hello,

    Thanks for your work on this library 👍 I have a question regarding Rabbitmq memory watermarks and the way amqpstorm handles them.

    Imagine the following:

    1. I have a process that keeps publishing messages (basically an infinite loop for this example)
    2. At a certain point memory alarm is triggered on RMQ, and further publishes are blocked (Writes on a blocked connection will time out or fail with an I/O write exception.), also the publisher get notified that a connexion is blocked (Compatible AMQP 0-9-1 clients will be [notified] when they are blocked and unblocked.).
    3. Then Rabbitmq flushes out some of the data to disk, and unblocks publisher

    What happens on AMQPstorm side ? Does it raise ? do we lose some messages ? does the block/unblock logic trigger some logic in AMQPStorm ?

    Thanks for the help

    Links: RMQ doc

    opened by mehdigmira 4
  • Asyncronous delivery confirmation

    Asyncronous delivery confirmation

    Hi,

    First, thanks for the work.

    We are experiencing some slow-down while enqueuing message with confirm-delivery. Some time before we were not using this feature and enqueuing would be quite fast, and after enabling this feature, enqueuing would become quite slower.

    I checked and it seems that the lib wait for the confirmation on each message while on the following blog post (https://blog.rabbitmq.com/posts/2011/02/introducing-publisher-confirms) they seems to tell us to enqueue all the message and then wait for all the confirmations, do you think there would be a way for amqpstorm to handle that ? If you have some indications, I could try an implementation.

    Thank you.

    enhancement 
    opened by antoinerabany 4
  • Robust Connection example

    Robust Connection example

    Hi,

    This is more of a question than an issue - is it possible to be able to manage a long-lived connection such that a reconnection can be immediately attempted if the channel/connection is closed?

    I see the robust consumer example, but nothing for a connection that maybe used just for publishing. I don't see the ability to add any callbacks for a closed connection or a way to block the current thread until there's an error on the connection/channel, or maybe I'm just missing something.

    Thanks for any help in advance.

    opened by jmcarter 4
Releases(2.10.5)
  • 2.10.5(Aug 14, 2022)

    • Added support for bulk removing users with the Management Api.
    • Added support to get the Cluster Name using the Management Api.
    • Fixed ConnectionUri to default to port 5761 when using ssl [#119] - Thanks s-at-ik.
    Source code(tar.gz)
    Source code(zip)
  • 2.10.4(Nov 20, 2021)

  • 2.10.3(Nov 4, 2021)

  • 2.10.2(Oct 22, 2021)

  • 2.10.1(Sep 29, 2021)

  • 2.10.0(Sep 7, 2021)

    • Added Pagination support to Management list calls (e.g. queues list).
    • Added Filtering support to Management list calls.
    • Re-use the requests sessions for Management calls.
    • Updated to use pytest framework instead of nose for testing.
    Source code(tar.gz)
    Source code(zip)
  • 2.9.0(Jun 11, 2021)

    • Added support for custom Message implementations - Thanks Jay Hogg.
    • Fixed a bug with confirm_delivery not working after closing and re-opening an existing channel.
    • Re-worked the channel re-use code.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.5(May 26, 2021)

  • 2.8.4(Mar 16, 2021)

  • 2.8.3(Mar 16, 2021)

    • Fixed pip sdist circular dependency [#88] - Thanks Jay Hogg.
    • Fixed basic.consume argument type in documentation [#86] - Thanks TechmarkDavid.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.2(Oct 8, 2020)

    • Retry on SSLWantReadErrors [#82] - Thanks Bernhard Thiel.
    • Added getter/setter methods for Message properties expiration, message_type and user_id [#86] - Thanks Jay Hogg.
    Source code(tar.gz)
    Source code(zip)
  • 2.8.1(Jun 27, 2020)

  • 2.8.0(Jun 9, 2020)

    • Introduced a new channel function called check_for_exceptions.
    • Fixed issue where publish was successful but raises an error because connection was closed [#80] - Thanks Pavol Plaskoň.
    • Updated SSL handling to use the non-deprecated way of creating a SSL Connection [#79] - Thanks Carl Hörberg from CloudAMQP.
    • Enabled SNI for SSL connections by default [#79] - Thanks Carl Hörberg from CloudAMQP.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.2(Dec 2, 2019)

  • 2.7.1(Jun 16, 2019)

    • Fixed Connection close taking longer than intended when using SSL [#75]- Thanks troglas.
    • Fixed an issue with closing Channels taking too long after the server initiated it.
    Source code(tar.gz)
    Source code(zip)
  • 2.7.0(Apr 13, 2019)

    • Added support for passing your own ssl context [#71] - Thanks troglas.
    • Improved logging verbosity on connection failures [#72] - Thanks troglas.
    • Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.2(Feb 2, 2019)

    • Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
    • Internal RPC Timeout for Opening and Closing Connections are now set to a fixed 30s [#67] - Thanks josemonteiro.
    Source code(tar.gz)
    Source code(zip)
  • 2.6.1(Dec 28, 2018)

  • 2.6.0(Dec 28, 2018)

    • Re-use closed channel ids [#55] - Thanks mikemrm.
    • Changed Poller Timeout to be a constant.
    • Improved Connection Close performance.
    • Channels is now a publicly available variable in Connections.
    Source code(tar.gz)
    Source code(zip)
  • 2.5.0(Nov 25, 2018)

    • Upgraded pamqp to v2.0.0.
      • Python 3 keys will now always be of type str.
      • For more information see https://pamqp.readthedocs.io/en/latest/history.html
    • Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
    • Fixed issue with Management queue/exchange declare when the passive flag was set to True.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.2(Sep 1, 2018)

    • Added support for External Authentication - Thanks Bernd Höhl.
    • Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
    • LICENSE file now included in package - Thanks Tomáš Chvátal.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.1(Aug 29, 2018)

    • Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
    • We now raise an exception if the maximum allowed channel count is reached.
    Source code(tar.gz)
    Source code(zip)
  • 2.4.0(Jan 17, 2018)

  • 2.3.0(Nov 8, 2017)

    • Added delivery_tag property to message.
    • Added redelivered property to message [#41] - Thanks tkram01.
    • Added support for Management Api Healthchecks [#39] - Thanks Julien Carpentier.
    • Fixed incompatibility with Sun Solaris 10 [#46] - Thanks Giuliox.
    • Fixed delivery_tag being set to None by default [#47] - tkram01.
    • Exposed requests verify and certs flags to Management Api [#40] - Thanks Julien Carpentier.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.2(Apr 23, 2017)

  • 2.2.1(Feb 22, 2017)

    • Fixed potential Channel leak [#36] - Thanks Adam Mills.
    • Fixed threading losing select module during python shutdown [#37] - Thanks Adam Mills.
    Source code(tar.gz)
    Source code(zip)
  • 2.2.0(Nov 18, 2016)

    • Connection.close should now be more responsive.
    • Channels are now reset when re-opening an existing connection.
    • Re-wrote large portions of the Test suit.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.4(Nov 3, 2016)

    • Added parameter to override auto-decode on incoming Messages - Thanks Travis Griggs.
    • Fixed a rare bug that could cause the consumer to get stuck if the connection unexpectedly dies - Thanks Connor Wolf.
    Source code(tar.gz)
    Source code(zip)
  • 2.1.3(Sep 29, 2016)

  • 2.1.2(Sep 23, 2016)

Owner
Erik Olof Gunnar Andersson
The views expressed here are 100% mine and in no way reflect those of my employer.
Erik Olof Gunnar Andersson
API Basica per a synologys Active Backup For Buissiness

Synology Active Backup for Business API-NPP Informació Per executar el programa

Nil Pujol 0 May 13, 2022
Telegram Group Manager Bot + Userbot Written In Python Using Pyrogram.

Telegram Group Manager Bot + Userbot Written In Python Using PyrogramTelegram Group Manager Bot + Userbot Written In Python Using Pyrogram

1 Nov 11, 2021
CnCL - CnCLess it's an Easy to deploy Botnet without CnC/C2

CnCL CnCLess it's an Easy to deploy Botnet without CnC/C2, Harder to track and t

ZSendokame 2 Jan 10, 2022
Telegram bot untuk mencari jawaban dibrainly, support inline juga

Brainly-Telebot Bot Untuk Mencari Jawaban Dibrainly Jika ingin clone. Boleh kok Dibuat dengan python menggunakan MTproto Library. Yaitu Pyrogram Bot y

... 7 Mar 17, 2022
Automatic generation of crypto-arts based on image layers

NFT Generator Автоматическая генерация крипто-артов на основе слоев изображения. Установка pip3 install -r requirements.txt rm -rf result/* Как это ра

Zproger 31 Dec 29, 2022
PHION's client-side core python library

PHION-core PHION's client-side core python library. This library is not meant to be used directly by users. If you want to install phion please use th

PHION 2 Feb 07, 2022
SimpleDCABot is a simple bot that buys crypto with a dollar-cost averaging strategy.

Simple Open Dollar Cost Averaging (DCA) Bot SimpleDCABot is a simple bot that buys crypto on a selected exchange at regular intervals for a prescribed

4 Mar 28, 2022
Dados Públicos de CNPJ disponibilizados pela Receita Federal do Brasil

Dados Públicos CNPJ Fonte oficial da Receita Federal do Brasil, aqui. Layout dos arquivos, aqui. A Receita Federal do Brasil disponibiliza bases com o

Aphonso Henrique do Amaral Rafael 102 Dec 28, 2022
🕵️‍♂️ Investigate Google Accounts with emails.

Description GHunt is an OSINT tool to extract information from any Google Account using an email. It can currently extract: Owner's name Last time the

mxrch 13.1k Jan 01, 2023
Auto-commiter - Auto commiter Github

auto committer Github Follow the steps below to use this repository: 1-install c

Arman Ebtekari 8 Nov 14, 2022
This Instagram app created as a clone of instagram.Developed during Moringa Core.

Instagram This Instagram app created as a clone of instagram.Developed during Moringa Core. AUTHOR By: Nyagah Isaac Description This web-app allows a

Nyagah Isaac 1 Nov 01, 2021
Tracks twitter spaces and sends it to a discord webhook.

Tracks twitter spaces and sends it to a discord webhook. Uses the twitter api to find twitter spaces and then the m3u8 url for the space is found using selenium and will have it posted using a discor

Sam Phung 20 Dec 17, 2022
Change the name and pfp of ur accounts, uses tokens.txt for ur tokens.

Change the name and pfp of ur accounts, uses tokens.txt for ur tokens. Also scrapes the pfps+names from a server chosen by you. For hq tokens go to discord.gg/tokenshop or t.me/praisetelegram

cChimney 36 Dec 09, 2022
Home Assistant Hilo Integration via HACS

BETA This is a beta release. There will be some bugs, issues, etc. Please bear with us and open issues in the repo. Hilo Hilo integration for Home Ass

66 Dec 23, 2022
The Discord bot framework for Python

Pycordia ⚠️ Note! As of now, this package is under early development so functionalities are bound to change drastically. We don't recommend you curren

Ángel Carias 24 Jan 01, 2023
Python wrapper for Interactive Brokers Client Portal Web API

EasyIB: Unofficial Wrapper for Interactive Brokers API EasyIB is an unofficial python wrapper for Interactive Brokers Client Portal Web API. Features

39 Dec 13, 2022
Pancakeswap Sniper BOT - TORNADO CASH Proxy (MAC WINDOWS ANDROID LINUX) A fully decentralized protocol for private transactions

TORNADO CASH Proxy Pancakeswap Sniper BOT 2022-V1 (MAC WINDOWS ANDROID LINUX) ⭐️ A fully decentralized protocol for private transactions ⭐️ AUTO DOWNL

Crypto Trader 1 Jan 05, 2022
Verify your Accounts by Tempphone using this Discordbot

Verify your Accounts by Tempphone using this Discordbot 5sim.net is a service, that offer you temp phonenumbers for otp verification. It include a lot

23 Jan 03, 2023
DevSecOps pipeline for Python based web app using Jenkins, Ansible, AWS, and open-source security tools and checks.

DevSecOps pipeline for Python Web App A Jenkins end-to-end DevSecOps pipeline for Python web application, hosted on AWS Ubuntu 20.04 Note: This projec

Devanshu Vashishtha 4 Aug 15, 2022
⚡ Yuriko Robot ⚡ - A Powerful, Smart And Simple Group Manager Written with AioGram , Pyrogram and Telethon

⚡ Yuriko Robot ⚡ - A Powerful, Smart And Simple Group Manager Written with AioGram , Pyrogram and Telethon

Øғғɪᴄɪᴀʟ Ⱡᴏɢ [₳ғᴋ] 1 Apr 01, 2022