A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Overview

Introduction

A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Requirements

  • Python >=3.7.3
  • Pika ==1.2.0
  • Aio-pika ==6.8.0
  • Requests >=2.25.1

Installation

pip install rctiplus-rabbitmq-python-sdk

Getting latest version

pip install rctiplus-rabbitmq-python-sdk --upgrade

Usage

To start using this SDK, you may follow given instructions bellow in order.

Payload handler

First, you need to create a payload class handler that implement MessagePayload. For example, we want to make a class to handle JSON payload:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname'])">
import json
from rctiplus_rabbitmq_python_sdk import MessagePayload

class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

MessagePayload class from the SDK's core has this functions that require to implemented:

'MessagePayload': """Generate data from specified string payload message format Raises: NotImplementedError: Raise an error if not implemented """ raise NotImplementedError() def __str__(self) -> str: """Convert specified data format to string payload message Raises: NotImplementedError: Raise an error if not implemented Returns: str: String payload message """ raise NotImplementedError()">
class MessagePayload:
    """Python RabbitMQ message payload
    """
    
    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format

        Raises:
            NotImplementedError: Raise an error if not implemented
        """
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message

        Raises:
            NotImplementedError: Raise an error if not implemented

        Returns:
            str: String payload message
        """
        raise NotImplementedError()

Connect to RabbitMQ

Making connection to RabbitMQ server can be done by doing this simple way:

from rctiplus_rabbitmq_python_sdk import RabbitMQ

conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

Sending message

After you have payload class handler & connected to the RabbitMQ server, now you can try to send a messsage to queue channel. For example, we will send JSON payload message to test queue:

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Receiving message

Great. Now, in our consumer app, we want to listen & receive that message, and then doing some stuff:

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

conn.receive('test', callback)

For callback function, according to Pikas standart library, you need to pass 4 arguments ch, method, properties and body to catch all needed values from incomming message.

Putting it all together

Here is the complete example from the code above:

Complete example of sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) conn.send('test', payload)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Complete example of consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message def callback(ch, method, properties, body): print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel conn.receive('test', callback)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Create a callback to be executed immadiately after recieved a message
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

# Receive & listen messages from queue channel
conn.receive('test', callback)

Asynchronous

This SDK also support asynchronous process. To use this feature, use AIORabbitMQ instead of RabbitMQ. All methods provided in AIORabbitMQ are treated as async function. So, when you calling the methods, you need to await them.

Async connect to RabbitMQ

from rctiplus_rabbitmq_python_sdk import AIORabbitMQ

conn = AIORabbitMQ(loop)
await conn.connect(host='localhost', port=5672, username='guest', password='guest')

loop is an asynchronous event loop, example: asyncio.get_event_loop()

Async sending message

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
await conn.send('test', payload)

Async receiving message

async def callback(message):
    body = message.body
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

await conn.receive('test', callback)

In asynchronous process, you just need pass 1 argument on callback function. This argument is a representation of aio_pika.IncomingMessage to catch all needed values from incomming message.

Complete example of asynchronous process

Here is the complete example of asynchronous process above:

Complete example of asynchronous sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') async with conn.connection: # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) await conn.send('test', payload) # Event loop loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    async with conn.connection:
        # Send payload to queue
        payload = JSONPayload('John', 'Doe')
        print('payload:', payload)
        await conn.send('test', payload)


# Event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

Complete example of asynchronous consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message async def callback(message): body = message.body print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel await conn.receive('test', callback) return conn # Event loop loop = asyncio.get_event_loop() connection = loop.run_until_complete(main(loop)) try: loop.run_forever() finally: loop.run_until_complete(connection.disconnect())">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    # Create a callback to be executed immadiately after recieved a message
    async def callback(message):
        body = message.body
        print("[x] Received %r" % body)
        
        # Generate data from string payload message
        data = JSONPayload.from_str(body)
        print(f'data: firstname={data.firstname}, lastname={data.lastname}')

    # Receive & listen messages from queue channel
    await conn.receive('test', callback)

    return conn


# Event loop
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))
try:
    loop.run_forever()
finally:
    loop.run_until_complete(connection.disconnect())

License

GNU General Public License v3

Owner
Dali Kewara
An unexpected journey and gonna make it simple but Spectacular!
Dali Kewara
Abstraction of a Unit, includes convertions and basic operations.

