A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.



A library from RCTI+ to handle RabbitMQ tasks (connect, send, receive, etc) in Python.


  • Python >=3.7.3
  • Pika ==1.2.0
  • Aio-pika ==6.8.0
  • Requests >=2.25.1


pip install rctiplus-rabbitmq-python-sdk

Getting latest version

pip install rctiplus-rabbitmq-python-sdk --upgrade


To start using this SDK, you may follow given instructions bellow in order.

Payload handler

First, you need to create a payload class handler that implement MessagePayload. For example, we want to make a class to handle JSON payload:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname'])">
import json
from rctiplus_rabbitmq_python_sdk import MessagePayload

class JSONPayload(MessagePayload):
    """Example class to handle JSON payload

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

            str: String payload message
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

            JSONPayload: Generated data
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

MessagePayload class from the SDK's core has this functions that require to implemented:

'MessagePayload': """Generate data from specified string payload message format Raises: NotImplementedError: Raise an error if not implemented """ raise NotImplementedError() def __str__(self) -> str: """Convert specified data format to string payload message Raises: NotImplementedError: Raise an error if not implemented Returns: str: String payload message """ raise NotImplementedError()">
class MessagePayload:
    """Python RabbitMQ message payload
    def from_str(cls, message: str) -> 'MessagePayload':
        """Generate data from specified string payload message format

            NotImplementedError: Raise an error if not implemented
        raise NotImplementedError()

    def __str__(self) -> str:
        """Convert specified data format to string payload message

            NotImplementedError: Raise an error if not implemented

            str: String payload message
        raise NotImplementedError()

Connect to RabbitMQ

Making connection to RabbitMQ server can be done by doing this simple way:

from rctiplus_rabbitmq_python_sdk import RabbitMQ

conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

Sending message

After you have payload class handler & connected to the RabbitMQ server, now you can try to send a messsage to queue channel. For example, we will send JSON payload message to test queue:

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Receiving message

Great. Now, in our consumer app, we want to listen & receive that message, and then doing some stuff:

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

conn.receive('test', callback)

For callback function, according to Pikas standart library, you need to pass 4 arguments ch, method, properties and body to catch all needed values from incomming message.

Putting it all together

Here is the complete example from the code above:

Complete example of sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) conn.send('test', payload)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload

# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

            str: String payload message
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

            JSONPayload: Generated data
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Send payload to queue
payload = JSONPayload('John', 'Doe')
print('payload:', payload)
conn.send('test', payload)

Complete example of consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Connect to RabbitMQ conn = RabbitMQ() conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message def callback(ch, method, properties, body): print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel conn.receive('test', callback)">
import json
from rctiplus_rabbitmq_python_sdk import RabbitMQ, MessagePayload

# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

            str: String payload message
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname

    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

            JSONPayload: Generated data
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

# Connect to RabbitMQ
conn = RabbitMQ()
conn.connect(host='localhost', port=5672, username='guest', password='guest')

# Create a callback to be executed immadiately after recieved a message
def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    # Generate data from string payload message
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

# Receive & listen messages from queue channel
conn.receive('test', callback)


This SDK also support asynchronous process. To use this feature, use AIORabbitMQ instead of RabbitMQ. All methods provided in AIORabbitMQ are treated as async function. So, when you calling the methods, you need to await them.

Async connect to RabbitMQ

from rctiplus_rabbitmq_python_sdk import AIORabbitMQ

conn = AIORabbitMQ(loop)
await conn.connect(host='localhost', port=5672, username='guest', password='guest')

loop is an asynchronous event loop, example: asyncio.get_event_loop()

Async sending message

payload = JSONPayload('John', 'Doe')
print('payload:', payload)
await conn.send('test', payload)

Async receiving message

async def callback(message):
    body = message.body
    print("[x] Received %r" % body)
    data = JSONPayload.from_str(body)
    print(f'data: firstname={data.firstname}, lastname={data.lastname}')

await conn.receive('test', callback)

In asynchronous process, you just need pass 1 argument on callback function. This argument is a representation of aio_pika.IncomingMessage to catch all needed values from incomming message.

