A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Overview

Introduction

A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.

Requirements

  • Python >=3.7.3
  • Pika ==1.2.0
  • Aio-pika ==6.8.0
  • Requests >=2.25.1

Installation

pip install rctiplus-rabbitmq-python-sdk

Getting latest version

pip install rctiplus-rabbitmq-python-sdk --upgrade

Usage

To start using this SDK, you may follow given instructions bellow in order.

Payload handler

First, you need to create a payload class handler that implement MessagePayload. For example, we want to make a class to handle JSON payload:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname'])">
import json
from rctiplus_rabbitmq_python_sdk import MessagePayload

class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

MessagePayload class from the SDK's core has this functions that require to implemented:

'MessagePayload': """Generate data from specified string payload message format Raises: NotImplementedError: Raise an error if not implemented """ raise NotImplementedError() def __str__(self) -> str: """Convert specified data format to string payload message Raises: NotImplementedError: Raise an error if not implemented Returns: str: String payload message """ raise NotImplementedError()">
class MessagePayload:
    """Python RabbitMQ message payload
    """
    
    @classmethod
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format

        Raises:
            NotImplementedError: Raise an error if not implemented
        """
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message

        Raises:
            NotImplementedError: Raise an error if not implemented

        Returns:
            str: String payload message
        """
        raise NotImplementedError()

Connect to RabbitMQ

Making connection to RabbitMQ server can be done by doing this simple way:

from rctiplus_rabbitmq_python_sdk import RabbitMQ

conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

Sending message

After you have payload class handler & connected to the RabbitMQ server, now you can try to send a messsage to queue channel. For example, we will send JSON payload message to test queue:

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Receiving message

Great. Now, in our consumer app, we want to listen & receive that message, and then doing some stuff:

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

conn.receive('test', callback)

For callback function, according to Pikas standart library, you need to pass 4 arguments ch, method, properties and body to catch all needed values from incomming message.

Putting it all together

Here is the complete example from the code above:

Complete example of sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) conn.send('test', payload)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Complete example of consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message def callback(ch, method, properties, body): print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel conn.receive('test', callback)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })

    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Create a callback to be executed immadiately after recieved a message
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

# Receive & listen messages from queue channel
conn.receive('test', callback)

Asynchronous

This SDK also support asynchronous process. To use this feature, use AIORabbitMQ instead of RabbitMQ. All methods provided in AIORabbitMQ are treated as async function. So, when you calling the methods, you need to await them.

Async connect to RabbitMQ

from rctiplus_rabbitmq_python_sdk import AIORabbitMQ

conn = AIORabbitMQ(loop)
await conn.connect(host='localhost', port=5672, username='guest', password='guest')

loop is an asynchronous event loop, example: asyncio.get_event_loop()

Async sending message

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
await conn.send('test', payload)

Async receiving message

async def callback(message):
    body = message.body
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

await conn.receive('test', callback)

In asynchronous process, you just need pass 1 argument on callback function. This argument is a representation of aio_pika.IncomingMessage to catch all needed values from incomming message.

Complete example of asynchronous process

Here is the complete example of asynchronous process above:

Complete example of asynchronous sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') async with conn.connection: # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) await conn.send('test', payload) # Event loop loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    async with conn.connection:
        # Send payload to queue
        payload = JSONPayload('John', 'Doe')
        print('payload:', payload)
        await conn.send('test', payload)


# Event loop
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()

Complete example of asynchronous consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message async def callback(message): body = message.body print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel await conn.receive('test', callback) return conn # Event loop loop = asyncio.get_event_loop() connection = loop.run_until_complete(main(loop)) try: loop.run_forever() finally: loop.run_until_complete(connection.disconnect())">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload


# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload
    """

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

        Returns:
            str: String payload message
        """
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
        })
    
    @classmethod
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

        Returns:
            JSONPayload: Generated data
        """
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])


# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    
    # Create a callback to be executed immadiately after recieved a message
    async def callback(message):
        body = message.body
        print("[x] Received %r" % body)
        
        # Generate data from string payload message
        data = JSONPayload.from_str(body)
        print(f'data: firstname={data.firstname}, lastname={data.lastname}')

    # Receive & listen messages from queue channel
    await conn.receive('test', callback)

    return conn


# Event loop
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))
try:
    loop.run_forever()
finally:
    loop.run_until_complete(connection.disconnect())

License

GNU General Public License v3

Owner
Dali Kewara
An unexpected journey and gonna make it simple but Spectacular!
Dali Kewara
Skywater 130nm Klayout Device Generators PDK

Skywaters 130nm Technology for KLayout Device Generators Mabrains is excited to share with you our Device Generator Library for Skywater 130nm PDK. It

Mabrains 18 Dec 14, 2022
Python program to do with percentages and chances, random generation.

Chances and Percentages Python program to do with percentages and chances, random generation. What is this? This small program will generate a list wi

n0 3 Jul 15, 2021
Quickly edit your slack posts.

Lightning Edit Quickly edit your Slack posts. Heavily inspired by @KhushrajRathod's LightningDelete. Usage: Note: Before anything, be sure to head ove

14 Nov 19, 2021
Adding two matrix from scratch using python.

Adding-two-matrix-from-scratch-using-python. Here, I have take two matrix from user and add it without using any library. I made this program from scr

Sachin Vinayak Dabhade 4 Sep 24, 2021
Teleport Ur Logs with Love

Whatever you pipe into tull, will get a unique UUID and the data gets stored locally - accessible via a flask server with simple endpoints. You can use ngrok or localtunnel then to share it outside L

Lokendra Sharma 11 Jul 30, 2021
A simple, console based nHentai Code Generator

nHentai Code Generator A simple, console based nHentai Code Generator. How to run? Windows Android Windows Make sure you have python and git installed

5 Jun 02, 2022
Entropy-controlled contexts in Python

Python module ordered ordered module is the opposite to random - it maintains order in the program. import random x = 5 def increase(): global x

HyperC 36 Nov 03, 2022
An awesome tool to save articles from RSS feed to Pocket automatically.

RSS2Pocket An awesome tool to save articles from RSS feed to Pocket automatically. About the Project I used to use IFTTT to save articles from RSS fee

Hank Liao 10 Nov 12, 2022
Fraud Multiplication Table Detection in python

Fraud-Multiplication-Table-Detection-in-python In this program, I have detected fraud multiplication table using python without class. Here, I have co

Sachin Vinayak Dabhade 4 Sep 24, 2021
Check username

Checker-Oukee Check username It checks the available usernames and creates a new account for them Doesn't need proxies Create a file with usernames an

4 Jun 05, 2022
A python script to generate wallpaper

wallpaper eits Warning You need to set the path to Robot Mono font in the source code. (Settings are in the main function) Usage A script that given a

Henrique Tsuyoshi Yara 5 Dec 02, 2021
Napari plugin for loading Bitplane Imaris files .ims

napari-imaris-loader Napari plugin for loading Bitplane Imaris files '.ims'. Notes: For this plugin to work "File/Preferences/Experimental/Render Imag

Alan Watson 4 Dec 01, 2022
A Program that generates and checks Stripe keys 24x7.

A Program that generates and checks Stripe keys 24x7. This was made only for Educational Purposes, I'm not responsible for the damages cause by you

iNaveen 18 Dec 17, 2022
A random cats photos python module

A random cats photos python module

Fayas Noushad 6 Dec 01, 2021
A functional standard library for Python.

Toolz A set of utility functions for iterators, functions, and dictionaries. See the PyToolz documentation at https://toolz.readthedocs.io LICENSE New

4.1k Dec 30, 2022
Check the basic quality of any dataset

Data Quality Checker in Python Check the basic quality of any dataset. Sneak Peek Read full tutorial at Medium. Explore the app Requirements python 3.

MalaDeep 8 Feb 23, 2022
An okayish python script to generate a random Euler circuit with given number of vertices and edges.

Euler-Circuit-Test-Case-Generator An okayish python script to generate a random Euler circuit with given number of vertices and edges. Executing the S

Alen Antony 1 Nov 13, 2021
Regression Metrics Calculation Made easy

Regression Metrics Mean Absolute Error Mean Square Error Root Mean Square Error Root Mean Square Logarithmic Error Root Mean Square Logarithmic Error

Ashish Patel 12 Jan 02, 2023
Program to extract signatures from documents.

Extracting Signatures from Bank Checks Introduction Ahmed et al. [1] suggest a connected components-based method for segmenting signatures in document

Muhammad Saif Ullah Khan 9 Jan 26, 2022
A repository containing several general purpose Python scripts to automate daily and common tasks.

General Purpose Scripts Introduction This repository holds a curated list of Python scripts which aim to help us automate daily and common tasks. You

GDSC RCCIIT 46 Dec 25, 2022