Units Abstraction of a Unit, includes convertions and basic operations. ------ EXAMPLE : Free Fall (No air resistance) ------- from units_test import

1 Dec 23, 2021
A collection of resources/tools and analyses for the angr binary analysis framework.

Awesome angr A collection of resources/tools and analyses for the angr binary analysis framework. This page does not only collect links and external r

105 Jan 02, 2023
[P]ython [w]rited [B]inary [C]onverter

pwbinaryc [P]ython [w]rited [Binary] [C]onverter You have rights to: Modify the code and use it private (friends are allowed too) Make a page and redi

0 Jun 21, 2022
Script for generating Hearthstone card spoilers & checklists

This is a script for generating text spoilers and set checklists for Hearthstone. Installation & Running Python 3.6 or higher is required. Copy/clone

John T. Wodder II 1 Oct 11, 2022
Adding two matrix from scratch using python.

Adding-two-matrix-from-scratch-using-python. Here, I have take two matrix from user and add it without using any library. I made this program from scr

Sachin Vinayak Dabhade 4 Sep 24, 2021
Local backup made easy, with Python and shutil

KTBackup BETA Local backup made easy, with Python and shutil Features One-command backup and restore Minimalistic (only using stdlib) Convenient direc

kelptaken 1 Dec 27, 2021
A simple tool to move and rename Nvidia Share recordings to a more sensible format.

A simple tool to move and rename Nvidia Share recordings to a more sensible format.

Jasper Rebane 8 Dec 23, 2022
Modest utility collection for development with AIOHTTP framework.

aiohttp-things Modest utility collection for development with AIOHTTP framework. Documentation https://aiohttp-things.readthedocs.io Installation Inst

Ruslan Ilyasovich Gilfanov 0 Dec 11, 2022
A quick username checker to see if a username is available on a list of assorted websites.

A quick username checker to see if a username is available on a list of assorted websites.

Maddie 4 Jan 04, 2022
Grank is a feature-rich script that automatically grinds Dank Memer for you

Grank Inspired by this repository. This is a WIP and there will be more functions added in the future. What is Grank? Grank is a feature-rich script t

42 Jul 20, 2022
Minimal Windows system information tool written in Python

wfetch wfetch is a Minimal Windows system information tool written in Python (Only works on Windows) Installation First of all have python installed.

zJairO 3 Jan 24, 2022
Report Bobcat Status to Google Sheets

bobcat-status-reporter Report Bobcat Status to Google Sheets Why? I recently relocated my miner from my root into the attic. Bobcat recommends operati

Jasmit Tarang 3 Sep 22, 2021
It is a tool that looks for a specific username in social networks

It is a tool that looks for a specific username in social networks

MasterBurnt 6 Oct 07, 2022
DUQ is a python package for working with physical Dimensions, Units, and Quantities.

DUQ is a python package for working with physical Dimensions, Units, and Quantities.

2 Nov 02, 2022
A meme error handler for python

Pwython OwO what's this? Pwython is project aiming to fill in one of the biggest problems with python, which is that it is slow lacks owoified text. N

SystematicError 23 Jan 15, 2022
A string extractor module for python

A string extractor module for python

Fayas Noushad 4 Jul 19, 2022
Retrying library for Python

Tenacity Tenacity is an Apache 2.0 licensed general-purpose retrying library, written in Python, to simplify the task of adding retry behavior to just

Julien Danjou 4.3k Jan 05, 2023
API for obtaining results from the Beery-Bukenica test of the visomotor integration development (VMI) 4th edition.

VMI API API for obtaining results from the Beery-Bukenica test of the visomotor integration development (VMI) 4th edition. Install docker-compose up -

Victor Vargas Sandoval 1 Oct 26, 2021
✨ Un générateur de mot de passe aléatoire totalement fait en Python par moi, et en français.

Password Generator ❗ Un générateur de mot de passe aléatoire totalement fait en Python par moi, et en français. 🔮 Grâce a une au module random et str

MrGabin 3 Jul 29, 2021
convert a dict-list object from / to a typed object(class instance with type annotation)

objtyping 带类型定义的对象转换器 由来 Python不是强类型语言,开发人员没有给数据定义类型的习惯。这样虽然灵活,但处理复杂业务逻辑的时候却不够方便——缺乏类型检查可能导致很难发现错误,在IDE里编码时也没

Song Hui 15 Dec 22, 2022