RabbitMQAdapter
What it is
- A RabbitMQ-backed implementation of
IBusAdapterusingpika. - Supports:
- Publishing messages to a topic exchange.
- Consuming messages from a durable queue bound to a topic exchange + routing key.
- Provides context-manager support for clean shutdown of the publishing connection.
Public API
class RabbitMQAdapter(IBusAdapter)__init__(rabbitmq_url: str)- Store the RabbitMQ connection URL.
__enter__() -> RabbitMQAdapter/__exit__(...)- Enables
with RabbitMQAdapter(...) as bus: ...; closes resources on exit.
- Enables
close() -> None- Closes the internal publish channel/connection (consumer threads manage their own connections).
topic_publish(topic: str, routing_key: str, payload: bytes) -> None- Declares the
topicexchange (typetopic, durable) once per adapter instance. - Publishes
payloadwithdelivery_mode=2(persistent). - On failure: closes publish connection and raises
ConnectionError("RabbitMQ publish failed").
- Declares the
topic_consume(topic: str, routing_key: str, callback: Callable[[bytes], None]) -> threading.Thread- Spawns a daemon thread that:
- Connects to RabbitMQ, declares a durable topic exchange.
- Declares a durable, non-exclusive, non-auto-delete queue with a deterministic name based on
(topic, routing_key). - Binds queue to exchange with the given routing key.
- Consumes messages with manual ack/nack:
callback(body)success ⇒basic_ackStopIterationfrom callback ⇒basic_ackand stops consuming (thread exits)- any other exception ⇒
basic_nack(requeue=True)then re-raises
- On AMQP/OS error: raises
ConnectionError("RabbitMQ consume failed")inside the thread.
- Spawns a daemon thread that:
Configuration/Dependencies
- Requires
pika. - Requires a RabbitMQ URL suitable for
pika.URLParameters, e.g.:amqp://user:pass@host:5672/vhost
- Logging via
naas_abi_core.utils.Logger.logger(used at debug level when starting consumption).
Usage
from naas_abi_core.services.bus.adapters.secondary.RabbitMQAdapter import RabbitMQAdapter
RABBIT_URL = "amqp://guest:guest@localhost:5672/%2F"
def on_message(body: bytes) -> None:
print("got:", body.decode("utf-8"))
# raise StopIteration to stop the consumer thread after one message
raise StopIteration
with RabbitMQAdapter(RABBIT_URL) as bus:
bus.topic_publish("events", "user.created", b"hello")
t = bus.topic_consume("events", "user.created", on_message)
t.join(timeout=5)
Caveats
topic_consume(...)runs in a daemon thread; exceptions raised there will not be raised in the caller thread.- Stopping consumption is only implemented via raising
StopIterationfrom the callback. - Queue names are derived from
sha256(f"{topic}:{routing_key}"); different routing keys create different queues (no wildcards handled beyond RabbitMQ routing itself).