ZeroMQ bindings for Twisted

Overview

Twisted bindings for 0MQ

Introduction

txZMQ makes it easy to integrate ØMQ sockets into the Twisted event loop (reactor).

txZMQ supports both CPython and PyPy, and ØMQ library versions 2.2.x and 3.2.x.

Documentation is available at ReadTheDocs.

Requirements

C library required:

  • ØMQ library 2.2.x or 3.2.x

Python packages required:

  • pyzmq >= 13 (for CPython & PyPy)
  • Twisted

Details

txZMQ supports general ØMQ sockets through the ZmqConnection class, which provides basic event loop integration, non-blocking sending and receiving of messages, and scatter-gather for multipart messages.

txZMQ uses ØMQ APIs to obtain a file descriptor that signals pending actions from the ØMQ library's I/O thread, which runs in a separate thread. That descriptor is wrapped in a custom file descriptor reader, which is then added to the Twisted reactor.
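
The mechanism above can be illustrated with the standard library alone. In this minimal sketch a socket pair stands in for the notification descriptor and a selectors loop stands in for the Twisted reactor; both are assumptions made for illustration, and the names run_once, io_thread and pending are hypothetical rather than txZMQ APIs.

```python
import selectors
import socket
import threading


def run_once():
    """Run one turn of a toy event loop woken by a background thread."""
    # Stand-in for the notification descriptor (assumption: a socket pair,
    # not the descriptor that the ØMQ library itself exposes).
    reader, writer = socket.socketpair()
    pending = []              # messages queued by the background thread
    lock = threading.Lock()

    def io_thread():
        with lock:
            pending.append(b"hello")
        writer.send(b"\0")    # signal the loop that work is waiting

    received = []
    sel = selectors.DefaultSelector()
    sel.register(reader, selectors.EVENT_READ)

    worker = threading.Thread(target=io_thread)
    worker.start()

    # One loop iteration: block until the descriptor becomes readable.
    for key, _events in sel.select(timeout=5):
        key.fileobj.recv(1)   # clear the signal byte
        with lock:
            received.extend(pending)
            pending.clear()

    worker.join()
    sel.close()
    reader.close()
    writer.close()
    return received
```

Calling run_once() returns the messages queued by the background thread. The loop waits only on the descriptor, never on the worker itself, which is what allows a reactor to multiplex it with other event sources.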

Upgrading from 0.3.x

If you're upgrading from version 0.3.1 or earlier, please apply the following changes to your code:

  • root package name was changed from txZMQ to txzmq, adjust your imports accordingly;
  • ZmqEndpointType.Connect has been renamed to ZmqEndpointType.connect;
  • ZmqEndpointType.Bind has been renamed to ZmqEndpointType.bind;
  • ZmqConnection.__init__ has been changed to accept keyword arguments instead of a list of endpoints; if you were using one endpoint, no changes are required; if you were using multiple endpoints, see the add_endpoints method.

Hacking

Source code for txZMQ is available on GitHub; forks and pull requests are welcome.

To start hacking, fork the repository on GitHub and clone it to your working directory. To use the Makefile (for running unit tests, checking PEP8 compliance and running pyflakes), you will need virtualenv installed (it includes a pip installation).

Create a branch, add some unit tests, write your code, check it and test it! Some useful make targets are:

  • make env
  • make check
  • make test

If you don't have an environment set up, a new one will be created for you in ./env, and txZMQ will be installed into it along with the required development libraries.

Comments
  • Timeout support

    I have looked at the code and it does not seem like there is any support for setting timeouts.

    Is that correct? It seems like a pretty fundamental thing for REP/REQ over TCP.

    opened by brunsgaard 23
  • Change txZMQ (from GPL) to a more permissive license (at least LGPL)

    Just want to confirm that you are intentionally prohibiting distribution of this package along with proprietary software. While GPL is certainly a valid license for a library, the LGPL is more common for libraries (e.g. psycopg2) -- and you (appear to) have licensed other packages under more permissive licenses like MPL and MIT.

    opened by ambsw-technology 19
  • DEALER socket losing messages

    Hi all,

    I am seeing strange behavior in an application using DEALER and ROUTER sockets.

    Requests made by the DEALER are always received by the ROUTER, but the responses made by the ROUTER are sometimes lost. I've added a test to demonstrate this behavior. It is the only change in this pull request.

    https://github.com/wehriam/txZMQ/blob/master/txZMQ/test/test_async_dealer_router.py

    In the application (but not demonstrated in the test), after a varying number of requests, responses eventually stop arriving entirely.

    I was unsuccessful in resolving this issue and hope you'll have better luck in determining the underlying problem.

    OSX Lion, Python 2.7.1 ZMQ 2.1.9

    Twisted==11.0.0 pep8==0.6.1 pyflakes==0.5.0 pyzmq==2.1.10 txZMQ==0.3 wsgiref==0.1.2 zope.interface==3.8.0

    opened by wehriam 13
  • push with bind

    I ran the push_pull.py example with python push_pull.py --method=bind --mode=push, and it raised:

    python push_pull.py --method=bind --mode=push
    
    producing ['1352354886.76', 'ubuntu']
    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 413, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 301, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 290, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 551, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 426, in _continueFiring
        callable(*args, **kwargs)
      File "push_pull.py", line 45, in produce
        s.push(data)
      File "/home/jjx/sources/txZMQ/txzmq/pushpull.py", line 22, in push
        self.send(message)
      File "/home/jjx/sources/txZMQ/txzmq/connection.py", line 245, in send
        self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
      File "socket.pyx", line 499, in zmq.core.socket.Socket.send (zmq/core/socket.c:5381)
    
      File "socket.pyx", line 546, in zmq.core.socket.Socket.send (zmq/core/socket.c:5143)
    
      File "socket.pyx", line 175, in zmq.core.socket._send_copy (zmq/core/socket.c:2139)
    
    zmq.core.error.ZMQError: Resource temporarily unavailable
    
    
    opened by jiangjianxiao 11
  • Support for Zeromq3.2

    Hi, zeromq3.2 isn't quite out yet, but it will be soon! pyzmq has had experimental support for it since pyzmq-2.1.7.

    The following commits introduce the start of experimental support for zeromq-3.2. Whether pyzmq is built against zeromq-2.2 or zeromq-3.2, the txZMQ tests should pass.

    pubsub works, but the req/rep and router/dealer patterns are broken for reasons I haven't figured out yet.

    opened by ralphbean 9
  • Integrate message reading into twisted reactor loop

    While working with txZMQ I hit a case where a fast stream of incoming messages, combined with slower message-processing functions, prevents the Twisted reactor loop from resuming: the loop that reads messages in doRead() never ends because new messages keep arriving. This pull request turns the message reading and processing loop into a cooperative task scheduled by Twisted.
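
    The cooperative idea described above can be sketched with the standard library alone. This is a minimal illustration, not the code of the pull request: a generator handles a bounded batch and then yields so that other event sources get a turn. The names read_in_batches, batch_size and run_demo are hypothetical.

    ```python
    from collections import deque


    def read_in_batches(incoming, handle, batch_size=10):
        """Handle at most batch_size messages, then yield control to the loop."""
        while incoming:
            for _ in range(min(batch_size, len(incoming))):
                handle(incoming.popleft())
            yield  # a scheduler would run timers and other readers here


    def run_demo():
        incoming = deque(range(25))   # stand-in for queued messages
        handled = []
        turns = 0
        for _ in read_in_batches(incoming, handled.append):
            turns += 1                # one scheduler turn per batch
        return handled, turns
    ```

    With 25 queued messages and a batch size of 10, the work is spread over three turns instead of one unbounded loop.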

    opened by daa 7
  • XREQ/XREP

    Hi!

    I just put together these initial XREQ/XREP ZmqConnection subclasses.

    There are also some small changes to the base ZmqConnection class; the most important is the doRead call at the end of the _startWriting method. This is the only way I've found to be sure we don't miss any events from the socket, as it's event-triggered and Twisted might miss an event while we are writing.

    Cheers.

    opened by verterok 7
  • Incorrect (twice?) shutdown when registered for shutdown

    The factory which has been registered for shutdown ...

            self.zmqfactory = ZmqFactory()
            self.zmqfactory.registerForShutdown()
            self.receiver = MyZmqSubConnection(self,self.zmqfactory)
    

    is shut down later

           self.zmqfactory.shutdown()
    

    As a result, an unhandled exception occurs when the reactor shuts down:

    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 416, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 307, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 296, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 578, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 429, in _continueFiring
        callable(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/txzmq/factory.py", line 51, in shutdown
        for connection in self.connections.copy():
    

    It would probably be useful either to add an 'active' flag to the factory, to avoid a conflict when shutting down twice (manually and automatically), or to unregister it from automatic shutdown by the reactor, to avoid such a conflict.

    opened by nnseva 6
  • Is it possible to create txzmq daemon as twistd plugin?

    Hi,

    How can I create a txzmq daemon as a twistd plugin? I searched for information, but without success. If it is possible, I would be grateful for any help, advice or information.

    opened by tmpd 6
  • highWaterMark cannot protect from memory overflow

    With pyzmq, setting HWM would protect you from memory overflow issues like this:

    import zmq
    ctx = zmq.Context()
    s = ctx.socket(zmq.PUSH)  # sockets are created from the context
    s.setsockopt(zmq.HWM, 8)
    s.connect('tcp://localhost:33710')  # nonexistent endpoint
    while True:
        s.send('XXX')  # this will block when high water mark is reached
    

    With txZMQ, ZmqConnection.send() simply appends to a deque(), regardless of highWaterMark. Therefore a test like the one above would take up a lot of memory.

    If send() could return a Deferred object, the calling code would be able to do something about it.

    A limited workaround is to set a maxlen on the deque object, so it would drop data when full. The problem is that if you really want to wait until the exceptional HWM state is over (because the data is very important, for example), this won't help.
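
    The drop-when-full behaviour of that workaround can be seen with a plain deque. This is only a sketch of the standard library container, with a hypothetical bound of 3; it does not touch txZMQ's internal queue.

    ```python
    from collections import deque

    queue = deque(maxlen=3)   # hypothetical bound standing in for a high-water mark
    for i in range(5):
        queue.append(i)       # once full, each append discards the oldest item

    remaining = list(queue)   # only the three newest items survive
    ```

    The two oldest items are discarded silently, which is why this approach bounds memory but cannot make a sender wait.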

    feature 
    opened by fantix 6
  • 'ZmqREPConnection' object has no attribute '_routingInfo'

    In our project we encountered the following failure:

    
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 153, in __init__
        ZmqConnection.__init__(self, *args, **kwargs)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 174, in __init__
        self.doRead()
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 281, in doRead
        log.callWithLogger(self, self.messageReceived, message)
    --- <exception caught here> ---
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 88, in callWithLogger
        return callWithContext({"system": lp}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 73, in callWithContext
        return context.call({ILogContext: newCtx}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 179, in messageReceived
        self._routingInfo[msgId] = routingInfo
    exceptions.AttributeError: 'ZmqREPConnection' object has no attribute '_routingInfo'
    
    

    It seems that the doRead call made during initialization of ZmqREPConnection actually receives a message and tries to handle it before the _routingInfo attribute is initialized later in __init__.

    opened by aryes 5
  • docs: fix simple typo, recevied -> received

    There is a small typo in txzmq/pubsub.py.

    Should read received rather than recevied.

    Semi-automated pull request generated by https://github.com/timgates42/meticulous/blob/master/docs/NOTE.md

    opened by timgates42 0
  • txzmq/pubsub.py

    txzmq/pubsub.py, line 26: exception "cannot concatenate bytes"

    File "/home/ar/Envs/a3_pyvcluster/lib/python3.7/site-packages/txzmq/pubsub.py", line 26, in publish
        self.send(tag + b'\0' + message)
    TypeError: can only concatenate str (not "bytes") to str

    def publish(self, message, tag=b''):
        """
        Publish message with specified tag.

        :param message: message data
        :type message: str
        :param tag: message tag
        :type tag: str
        """
        self.send(tag + b'\0' + message) <---- It does not work in Python 3; I tried with both tag and message as bytes and it does not work either
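
    In Python 3 the expression tag + b'\0' + message succeeds only when both operands are bytes; the TypeError above is what Python raises when the left operand is a str. A minimal sketch of a helper that normalizes its inputs first; the name frame and the UTF-8 default are assumptions, not part of txzmq.

    ```python
    def frame(tag, message, encoding="utf-8"):
        """Join tag and message with a null byte, encoding str inputs to bytes."""
        if isinstance(tag, str):
            tag = tag.encode(encoding)
        if isinstance(message, str):
            message = message.encode(encoding)
        return tag + b"\0" + message
    ```

    Both frame("news", "hello") and frame(b"news", b"hello") produce the same bytes object, so callers can pass either type.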
    
    opened by alex4747-pub 1
  • Timeout handling in REP-REQ

    I'm not quite sure whether this is a bug or an intended feature, and I'd like to elaborate on a problem that I've spent some time figuring out:

    I'm using REP-REQ in a context where the REP server is unstable. txzmq implements a timeout on the ZmqREQConnection.sendMsg() method. This timeout works perfectly in itself, but after some digging it seems the timeout feature is not a part of zmq. The problem with this is that the message is still left in the zmq REQ queue, even after the caller has already had its errback called for this message. If the server becomes available at some later time, the reply will be sent back to the REQ client, but it goes nowhere in the txzmq stack.

    The second problem is that sending a message to the server is the only way of knowing whether the server connection is alive, so one needs to keep sending messages (that will time out) regularly to check if the link is up. When it is up again, the server will be flooded with a stream of these "alive" messages.

    My resolution to the issue is to use a variant of the lazy pirate pattern: when sending a message with sendMsg(), I install an errback that closes the link, which flushes the connection.

        if not socket:
            socket = ZmqREQConnection(zmq, req_endpoint)
    
        try:
            d = socket.sendMsg(message, timeout=2)
    
            def onTimeout(fail):
                socket.shutdown()
                socket = None
                return fail
    
            d.addErrback(onTimeout)
            return d
    
        except zmq.error.Again:
            print("Handle me...")
    

    Since this is more an effect of zmq's REP-REQ implementation than of txzmq, I'm not sure this is a bug in txzmq, but I dislike that a message receives a timeout while still being scheduled for transmission. Perhaps the above logic is required? Perhaps there is some way to cancel the message when it times out?

    opened by sveinse 1
  • Add support for multipart messages and remove custom framing for pub/sub

    I've changed the pub/sub code to remove the custom framing using a null byte and added code to send and receive multipart messages. I've ensured API-level compatibility and made sure this version can read data sent by older versions (thanks to the existing compatibility code there). Of course, older versions won't be able to read multipart messages sent by this one.

    opened by cncfanatics 5
Releases(1.0.1)