Message Broker Adapter¶
The message broker has the task of providing a unified interface to interact with different message brokers (e.g., RabbitMQ, Kafka, etc.). It allows sending and receiving messages in a standardized way, abstracting the underlying implementation details of each broker.
This module defines the following components:
Message: A dataclass representing a message with a unique ID, metadata, and payload. Includes methods for serialization and deserialization.MessageContext: Provides context and lifecycle control for a message, including acknowledgment (ack) and negative acknowledgment (nack).Subscription: Represents a subscription to a message stream/channel, providing an asynchronous iterator interface to consume messages.MessageBroker: The main broker interface for connecting, disconnecting, publishing, subscribing, and listening to message streams using Redis Streams as backend.MessageRetryException: Exception to signal that a message should be retried.
- class Message¶
Bases:
objectContains the message data and metadata
- Variables:
id (str) – Unique identifier for the message
metadata (dict) – Additional metadata associated with the message
payload (dict) – The actual message data
- __init__(id, metadata, payload)¶
- classmethod from_dict(data)¶
Create a Message instance from a dictionary representation
- to_dict()¶
Convert the message to a dictionary representation
- class MessageContext¶
Bases:
objectContext for controlling message lifecycle
- __init__(stream, group, consumer, message, redis_ref)¶
Initialize the message context. This gives you control over the message lifecycle (acknowledgment, negative acknowledgment, etc.)
- Parameters:
stream (str) – Stream name
group (str) – Consumer group name
consumer (str) – Consumer name
message (Message) – The message instance
redis_ref (Redis) – Redis client reference
- async ack()¶
Acknowledge the receival of a message
- get_message()¶
Get the message associated with this context
- property id¶
Get the unique identifier of the message
- property metadata¶
Get the metadata of the message
- async nack()¶
Negatively acknowledge the receival of a message. This will make the message available for redelivery.
- property payload¶
Get the payload of the message
- class MessageBroker¶
Bases:
object- __init__(message_broker_connection_url=None)¶
Initialize the MessageBroker instance.
- Parameters:
message_broker_connection_url (str) – Connection URL for the message broker. If not provided, it will be read from the environment variable ‘MESSAGE_BROKER_CONNECTION_URL’.
- async connect()¶
Connect to the message broker.
- async disconnect()¶
Disconnect from the message broker.
- async listen(stream, group, consumer, on_message, concurrency=1)¶
Listen to a stream and process incoming messages using the provided callback. It returns a callable to stop listening. It handles for you the concurrency, auto-acknowledgment, and retry strategy.
- Parameters:
stream (str) – The stream to listen to.
group (str) – The consumer group name.
consumer (str) – The consumer name.
on_message (Callable[[MessageContext], Awaitable[None]]) – Callback function to process incoming messages.
concurrency (int, optional) – Number of concurrent message processing tasks. Defaults to 1.
- Returns:
Callable[[], Awaitable[None]] – A callable that, when invoked, stops listening to the stream and cleans up resources.
- async publish(stream, message)¶
Publish a message in the message broker.
- Parameters:
stream (str) – Stream to which the message will be published. If the stream does not exist, it will be created.
message (Message) – Message to be published.
- async subscribe(stream, group, consumer, retryLogic='NoRetry')¶
Returns a subscription object for the given stream that can be used to manage message consumption on your own.
- Parameters:
stream (str) – The stream to subscribe to.
- Returns:
Subscription – Subscription object for managing message consumption.
- class Subscription¶
Bases:
objectSubscription to a message channel
- __init__(stream, group, consumer, retry_logic, redis_ref)¶
Initialize the subscription.
- Parameters:
stream (str) – Stream name
group (str) – Consumer group name
consumer (str) – Consumer name
retry_logic (str) – Retry logic strategy
redis_ref (Redis) – Redis client reference
- async close()¶
Unsubscribe from the channel and clean up resources
- exception MessageRetryException¶
Bases:
ExceptionException raised when a message needs to be retried
- __init__(*args, **kwargs)¶
- classmethod __new__(*args, **kwargs)¶
Example Usage¶
Case 1: Using the managed listen method
import asyncio
from pyutils.data_adapter.message_broker import Message, MessageBroker
# Define the connection URL for the message broker
MESSAGE_BROKER_CONNECTION_URL = "redis://localhost:6379/0"
# Define the main async function
async def main():
# Connect to the message broker
broker = MessageBroker()
await broker.connect()
# Publish a message
message = Message(id="1", payload={"key": "value"}, metadata={"header1": "value1"})
await broker.publish("my_channel", message)
# Listen to a channel
async def on_message(ctx):
print(f"Received message: {ctx.payload} with metadata: {ctx.metadata}")
stop_listening = await broker.listen(
stream="my_channel",
group="my_group",
consumer="my_consumer",
on_message=on_message,
concurrency=2
)
# Keep the listener running for a while
await asyncio.sleep(10)
await stop_listening()
await broker.disconnect()
# Run the main function
asyncio.run(main())
Case 2: Using the subscription directly and having the message broker connection URL set via environment variable
.env:
MESSAGE_BROKER_CONNECTION_URL=redis://localhost:6379/0
main.py:
import asyncio
from pyutils.data_adapter.message_broker import Message, MessageBroker
async def main():
# Connect to the message broker
broker = MessageBroker()
await broker.connect()
# Publish a message
message = Message(id="2", payload={"key": "value"}, metadata={"header1": "value1"})
await broker.publish("my_channel", message)
# Subscribe to a channel
subscription = await broker.subscribe("my_channel", group="my_group", consumer="my_consumer")
# Consume the message from the subscription
async for ctx in subscription:
print(f"Received message: {ctx.payload} with metadata: {ctx.metadata}")
await ctx.ack()
# Disconnect from the broker
await broker.disconnect()
# Run the main function
asyncio.run(main())