Complete example of asynchronous process

Here is the complete example of asynchronous process above:

Complete example of asynchronous sender or producer app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') async with conn.connection: # Send payload to queue payload = JSONPayload('John', 'Doe') print('payload:', payload) await conn.send('test', payload) # Event loop loop = asyncio.get_event_loop() loop.run_until_complete(main(loop)) loop.close()">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload

# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

            str: String payload message
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

            JSONPayload: Generated data
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    async with conn.connection:
        # Send payload to queue
        payload = JSONPayload('John', 'Doe')
        print('payload:', payload)
        await conn.send('test', payload)

# Event loop
loop = asyncio.get_event_loop()

Complete example of asynchronous consumer or receiver app:

None: self.firstname = firstname self.lastname = lastname def __str__(self) -> str: """Convert JSON to string payload message Returns: str: String payload message """ return json.dumps({ 'firstname': self.firstname, 'lastname': self.lastname }) @classmethod def from_str(cls, message: str) -> 'JSONPayload': """Generate data from JSON string payload message Returns: JSONPayload: Generated data """ payload = json.loads(message) return cls(firstname=payload['firstname'], lastname=payload['lastname']) # Main function async def main(loop): # Connect to RabbitMQ conn = AIORabbitMQ(loop) await conn.connect(host='localhost', port=5672, username='guest', password='guest') # Create a callback to be executed immadiately after recieved a message async def callback(message): body = message.body print("[x] Received %r" % body) # Generate data from string payload message data = JSONPayload.from_str(body) print(f'data: firstname={data.firstname}, lastname={data.lastname}') # Receive & listen messages from queue channel await conn.receive('test', callback) return conn # Event loop loop = asyncio.get_event_loop() connection = loop.run_until_complete(main(loop)) try: loop.run_forever() finally: loop.run_until_complete(connection.disconnect())">
import json
import asyncio
from rctiplus_rabbitmq_python_sdk import AIORabbitMQ, MessagePayload

