Message Broker Adapter

The message broker adapter provides a unified interface for interacting with different message brokers (e.g., RabbitMQ, Kafka). It sends and receives messages in a standardized way, abstracting away the implementation details of each broker.

This module defines the following components:

  • Message: A dataclass representing a message with a unique ID, metadata, and payload. Includes methods for serialization and deserialization.

  • MessageContext: Provides context and lifecycle control for a message, including acknowledgment (ack) and negative acknowledgment (nack).

  • Subscription: Represents a subscription to a message stream/channel, providing an asynchronous iterator interface to consume messages.

  • MessageBroker: The main broker interface for connecting, disconnecting, publishing, subscribing, and listening to message streams, using Redis Streams as the backend.

  • MessageRetryException: Exception to signal that a message should be retried.

class Message

Bases: object

Contains the message data and metadata

Variables:
  • id (str) – Unique identifier for the message

  • metadata (dict) – Additional metadata associated with the message

  • payload (dict) – The actual message data

__init__(id, metadata, payload)
classmethod from_dict(data)

Create a Message instance from a dictionary representation

to_dict()

Convert the message to a dictionary representation
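
The round trip between to_dict() and from_dict() can be sketched as follows. This is a minimal stand-in, not the module's own code: the class name SketchMessage is hypothetical, and the dictionary keys "id", "metadata", and "payload" are an assumption based on the variables listed above. The real Message may serialize differently.

```python
from dataclasses import dataclass, asdict, field
from typing import Any


@dataclass
class SketchMessage:
    """Hypothetical stand-in for Message; field names follow the docs above."""

    id: str
    metadata: dict[str, Any] = field(default_factory=dict)
    payload: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        # asdict() recursively copies the dataclass fields into a plain dict.
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SketchMessage":
        # Assumed key names: "id", "metadata", "payload".
        return cls(
            id=data["id"],
            metadata=data.get("metadata", {}),
            payload=data.get("payload", {}),
        )


original = SketchMessage(id="1", metadata={"header1": "value1"}, payload={"key": "value"})
restored = SketchMessage.from_dict(original.to_dict())
```

Because dataclasses compare field by field, restored and original are equal after the round trip.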

class MessageContext

Bases: object

Context for controlling message lifecycle

__init__(stream, group, consumer, message, redis_ref)

Initialize the message context. This gives you control over the message lifecycle (acknowledgment, negative acknowledgment, etc.)

Parameters:
  • stream (str) – Stream name

  • group (str) – Consumer group name

  • consumer (str) – Consumer name

  • message (Message) – The message instance

  • redis_ref (Redis) – Redis client reference

async ack()

Acknowledge receipt of a message

get_message()

Get the message associated with this context

property id

Get the unique identifier of the message

property metadata

Get the metadata of the message

async nack()

Negatively acknowledge receipt of a message. This makes the message available for redelivery.

property payload

Get the payload of the message
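
A common pattern with a context object like this is to acknowledge only after processing succeeds and to negatively acknowledge on failure. The sketch below illustrates that pattern with a hypothetical FakeContext class that mimics the payload, ack(), and nack() surface described above; it does not talk to Redis and is not the module's MessageContext.

```python
import asyncio


class FakeContext:
    """Hypothetical stand-in exposing the same surface as MessageContext."""

    def __init__(self, payload):
        self.payload = payload
        self.state = "pending"

    async def ack(self):
        self.state = "acked"

    async def nack(self):
        self.state = "nacked"


async def handle(ctx):
    # Ack only after processing succeeds; nack so the message can be redelivered.
    try:
        if "key" not in ctx.payload:
            raise ValueError("missing 'key'")
        await ctx.ack()
    except ValueError:
        await ctx.nack()


good = FakeContext({"key": "value"})
bad = FakeContext({})
asyncio.run(handle(good))
asyncio.run(handle(bad))
```

The same handler shape should work with a real context, assuming its ack() and nack() behave as documented above.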

class MessageBroker

Bases: object

__init__(message_broker_connection_url=None)

Initialize the MessageBroker instance.

Parameters:

message_broker_connection_url (str) – Connection URL for the message broker. If not provided, it will be read from the environment variable ‘MESSAGE_BROKER_CONNECTION_URL’.

async connect()

Connect to the message broker.

async disconnect()

Disconnect from the message broker.

async listen(stream, group, consumer, on_message, concurrency=1)

Listen to a stream and process incoming messages with the provided callback. Concurrency, auto-acknowledgment, and the retry strategy are handled for you. Returns a callable that stops listening.

Parameters:
  • stream (str) – The stream to listen to.

  • group (str) – The consumer group name.

  • consumer (str) – The consumer name.

  • on_message (Callable[[MessageContext], Awaitable[None]]) – Callback function to process incoming messages.

  • concurrency (int, optional) – Number of concurrent message processing tasks. Defaults to 1.

Returns:

Callable[[], Awaitable[None]] – A callable that, when invoked, stops listening to the stream and cleans up resources.
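
What the concurrency parameter bounds can be illustrated with plain asyncio. The sketch below is not how listen() is implemented (the source does not say); it only shows one way to cap the number of callbacks in flight, using a semaphore. The helper name run_bounded and the stats dictionary are hypothetical.

```python
import asyncio


async def run_bounded(items, handler, concurrency=1):
    """Run handler over items with at most `concurrency` calls in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def guarded(item):
        async with semaphore:
            await handler(item)

    await asyncio.gather(*(guarded(item) for item in items))


# Track how many handlers run at the same time.
stats = {"in_flight": 0, "peak": 0}


async def on_message(item):
    stats["in_flight"] += 1
    stats["peak"] = max(stats["peak"], stats["in_flight"])
    await asyncio.sleep(0.01)  # simulate I/O-bound work
    stats["in_flight"] -= 1


asyncio.run(run_bounded(range(6), on_message, concurrency=2))
```

With concurrency=2, no more than two handlers overlap even though six items are queued.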

async publish(stream, message)

Publish a message in the message broker.

Parameters:
  • stream (str) – Stream to which the message will be published. If the stream does not exist, it will be created.

  • message (Message) – Message to be published.

async subscribe(stream, group, consumer, retryLogic='NoRetry')

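Return a subscription object for the given stream, which you can use to manage message consumption yourself.

Parameters:
  • stream (str) – The stream to subscribe to.

  • group (str) – The consumer group name.

  • consumer (str) – The consumer name.

  • retryLogic (str, optional) – Retry logic strategy. Defaults to 'NoRetry'.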

Returns:

Subscription – Subscription object for managing message consumption.

class Subscription

Bases: object

Subscription to a message channel

__init__(stream, group, consumer, retry_logic, redis_ref)

Initialize the subscription.

Parameters:
  • stream (str) – Stream name

  • group (str) – Consumer group name

  • consumer (str) – Consumer name

  • retry_logic (str) – Retry logic strategy

  • redis_ref (Redis) – Redis client reference

async close()

Unsubscribe from the channel and clean up resources

exception MessageRetryException

Bases: Exception

Exception raised when a message needs to be retried

__init__(*args, **kwargs)
classmethod __new__(*args, **kwargs)
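
The general idea of signalling a retry from a handler can be sketched as follows. This is an illustration under stated assumptions, not the module's retry strategy: RetrySignal is a hypothetical stand-in for MessageRetryException, and deliver() with its max_attempts limit is invented for the example.

```python
import asyncio


class RetrySignal(Exception):
    """Hypothetical stand-in for MessageRetryException."""


async def deliver(handler, payload, max_attempts=3):
    """Call handler until it stops raising RetrySignal or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            await handler(payload)
            return attempt  # number of attempts that were needed
        except RetrySignal:
            continue
    return None  # every attempt asked for a retry


calls = []


async def flaky_handler(payload):
    # Ask for a retry on the first two calls, then succeed.
    calls.append(payload)
    if len(calls) < 3:
        raise RetrySignal("downstream not ready")


attempts_used = asyncio.run(deliver(flaky_handler, {"key": "value"}))
```

Any other exception type propagates out of deliver() unchanged, so only an explicit retry signal triggers another attempt.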

Example Usage

Case 1: Using the managed listen method

import asyncio
from pyutils.data_adapter.message_broker import Message, MessageBroker

# Define the connection URL for the message broker
MESSAGE_BROKER_CONNECTION_URL = "redis://localhost:6379/0"

# Define the main async function
async def main():
    # Connect to the message broker, passing the connection URL explicitly
    broker = MessageBroker(message_broker_connection_url=MESSAGE_BROKER_CONNECTION_URL)
    await broker.connect()

    # Publish a message
    message = Message(id="1", payload={"key": "value"}, metadata={"header1": "value1"})
    await broker.publish("my_channel", message)

    # Listen to a channel
    async def on_message(ctx):
        print(f"Received message: {ctx.payload} with metadata: {ctx.metadata}")
    stop_listening = await broker.listen(
        stream="my_channel",
        group="my_group",
        consumer="my_consumer",
        on_message=on_message,
        concurrency=2
    )

    # Keep the listener running for a while
    await asyncio.sleep(10)
    await stop_listening()
    await broker.disconnect()

# Run the main function
asyncio.run(main())

Case 2: Using the subscription directly, with the connection URL set via an environment variable

.env:

MESSAGE_BROKER_CONNECTION_URL=redis://localhost:6379/0

main.py:

import asyncio
from pyutils.data_adapter.message_broker import Message, MessageBroker

async def main():
    # Connect to the message broker
    broker = MessageBroker()
    await broker.connect()

    # Publish a message
    message = Message(id="2", payload={"key": "value"}, metadata={"header1": "value1"})
    await broker.publish("my_channel", message)

    # Subscribe to a channel
    subscription = await broker.subscribe("my_channel", group="my_group", consumer="my_consumer")

    # Consume a single message from the subscription, then stop iterating
    async for ctx in subscription:
        print(f"Received message: {ctx.payload} with metadata: {ctx.metadata}")
        await ctx.ack()
        break

    # Close the subscription and disconnect from the broker
    await subscription.close()
    await broker.disconnect()

# Run the main function
asyncio.run(main())