ZeroMQ bindings for Twisted

Overview

Twisted bindings for 0MQ

https://coveralls.io/repos/smira/txZMQ/badge.png

Introduction

txZMQ allows to integrate easily ØMQ sockets into Twisted event loop (reactor).

txZMQ supports both CPython and PyPy and ØMQ library version 2.2.x or 3.2.x.

Documentation is available at ReadTheDocs.

Requirements

C library required:

  • ØMQ library 2.2.x or 3.2.x

Python packages required:

  • pyzmq >= 13 (for CPython & PyPy)
  • Twisted

Details

txZMQ introduces support for general 0MQ sockets by class ZmqConnection that can do basic event loop integration, sending-receiving messages in non-blocking manner, scatter-gather for multipart messages.

txZMQ uses ØMQ APIs to get file descriptor that is used to signal pending actions from ØMQ library IO thread running in separate thread. This is used in a custom file descriptor reader, which is then added to the Twisted reactor.

Upgrading from 0.3.x

If you're upgrading from version 0.3.1 and earlier, please apply following changes to your code:

  • root package name was changed from txZMQ to txzmq, adjust your imports accordingly;
  • ZmqEndpointType.Connect has been renamed to ZmqEndpointType.connect;
  • ZmqEndpointType.Bind has been renamed to ZmqEndpointType.bind;
  • ZmqConnection.__init__ has been changed to accept keyword arguments instead of list of endpoints; if you were using one endpoint, no changes are required; if using multiple endpoints, please look for add_endpoints method.

Hacking

Source code for txZMQ is available at github; forks and pull requests are welcome.

To start hacking, fork at github and clone to your working directory. To use the Makefile (for running unit tests, checking for PEP8 compliance and running pyflakes), you will want to have virtualenv installed (it includes a pip installation).

Create a branch, add some unit tests, write your code, check it and test it! Some useful make targets are:

  • make env
  • make check
  • make test

If you don't have an environment set up, a new one will be created for you in ./env. Additionally, txZMQ will be installed as well as required development libs.