# Create payload class handler that implement `MessagePayload`
class JSONPayload(MessagePayload):
    """Example class to handle JSON payload

    def __init__(self, firstname: str, lastname: str) -> None:
        self.firstname = firstname
        self.lastname = lastname

    def __str__(self) -> str:
        """Convert JSON to string payload message

            str: String payload message
        return json.dumps({
            'firstname': self.firstname,
            'lastname': self.lastname
    def from_str(cls, message: str) -> 'JSONPayload':
        """Generate data from JSON string payload message

            JSONPayload: Generated data
        payload = json.loads(message)
        return cls(firstname=payload['firstname'], lastname=payload['lastname'])

# Main function
async def main(loop):

    # Connect to RabbitMQ
    conn = AIORabbitMQ(loop)
    await conn.connect(host='localhost', port=5672, username='guest', password='guest')
    # Create a callback to be executed immadiately after recieved a message
    async def callback(message):
        body = message.body
        print("[x] Received %r" % body)
        # Generate data from string payload message
        data = JSONPayload.from_str(body)
        print(f'data: firstname={data.firstname}, lastname={data.lastname}')

    # Receive & listen messages from queue channel
    await conn.receive('test', callback)

    return conn

# Event loop
loop = asyncio.get_event_loop()
connection = loop.run_until_complete(main(loop))


GNU General Public License v3

You might also like...
Retrying library for Python

Tenacity Tenacity is an Apache 2.0 licensed general-purpose retrying library, written in Python, to simplify the task of adding retry behavior to just

Retrying is an Apache 2.0 licensed general-purpose retrying library, written in Python, to simplify the task of adding retry behavior to just about anything.

Retrying Retrying is an Apache 2.0 licensed general-purpose retrying library, written in Python, to simplify the task of adding retry behavior to just

isort is a Python utility / library to sort imports alphabetically, and automatically separated into sections and by type.
isort is a Python utility / library to sort imports alphabetically, and automatically separated into sections and by type.

isort is a Python utility / library to sort imports alphabetically, and automatically separated into sections and by type. It provides a command line utility, Python library and plugins for various editors to quickly sort all your imports.

A Python library for reading, writing and visualizing the OMEGA Format
A Python library for reading, writing and visualizing the OMEGA Format

A Python library for reading, writing and visualizing the OMEGA Format, targeted towards storing reference and perception data in the automotive context on an object list basis with a focus on an urban use case.

RapidFuzz is a fast string matching library for Python and C++

RapidFuzz is a fast string matching library for Python and C++, which is using the string similarity calculations from FuzzyWuzzy

pydsinternals - A Python native library containing necessary classes, functions and structures to interact with Windows Active Directory.
pydsinternals - A Python native library containing necessary classes, functions and structures to interact with Windows Active Directory.

pydsinternals - Directory Services Internals Library A Python native library containing necessary classes, functions and structures to interact with W

Library for processing molecules and reactions in python way

Chython [ˈkʌɪθ(ə)n] Library for processing molecules and reactions in python way. Features: Read/write/convert formats: MDL .RDF (.RXN) and .SDF (.MOL

ecowater-softner is a Python library for collecting information from Ecowater water softeners.

Ecowater Softner ecowater-softner is a Python library for collecting information from Ecowater water softeners. Installation Use the package manager p

A tiny Python library for generating public IDs from integers

pids Create short public identifiers based on integer IDs. Installation pip install pids Usage from pids import pid public_id = pid.from_int(1234) #

Dali Kewara
An unexpected journey and gonna make it simple but Spectacular!
Dali Kewara
A monitor than send discord webhook when a specific monitored product has stock in your nearby pickup stores.

Welcome to Apple In-store Monitor This is a monitor that are not fully scaled, and might still have some bugs.

null 5 Jun 16, 2022
Auto-generate /etc/hosts for HackTheBox machines

Auto-generate /etc/hosts for HackTheBox machines Save yourself some tedium on getting started on a new machine by having your /etc/hosts ready to go.

null 3 Feb 16, 2022
Parse URLs for DOIs, PubMed identifiers, PMC identifiers, arXiv identifiers, etc.

citation-url Parse URLs for DOIs, PubMed identifiers, PMC identifiers, arXiv identifiers, etc. This module has a single parse() function that takes in

Charles Tapley Hoyt 2 Feb 12, 2022
A set of Python scripts to surpass human limits in accomplishing simple tasks.

Human benchmark fooler Summary A set of Python scripts with Selenium designed to surpass human limits in accomplishing simple tasks available on https

Bohdan Dudchenko 3 Feb 10, 2022
A repository containing several general purpose Python scripts to automate daily and common tasks.

General Purpose Scripts Introduction This repository holds a curated list of Python scripts which aim to help us automate daily and common tasks. You

GDSC RCCIIT 46 Dec 25, 2022
A clock app, which helps you with routine tasks.

Clock This app helps you with routine tasks. Alarm Clock Timer Stop Watch World Time (Which city you want) About me Full name: Matin Ardestani Age: 14

Matin Ardestani 13 Jul 30, 2022
This tool lets you perform some quick tasks for CTFs and Pentesting.

This tool lets you convert strings and numbers between number bases (2, 8, 10 and 16) as well as ASCII text. You can use the IP address analyzer to find out details on IPv4 and perform abbreviation as well as expansion on IPv6 addresses.It can also perform a two's complement calculation as well.

Ayomide Ayodele-Soyebo 1 Jul 16, 2022
ticktock is a minimalist library to view Python time performance of Python code.

ticktock is a minimalist library to view Python time performance of Python code.

Victor Benichoux 30 Sep 28, 2022
A functional standard library for Python.

Toolz A set of utility functions for iterators, functions, and dictionaries. See the PyToolz documentation at https://toolz.readthedocs.io LICENSE New

null 4.1k Dec 30, 2022
🔩 Like builtins, but boltons. 250+ constructs, recipes, and snippets which extend (and rely on nothing but) the Python standard library. Nothing like Michael Bolton.

Boltons boltons should be builtins. Boltons is a set of over 230 BSD-licensed, pure-Python utilities in the same spirit as — and yet conspicuously mis

Mahmoud Hashemi 6k Jan 4, 2023