ZeroMQ bindings for Twisted

Overview

Twisted bindings for 0MQ

https://coveralls.io/repos/smira/txZMQ/badge.png

Introduction

txZMQ allows to integrate easily ØMQ sockets into Twisted event loop (reactor).

txZMQ supports both CPython and PyPy and ØMQ library version 2.2.x or 3.2.x.

Documentation is available at ReadTheDocs.

Requirements

C library required:

  • ØMQ library 2.2.x or 3.2.x

Python packages required:

  • pyzmq >= 13 (for CPython & PyPy)
  • Twisted

Details

txZMQ introduces support for general 0MQ sockets by class ZmqConnection that can do basic event loop integration, sending-receiving messages in non-blocking manner, scatter-gather for multipart messages.

txZMQ uses ØMQ APIs to get file descriptor that is used to signal pending actions from ØMQ library IO thread running in separate thread. This is used in a custom file descriptor reader, which is then added to the Twisted reactor.

Upgrading from 0.3.x

If you're upgrading from version 0.3.1 and earlier, please apply following changes to your code:

  • root package name was changed from txZMQ to txzmq, adjust your imports accordingly;
  • ZmqEndpointType.Connect has been renamed to ZmqEndpointType.connect;
  • ZmqEndpointType.Bind has been renamed to ZmqEndpointType.bind;
  • ZmqConnection.__init__ has been changed to accept keyword arguments instead of list of endpoints; if you were using one endpoint, no changes are required; if using multiple endpoints, please look for add_endpoints method.

Hacking

Source code for txZMQ is available at github; forks and pull requests are welcome.

To start hacking, fork at github and clone to your working directory. To use the Makefile (for running unit tests, checking for PEP8 compliance and running pyflakes), you will want to have virtualenv installed (it includes a pip installation).

Create a branch, add some unit tests, write your code, check it and test it! Some useful make targets are:

  • make env
  • make check
  • make test

If you don't have an environment set up, a new one will be created for you in ./env. Additionally, txZMQ will be installed as well as required development libs.