Comments
  • Timeout support

    Timeout support

    I have looked at the code and it does not seem like there is any support for setting timeouts.

    Is that correct, it seems like a pretty fundamental thing for REP/REQ over tcp.

    opened by brunsgaard 23
  • Change txZMQ (from GPL) to a more permissive license (at least LGPL)

    Change txZMQ (from GPL) to a more permissive license (at least LGPL)

    Just want to confirm that you are intentionally prohibiting distribution of this package along with proprietary software. While GPL is certainly a valid license for a library, the LGPL is more common for libraries (e.g. psycopg2) -- and you (appear to) have licensed other packages under more permissive licenses like MPL and MIT.

    opened by ambsw-technology 19
  • DEALER socket losing messages

    DEALER socket losing messages

    Hi all,

    I am seeing strange behavior in an application using DEALER and ROUTER sockets.

    Requests made by the DEALER are always received by the ROUTER, but the responses made by the ROUTER are sometimes lost. I've added a test to demonstrate this behavior. It is the only change in this pull request.

    https://github.com/wehriam/txZMQ/blob/master/txZMQ/test/test_async_dealer_router.py

    In the application (but not demonstrated in the test) after some varying number of requests responses eventually fail to arrive entirely.

    I was unsuccessful in resolving this issue and hope you'll have better luck in determining the underlying problem.

    OSX Lion, Python 2.7.1 ZMQ 2.1.9

    Twisted==11.0.0 pep8==0.6.1 pyflakes==0.5.0 pyzmq==2.1.10 txZMQ==0.3 wsgiref==0.1.2 zope.interface==3.8.0

    opened by wehriam 13
  • push with bind

    push with bind

    i run push_pull.py example, using python push_pull.py --method=bind --mode=push, raise

    python push_pull.py --method=bind --mode=push
    
    producing ['1352354886.76', 'ubuntu']
    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 413, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 301, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 290, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 551, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 426, in _continueFiring
        callable(*args, **kwargs)
      File "push_pull.py", line 45, in produce
        s.push(data)
      File "/home/jjx/sources/txZMQ/txzmq/pushpull.py", line 22, in push
        self.send(message)
      File "/home/jjx/sources/txZMQ/txzmq/connection.py", line 245, in send
        self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
      File "socket.pyx", line 499, in zmq.core.socket.Socket.send (zmq/core/socket.c:5381)
    
      File "socket.pyx", line 546, in zmq.core.socket.Socket.send (zmq/core/socket.c:5143)
    
      File "socket.pyx", line 175, in zmq.core.socket._send_copy (zmq/core/socket.c:2139)
    
    zmq.core.error.ZMQError: Resource temporarily unavailable
    
    
    opened by jiangjianxiao 11
  • Support for Zeromq3.2

    Support for Zeromq3.2

    Hi, zeromq3.2 isn't quite out yet, but it will be soon! pyzmq has had experimental support for it since pyzmq-2.1.7.

    The following commits introduce the start of experimental support for zeromq-3.2. Whether pyzmq is built against zeromq-2.2 or zeromq-3.2, the txZMQ tests should pass.

    pubsub works, but the req/rep and router/dealer patterns are broken for reasons I haven't figured out yet.

    opened by ralphbean 9
  • Integrate message reading into twisted reactor loop

    Integrate message reading into twisted reactor loop

    During work with txZMQ I met the case when fast incoming messages stream with not so fast message processing functions prevents twisted reactor's loop from resuming because loop reading messages in doRead() never ends - there are always new messages arrived. This pull request transforms message reading and processing loop to cooperative task scheduled by Twisted.

    opened by daa 7
  • XREQ/XREP

    XREQ/XREP

    Hi!

    I just put together this initial XREQ/XREP ZmqConnections subclasses.

    There are also some small changes to the base ZmqConnection class, the most important is the doRead call at the end of _startWriting method. This is the only way (I've found ) to be sure we don't miss any events from the socket, as it's event-triggered and twisted might miss an event while we are writting.

    Cheers.

    opened by verterok 7
  • Incorrect (twice?) shutdown when registered for shutdown

    Incorrect (twice?) shutdown when registered for shutdown

    The factory which has been registered for shutdown ...

            self.zmqfactory = ZmqFactory()
            self.zmqfactory.registerForShutdown()
            self.receiver = MyZmqSubConnection(self,self.zmqfactory)
    

    is shutted down later

           self.zmqfactory.shutdown()
    

    As a result, the unhandled exception happens when the reactor is shutted down:

    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 416, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 307, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 296, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 578, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 429, in _continueFiring
        callable(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/txzmq/factory.py", line 51, in shutdown
        for connection in self.connections.copy():
    

    It probably useful either add an 'active' flag of the factory to avoid conflict when shutting down twice (manually and automatically), or unregister from the reactor to be shutted down automatically to avoid a conflict as such.

    opened by nnseva 6
  • Is it possible to create txzmq daemon as twistd plugin?

    Is it possible to create txzmq daemon as twistd plugin?

    Hi,

    How can i create txzmq daemon as twistd plugin? I googled for some information, but without success. If it is possible to do, I will be grateful for any help, advice or information.

    opened by tmpd 6
  • highWaterMark cannot protect from memory overflow

    highWaterMark cannot protect from memory overflow

    With pyzmq, setting HWM would protect you from memory overflow issues like this:

    import zmq
    ctx = zmq.Context()
    s= zmq.socket(zmq.PUSH)
    s.setsockopt(zmq.HWM, 8)
    s.connect('tcp://localhost:33710')  # non-exist endpoint
    while True:
        s.send('XXX')  # this will block when high water mark is reached
    

    With txZMQ, ZmqConnection.send() simply append into a deque(), regardless of highWaterMark. Therefore test like above would take up a lot of memory.

    If send() could return a Deferred object, the calling code would be able to do something about it.

    A limited work around is to set a maxlen on the deque object, so it would drop data when full. Problem is if you really want to wait until the exceptional HWM state is finished (data is very important for example), this won't help.

    feature 
    opened by fantix 6
  • 'ZmqREPConnection' object has no attribute '_routingInfo'

    'ZmqREPConnection' object has no attribute '_routingInfo'

    In our project we encountered the following failure:

    
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 153, in __init__
        ZmqConnection.__init__(self, *args, **kwargs)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 174, in __init__
        self.doRead()
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 281, in doRead
        log.callWithLogger(self, self.messageReceived, message)
    --- <exception caught here> ---
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 88, in callWithLogger
        return callWithContext({"system": lp}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 73, in callWithContext
        return context.call({ILogContext: newCtx}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 179, in messageReceived
        self._routingInfo[msgId] = routingInfo
    exceptions.AttributeError: 'ZmqREPConnection' object has no attribute '_routingInfo'
    
    

    It seems like the doRead called during initialization of ZmqREPConnection actually gets a message and tries to handle it, before the _routingInfo attribute was initialized later in its __init__.

    opened by aryes 5
  • docs: fix simple typo, recevied -> received

    docs: fix simple typo, recevied -> received

    There is a small typo in txzmq/pubsub.py.

    Should read received rather than recevied.

    Semi-automated pull request generated by https://github.com/timgates42/meticulous/blob/master/docs/NOTE.md

    opened by timgates42 0
  • txzmq/pubsub.py

    txzmq/pubsub.py", line 26 -- exception cannot conatenate bytes

    File "/home/ar/Envs/a3_pyvcluster/lib/python3.7/site-packages/txzmq/pubsub.py", line 26, in publish |class ZmqSubConnection(ZmqConnection): self.send(tag + b'\0' + message) | """
    TypeError: can only concatenate str (not "bytes") to str

    def publish(self, message, tag=b''): """ Publish message with specified tag.

        :param message: message data
        :type message: str
        :param tag: message tag
        :type tag: str
        """
        self.send(tag + b'\0' + message) <---- It does not work in python3, I tried both tag and message being bytes and it does not work either
    
    opened by alex4747-pub 1
  • Timeout handling in REP-REQ

    Timeout handling in REP-REQ

    I'm not quite sure this is a bug or if its an intended feature, and I'd like to elaborate on this problem that I've spent some time figuring out:

    I'm using REP-REQ in a context where the REP server is unstable. The txzmq implements a timeout on the ZmqREQConnection.sendMsg() method. This timeout works perfectly in itself, but after some digging it seems the timeout feature is not a part of zmg. The problem with this is that the message is still left in zmq REQ queue, even after the caller has already gotten its Errback() called for the this message. If the server becomes available at some later time, the reply will be send back to the REQ client, but it goes nowhere in the txzmq stack.

    The second problem is that sending a message to the server is the only way of knowing if the server connection is alive, so one needs to keep sending messages (that will time out) regularly to check if the link is up. When it is up again, the server will be flooded with a stream of these "alive" messages.

    My resolution to the issue is to use a variant of the lazy pirate pattern: When sending a message with sendMsg() I install an Errback callback that closes the link, which will flush the connection.

        if not socket:
            socket = ZmqREQConnection(zmq, req_endpoint)
    
        try:
            d = socket.sendMsg(message, timeout=2)
    
            def onTimeout(fail):
                socket.shutdown()
                socket = None
                return fail
    
            d.addErrback(onTimeout)
            return d
    
        except zmq.error.Again:
            print("Handle me...")
    

    Since this is more an effect of the zmq's REP-REQ implementation than txzmq, I'm not sure this is a bug in txzmq, but I dislike that a message receives a timeout, while still being scheduled for transmission. Perhaps the above logic is required? Perhaps there is some way to cancel the message when it times out?

    opened by sveinse 1
  • Add support for multipart messages and remove custom framing for pub/sub

    Add support for multipart messages and remove custom framing for pub/sub

    I've changed the pub/sub code to remove the custom framing using a null byte and added code to send and receive multipart messages. I've ensured API-level compatibility as well as making sure old versions can read data sent by the old one (thanks to the existing compatibility code there). But of course older versions won't be able to read multipart messages sent by this one.

    opened by cncfanatics 5
Releases(1.0.1)
Autoscaling volumes for Kubernetes (with the help of Prometheus)

Kubernetes Volume Autoscaler (with Prometheus) This repository contains a service that automatically increases the size of a Persistent Volume Claim i

DevOps Nirvana 142 Dec 28, 2022
A lobby boy will create a VPS server when you need one, and destroy it after using it.

Lobbyboy What is a lobby boy? A lobby boy is completely invisible, yet always in sight. A lobby boy remembers what people hate. A lobby boy anticipate

226 Dec 29, 2022
ZeroMQ bindings for Twisted

Twisted bindings for 0MQ Introduction txZMQ allows to integrate easily ØMQ sockets into Twisted event loop (reactor). txZMQ supports both CPython and

Andrey Smirnov 149 Dec 08, 2022
A honey token manager and alert system for AWS.

SpaceSiren SpaceSiren is a honey token manager and alert system for AWS. With this fully serverless application, you can create and manage honey token

287 Nov 09, 2022
SSH tunnels to remote server.

Author: Pahaz Repo: https://github.com/pahaz/sshtunnel/ Inspired by https://github.com/jmagnusson/bgtunnel, which doesn't work on Windows. See also: h

Pavel White 1k Dec 28, 2022
Spinnaker is an open source, multi-cloud continuous delivery platform for releasing software changes with high velocity and confidence.

Welcome to the Spinnaker Project Spinnaker is an open-source continuous delivery platform for releasing software changes with high velocity and confid

8.8k Jan 07, 2023
Micro Data Lake based on Docker Compose

Micro Data Lake based on Docker Compose This is the implementation of a Minimum Data Lake

Abel Coronado 15 Jan 07, 2023
Visual disk-usage analyser for docker images

whaler What? A command-line tool for visually investigating the disk usage of docker images Why? Large images are slow to move and expensive to store.

Treebeard Technologies 194 Sep 01, 2022
DataOps framework for Machine Learning projects.

Noronha DataOps Noronha is a Python framework designed to help you orchestrate and manage ML projects life-cycle. It hosts Machine Learning models ins

52 Oct 30, 2022
Daemon to ban hosts that cause multiple authentication errors

__ _ _ ___ _ / _|__ _(_) |_ ) |__ __ _ _ _ | _/ _` | | |/ /| '_ \/ _` | ' \

Fail2Ban 7.8k Jan 09, 2023
GitGoat enables DevOps and Engineering teams to test security products intending to integrate with GitHub

GitGoat is an open source tool that was built to enable DevOps and Engineering teams to design and implement a sustainable misconfiguration prevention strategy. It can be used to test with products w

Arnica 149 Dec 22, 2022
Manage your azure VM easily!

Azure-manager Manage your VM in Azure using cookies.

Team 1injex 129 Dec 17, 2022
Honcho: a python clone of Foreman. For managing Procfile-based applications.

___ ___ ___ ___ ___ ___ /\__\ /\ \ /\__\ /\ \ /\__\ /\

Nick Stenning 1.5k Jan 03, 2023
CDK Template of Table Definition AWS Lambda for RDB

CDK Template of Table Definition AWS Lambda for RDB Overview This sample deploys Amazon Aurora of PostgreSQL or MySQL with AWS Lambda that can define

AWS Samples 5 May 16, 2022
Nagios status monitor for your desktop.

Nagstamon Nagstamon is a status monitor for the desktop. It connects to multiple Nagios, Icinga, Opsview, Centreon, Op5 Monitor/Ninja, Checkmk Multisi

Henri Wahl 361 Jan 05, 2023
A Python library for the Docker Engine API

Docker SDK for Python A Python library for the Docker Engine API. It lets you do anything the docker command does, but from within Python apps – run c

Docker 6.1k Dec 31, 2022
A system for managing CI data for Mozilla projects

Treeherder Description Treeherder is a reporting dashboard for Mozilla checkins. It allows users to see the results of automatic builds and their resp

Mozilla 235 Dec 22, 2022
Utilitaire de contrôle de Kubernetes

Utilitaire de contrôle de Kubernetes ** What is this ??? ** Every time we use a word in English our manager tells us to use the French translation of

Théophane Vié 9 Dec 03, 2022
Emissary - open source Kubernetes-native API gateway for microservices built on the Envoy Proxy

Emissary-ingress Emissary-Ingress is an open-source Kubernetes-native API Gateway + Layer 7 load balancer + Kubernetes Ingress built on Envoy Proxy. E

Emissary Ingress 4k Dec 31, 2022
Learning and experimenting with Kubernetes

Kubernetes Experiments This repository contains code that I'm using to learn and experiment with Kubernetes. 1. Environment setup minikube kubectl doc

Richard To 10 Dec 02, 2022