RabbitMQAdapter
What it is
- A RabbitMQ-backed implementation of IBusAdapter using pika.
- Supports:
  - Publishing messages to a durable topic exchange.
  - Consuming messages from a durable, non-exclusive, non-auto-deleted queue bound to a topic/routing key.
- Provides basic connection management for publishing (reuses a channel and reconnects once on failure).
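The reconnect-once behavior can be pictured with a small, library-free sketch. The names publish_with_retry, send, and reset are hypothetical and are not part of RabbitMQAdapter: send stands in for a publish on the cached channel, and reset for dropping the cached connection so that the next attempt reconnects.

```python
from typing import Callable


def publish_with_retry(send: Callable[[], None], reset: Callable[[], None]) -> None:
    """Try send(); on failure, reset the connection and try exactly once more."""
    try:
        send()
        return
    except Exception:
        # First attempt failed: drop the cached connection before retrying.
        reset()
    try:
        send()
    except Exception as exc:
        # The single retry also failed: surface a ConnectionError to the caller.
        raise ConnectionError("publish failed after one retry") from exc
```

A single retry covers the common case of a stale connection without hiding a broker that is actually unreachable.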
Public API
- class RabbitMQAdapter(IBusAdapter)
- __init__(rabbitmq_url: str)
  - Stores the RabbitMQ connection URL.
- Context manager:
  - __enter__() -> RabbitMQAdapter
  - __exit__(exc_type, exc, tb) -> bool (calls close(); does not suppress exceptions)
- close() -> None
  - Closes the internal publish connection/channel and clears cached exchange declarations.
- topic_publish(topic: str, routing_key: str, payload: bytes) -> None
  - Publishes payload to a durable topic exchange whose name is given by topic, using routing key routing_key.
  - If the publish fails, drops the cached connection and retries once; raises ConnectionError if the retry also fails.
- topic_consume(topic: str, routing_key: str, callback: Callable[[bytes], None]) -> threading.Thread
  - Starts a daemon thread that:
    - Declares a durable topic exchange whose name is given by topic.
    - Declares a durable queue whose name is derived from topic and routing_key via SHA-256.
    - Binds the queue to the exchange with routing_key.
    - Consumes messages and calls callback(body).
  - Acknowledgement behavior:
    - On success: basic_ack.
    - If callback raises StopIteration: basic_ack, then stop consuming.
    - On other exceptions: basic_nack(requeue=True) and re-raise (terminates the consumer loop with an error).
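The acknowledgement rules above can be sketched as a single per-message handler. This is an illustration, not the adapter's actual code: handle_delivery and FakeChannel are hypothetical names, and FakeChannel only records calls so the sketch runs without a broker. Its methods mirror the basic_ack, basic_nack, and stop_consuming calls that a pika channel exposes.

```python
from typing import Callable, List, Tuple


class FakeChannel:
    """Stand-in for a pika channel that only records the calls made on it."""

    def __init__(self) -> None:
        self.calls: List[Tuple] = []

    def basic_ack(self, delivery_tag: int) -> None:
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag: int, requeue: bool = True) -> None:
        self.calls.append(("nack", delivery_tag, requeue))

    def stop_consuming(self) -> None:
        self.calls.append(("stop",))


def handle_delivery(channel, delivery_tag: int, body: bytes,
                    callback: Callable[[bytes], None]) -> None:
    """Apply the documented ack/nack rules to one delivered message."""
    try:
        callback(body)
    except StopIteration:
        # Callback asked to stop: ack this message, then end the consume loop.
        channel.basic_ack(delivery_tag=delivery_tag)
        channel.stop_consuming()
    except Exception:
        # Any other error: return the message to the queue and propagate.
        channel.basic_nack(delivery_tag=delivery_tag, requeue=True)
        raise
    else:
        # Callback returned normally: the message is done.
        channel.basic_ack(delivery_tag=delivery_tag)
```

Because failed messages are requeued, a callback that always fails on the same message will see it again the next time a consumer starts, so callbacks are best written to be idempotent.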
Configuration/Dependencies
- Requires pika.
- Requires a RabbitMQ URL string compatible with pika.URLParameters, e.g.:
  - amqp://guest:guest@localhost:5672/%2F
- Declares exchanges as:
  - exchange_type="topic", durable=True
- Publish message properties:
  - delivery_mode=2 (persistent)
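The trailing %2F in the example URL is the percent-encoded form of "/", RabbitMQ's default virtual host. The standard-library sketch below splits a URL into its parts to make that visible. It is not how pika.URLParameters is implemented, and describe_amqp_url is a hypothetical helper name used only for illustration.

```python
from urllib.parse import unquote, urlparse


def describe_amqp_url(url: str) -> dict:
    """Split an AMQP URL into host, port, username, and virtual host."""
    parsed = urlparse(url)
    return {
        "host": parsed.hostname,
        "port": parsed.port,
        "username": parsed.username,
        # Drop the leading "/" of the path, then decode percent-escapes.
        "virtual_host": unquote(parsed.path[1:]),
    }


info = describe_amqp_url("amqp://guest:guest@localhost:5672/%2F")
```

Here info["virtual_host"] is "/", info["host"] is "localhost", and info["port"] is 5672.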
Usage
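import time

from naas_abi_core.services.bus.adapters.secondary.RabbitMQAdapter import RabbitMQAdapter

RABBIT_URL = "amqp://guest:guest@localhost:5672/%2F"


def on_message(body: bytes) -> None:
    print("got:", body.decode())
    # Stop after first message:
    raise StopIteration


with RabbitMQAdapter(RABBIT_URL) as bus:
    # Start the consumer first so its queue is declared and bound before publishing;
    # a topic exchange drops messages that match no bound queue.
    t = bus.topic_consume("my.topic", "demo.key", on_message)
    time.sleep(1)  # allow the consumer thread to declare and bind its queue
    bus.topic_publish("my.topic", "demo.key", b"hello")
    t.join(timeout=5)  # wait for the consumer to handle the message and stop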
Caveats
- topic_consume(...) runs in a daemon thread; exceptions inside the consumer loop will terminate that thread.
- The consuming queue name is deterministic per (topic, routing_key) pair (SHA-256); multiple consumers using the same pair will share the same queue, so each message is delivered to only one of them.
- Stopping consumption is done by raising StopIteration from the callback (acks the message, then stops).
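To illustrate why two consumers with the same pair end up on the same queue, here is a minimal sketch of a deterministic, SHA-256-based queue name. Only the use of SHA-256 comes from the description above. The function name derive_queue_name, the ":" separator, and the "queue." prefix are assumptions made for illustration, and the adapter's real naming format may differ.

```python
import hashlib


def derive_queue_name(topic: str, routing_key: str) -> str:
    """Return the same queue name every time for a given (topic, routing_key)."""
    # Assumed format: join the pair with ":" and hash the UTF-8 bytes.
    digest = hashlib.sha256(f"{topic}:{routing_key}".encode("utf-8")).hexdigest()
    # Assumed prefix, added only to make the name readable in a broker UI.
    return f"queue.{digest}"


name_a = derive_queue_name("my.topic", "demo.key")
name_b = derive_queue_name("my.topic", "demo.key")
name_c = derive_queue_name("my.topic", "other.key")
```

name_a and name_b are identical, so both consumers would attach to one queue, while name_c differs and would get a queue of its own.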