Comments
  • Timeout support

    Timeout support

    I have looked at the code and it does not seem like there is any support for setting timeouts.

    Is that correct, it seems like a pretty fundamental thing for REP/REQ over tcp.

    opened by brunsgaard 23
  • Change txZMQ (from GPL) to a more permissive license (at least LGPL)

    Change txZMQ (from GPL) to a more permissive license (at least LGPL)

    Just want to confirm that you are intentionally prohibiting distribution of this package along with proprietary software. While GPL is certainly a valid license for a library, the LGPL is more common for libraries (e.g. psycopg2) -- and you (appear to) have licensed other packages under more permissive licenses like MPL and MIT.

    opened by ambsw-technology 19
  • DEALER socket losing messages

    DEALER socket losing messages

    Hi all,

    I am seeing strange behavior in an application using DEALER and ROUTER sockets.

    Requests made by the DEALER are always received by the ROUTER, but the responses made by the ROUTER are sometimes lost. I've added a test to demonstrate this behavior. It is the only change in this pull request.

    https://github.com/wehriam/txZMQ/blob/master/txZMQ/test/test_async_dealer_router.py

    In the application (but not demonstrated in the test) after some varying number of requests responses eventually fail to arrive entirely.

    I was unsuccessful in resolving this issue and hope you'll have better luck in determining the underlying problem.

    OSX Lion, Python 2.7.1 ZMQ 2.1.9

    Twisted==11.0.0 pep8==0.6.1 pyflakes==0.5.0 pyzmq==2.1.10 txZMQ==0.3 wsgiref==0.1.2 zope.interface==3.8.0

    opened by wehriam 13
  • push with bind

    push with bind

    i run push_pull.py example, using python push_pull.py --method=bind --mode=push, raise

    python push_pull.py --method=bind --mode=push
    
    producing ['1352354886.76', 'ubuntu']
    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 413, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 301, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 290, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/defer.py", line 551, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/Twisted-12.2.0-py2.7-linux-i686.egg/twisted/internet/base.py", line 426, in _continueFiring
        callable(*args, **kwargs)
      File "push_pull.py", line 45, in produce
        s.push(data)
      File "/home/jjx/sources/txZMQ/txzmq/pushpull.py", line 22, in push
        self.send(message)
      File "/home/jjx/sources/txZMQ/txzmq/connection.py", line 245, in send
        self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
      File "socket.pyx", line 499, in zmq.core.socket.Socket.send (zmq/core/socket.c:5381)
    
      File "socket.pyx", line 546, in zmq.core.socket.Socket.send (zmq/core/socket.c:5143)
    
      File "socket.pyx", line 175, in zmq.core.socket._send_copy (zmq/core/socket.c:2139)
    
    zmq.core.error.ZMQError: Resource temporarily unavailable
    
    
    opened by jiangjianxiao 11
  • Support for Zeromq3.2

    Support for Zeromq3.2

    Hi, zeromq3.2 isn't quite out yet, but it will be soon! pyzmq has had experimental support for it since pyzmq-2.1.7.

    The following commits introduce the start of experimental support for zeromq-3.2. Whether pyzmq is built against zeromq-2.2 or zeromq-3.2, the txZMQ tests should pass.

    pubsub works, but the req/rep and router/dealer patterns are broken for reasons I haven't figured out yet.

    opened by ralphbean 9
  • Integrate message reading into twisted reactor loop

    Integrate message reading into twisted reactor loop

    During work with txZMQ I met the case when fast incoming messages stream with not so fast message processing functions prevents twisted reactor's loop from resuming because loop reading messages in doRead() never ends - there are always new messages arrived. This pull request transforms message reading and processing loop to cooperative task scheduled by Twisted.

    opened by daa 7
  • XREQ/XREP

    XREQ/XREP

    Hi!

    I just put together this initial XREQ/XREP ZmqConnections subclasses.

    There are also some small changes to the base ZmqConnection class, the most important is the doRead call at the end of _startWriting method. This is the only way (I've found ) to be sure we don't miss any events from the socket, as it's event-triggered and twisted might miss an event while we are writting.

    Cheers.

    opened by verterok 7
  • Incorrect (twice?) shutdown when registered for shutdown

    Incorrect (twice?) shutdown when registered for shutdown

    The factory which has been registered for shutdown ...

            self.zmqfactory = ZmqFactory()
            self.zmqfactory.registerForShutdown()
            self.receiver = MyZmqSubConnection(self,self.zmqfactory)
    

    is shutted down later

           self.zmqfactory.shutdown()
    

    As a result, the unhandled exception happens when the reactor is shutted down:

    Unhandled Error
    Traceback (most recent call last):
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 416, in fireEvent
        DeferredList(beforeResults).addCallback(self._continueFiring)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 307, in addCallback
        callbackKeywords=kw)
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 296, in addCallbacks
        self._runCallbacks()
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 578, in _runCallbacks
        current.result = callback(current.result, *args, **kw)
    --- <exception caught here> ---
      File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 429, in _continueFiring
        callable(*args, **kwargs)
      File "/usr/local/lib/python2.7/dist-packages/txzmq/factory.py", line 51, in shutdown
        for connection in self.connections.copy():
    

    It probably useful either add an 'active' flag of the factory to avoid conflict when shutting down twice (manually and automatically), or unregister from the reactor to be shutted down automatically to avoid a conflict as such.

    opened by nnseva 6
  • Is it possible to create txzmq daemon as twistd plugin?

    Is it possible to create txzmq daemon as twistd plugin?

    Hi,

    How can i create txzmq daemon as twistd plugin? I googled for some information, but without success. If it is possible to do, I will be grateful for any help, advice or information.

    opened by tmpd 6
  • highWaterMark cannot protect from memory overflow

    highWaterMark cannot protect from memory overflow

    With pyzmq, setting HWM would protect you from memory overflow issues like this:

    import zmq
    ctx = zmq.Context()
    s= zmq.socket(zmq.PUSH)
    s.setsockopt(zmq.HWM, 8)
    s.connect('tcp://localhost:33710')  # non-exist endpoint
    while True:
        s.send('XXX')  # this will block when high water mark is reached
    

    With txZMQ, ZmqConnection.send() simply append into a deque(), regardless of highWaterMark. Therefore test like above would take up a lot of memory.

    If send() could return a Deferred object, the calling code would be able to do something about it.

    A limited work around is to set a maxlen on the deque object, so it would drop data when full. Problem is if you really want to wait until the exceptional HWM state is finished (data is very important for example), this won't help.

    feature 
    opened by fantix 6
  • 'ZmqREPConnection' object has no attribute '_routingInfo'

    'ZmqREPConnection' object has no attribute '_routingInfo'

    In our project we encountered the following failure:

    
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 153, in __init__
        ZmqConnection.__init__(self, *args, **kwargs)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 174, in __init__
        self.doRead()
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/connection.py", line 281, in doRead
        log.callWithLogger(self, self.messageReceived, message)
    --- <exception caught here> ---
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 88, in callWithLogger
        return callWithContext({"system": lp}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/log.py", line 73, in callWithContext
        return context.call({ILogContext: newCtx}, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 118, in callWithContext
        return self.currentContext().callWithContext(ctx, func, *args, **kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/twisted/python/context.py", line 81, in callWithContext
        return func(*args,**kw)
      File "/opt/neo/venv/lib/python2.7/site-packages/txzmq/req_rep.py", line 179, in messageReceived
        self._routingInfo[msgId] = routingInfo
    exceptions.AttributeError: 'ZmqREPConnection' object has no attribute '_routingInfo'
    
    

    It seems like the doRead called during initialization of ZmqREPConnection actually gets a message and tries to handle it, before the _routingInfo attribute was initialized later in its __init__.

    opened by aryes 5
  • docs: fix simple typo, recevied -> received

    docs: fix simple typo, recevied -> received

    There is a small typo in txzmq/pubsub.py.

    Should read received rather than recevied.

    Semi-automated pull request generated by https://github.com/timgates42/meticulous/blob/master/docs/NOTE.md

    opened by timgates42 0
  • txzmq/pubsub.py

    txzmq/pubsub.py", line 26 -- exception cannot conatenate bytes

    File "/home/ar/Envs/a3_pyvcluster/lib/python3.7/site-packages/txzmq/pubsub.py", line 26, in publish |class ZmqSubConnection(ZmqConnection): self.send(tag + b'\0' + message) | """
    TypeError: can only concatenate str (not "bytes") to str

    def publish(self, message, tag=b''): """ Publish message with specified tag.

        :param message: message data
        :type message: str
        :param tag: message tag
        :type tag: str
        """
        self.send(tag + b'\0' + message) <---- It does not work in python3, I tried both tag and message being bytes and it does not work either
    
    opened by alex4747-pub 1
  • Timeout handling in REP-REQ

    Timeout handling in REP-REQ

    I'm not quite sure this is a bug or if its an intended feature, and I'd like to elaborate on this problem that I've spent some time figuring out:

    I'm using REP-REQ in a context where the REP server is unstable. The txzmq implements a timeout on the ZmqREQConnection.sendMsg() method. This timeout works perfectly in itself, but after some digging it seems the timeout feature is not a part of zmg. The problem with this is that the message is still left in zmq REQ queue, even after the caller has already gotten its Errback() called for the this message. If the server becomes available at some later time, the reply will be send back to the REQ client, but it goes nowhere in the txzmq stack.

    The second problem is that sending a message to the server is the only way of knowing if the server connection is alive, so one needs to keep sending messages (that will time out) regularly to check if the link is up. When it is up again, the server will be flooded with a stream of these "alive" messages.

    My resolution to the issue is to use a variant of the lazy pirate pattern: When sending a message with sendMsg() I install an Errback callback that closes the link, which will flush the connection.

        if not socket:
            socket = ZmqREQConnection(zmq, req_endpoint)
    
        try:
            d = socket.sendMsg(message, timeout=2)
    
            def onTimeout(fail):
                socket.shutdown()
                socket = None
                return fail
    
            d.addErrback(onTimeout)
            return d
    
        except zmq.error.Again:
            print("Handle me...")
    

    Since this is more an effect of the zmq's REP-REQ implementation than txzmq, I'm not sure this is a bug in txzmq, but I dislike that a message receives a timeout, while still being scheduled for transmission. Perhaps the above logic is required? Perhaps there is some way to cancel the message when it times out?

    opened by sveinse 1
  • Add support for multipart messages and remove custom framing for pub/sub

    Add support for multipart messages and remove custom framing for pub/sub

    I've changed the pub/sub code to remove the custom framing using a null byte and added code to send and receive multipart messages. I've ensured API-level compatibility as well as making sure old versions can read data sent by the old one (thanks to the existing compatibility code there). But of course older versions won't be able to read multipart messages sent by this one.

    opened by cncfanatics 5
Releases(1.0.1)
Cado Response Integration with Amazon GuardDuty using AWS Lambda

Cado Response Integration with Amazon GuardDuty using AWS Lambda This repository contains a simple example where: An alert is triggered by GuardDuty T

Cado Security 4 Mar 02, 2022
A repository containing a short tutorial for Docker (with Python).

Docker Tutorial for IFT 6758 Lab In this repository, we examine the advtanges of virtualization, what Docker is and how we can deploy simple programs

Arka Mukherjee 0 Dec 14, 2021
Micro Data Lake based on Docker Compose

Micro Data Lake based on Docker Compose This is the implementation of a Minimum Data Lake

Abel Coronado 15 Jan 07, 2023
Blazingly-fast :rocket:, rock-solid, local application development :arrow_right: with Kubernetes.

Gefyra Gefyra gives Kubernetes-("cloud-native")-developers a completely new way of writing and testing their applications. Over are the times of custo

Michael Schilonka 352 Dec 26, 2022
NixOps is a tool for deploying to NixOS machines in a network or cloud.

NixOps NixOps is a tool for deploying to NixOS machines in a network or the cloud. Key features include: Declarative: NixOps determines and carries ou

Nix/Nixpkgs/NixOS 1.2k Jan 02, 2023
A Habitica Integration with Github Workflows.

Habitica-Workflow A Habitica Integration with Github Workflows. How To Use? Fork (and Star) this repository. Set environment variable in Settings - S

Priate 2 Dec 20, 2021
framework providing automatic constructions of vulnerable infrastructures

中文 | English 1 Introduction Metarget = meta- + target, a framework providing automatic constructions of vulnerable infrastructures, used to deploy sim

rambolized 685 Dec 28, 2022
Build Netbox as a Docker container

netbox-docker The Github repository houses the components needed to build Netbox as a Docker container. Images are built using this code and are relea

Farshad Nick 1 Dec 18, 2021
Checkmk kube agent - Checkmk Kubernetes Cluster and Node Collectors

Checkmk Kubernetes Cluster and Node Collectors Checkmk cluster and node collecto

tribe29 GmbH 15 Dec 26, 2022
Travis CI testing a Dockerfile based on Palantir's remix of Apache Cassandra, testing IaC, and testing integration health of Debian

Testing Palantir's remix of Apache Cassandra with Snyk & Travis CI This repository is to show Travis CI testing a Dockerfile based on Palantir's remix

Montana Mendy 1 Dec 20, 2021
Big data on k8s

# microsoft azure # https://docs.microsoft.com/en-us/cli/azure/install-azure-cli az account set --subscription [] az aks get-credentials --resource-g

Luan Moreno 22 Dec 24, 2022
Ingress patch example by Kustomize

Ingress patch example by Kustomize

Jinu 10 Nov 14, 2022
Azure plugins for Feast (FEAture STore)

Feast on Azure This project provides resources to enable running a feast feature store on Azure. Feast Azure Provider The Feast Azure provider acts li

Microsoft Azure 70 Dec 31, 2022
Emissary - open source Kubernetes-native API gateway for microservices built on the Envoy Proxy

Emissary-ingress Emissary-Ingress is an open-source Kubernetes-native API Gateway + Layer 7 load balancer + Kubernetes Ingress built on Envoy Proxy. E

Emissary Ingress 4k Dec 31, 2022
Bugbane - Application security tools for CI/CD pipeline

BugBane Набор утилит для аудита безопасности приложений. Основные принципы и осо

GardaTech 20 Dec 09, 2022
This project shows how to serve an TF based image classification model as a web service with TFServing, Docker, and Kubernetes(GKE).

Deploying ML models with CPU based TFServing, Docker, and Kubernetes By: Chansung Park and Sayak Paul This project shows how to serve a TensorFlow ima

Chansung Park 104 Dec 28, 2022
Play Wordle from any Kubernetes cluster.

wordle-operator 🟩 ⬛ 🟩 🟨 ⬛ Play Wordle from any Kubernetes cluster. Using the power of CustomResourceDefinitions and Kubernetes Operators, now you c

Lucas Melin 1 Jan 15, 2022
Rancher Kubernetes API compatible with RKE, RKE2 and maybe others?

kctl Rancher Kubernetes API compatible with RKE, RKE2 and maybe others? Documentation is WIP. Quickstart pip install --upgrade kctl Usage from lazycls

1 Dec 02, 2021
Hatch plugin for Docker containers

hatch-containers CI/CD Package Meta This provides a plugin for Hatch that allows

Ofek Lev 11 Dec 30, 2022
This repository contains code examples and documentation for learning how applications can be developed with Kubernetes

BigBitBus KAT Components Click on the diagram to enlarge, or follow this link for detailed documentation Introduction Welcome to the BigBitBus Kuberne

51 Oct 16, 2022