Connects to RabbitMQ, declares a durable topic exchange.
Declares a durable, non-exclusive, non-auto-delete queue with a deterministic name based on (topic, routing_key).
Binds queue to exchange with the given routing key.
Consumes messages with manual ack/nack:
callback(body) success ⇒ basic_ack
StopIteration from callback ⇒ basic_ack and stops consuming (thread exits)
any other exception ⇒ basic_nack(requeue=True) then re-raises
On AMQP/OS error: raises ConnectionError("RabbitMQ consume failed") inside the thread.
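The ack/nack rules above can be sketched without a broker. This is a minimal illustration, not the adapter's actual code: `handle_delivery` and `RecordingChannel` are hypothetical names, and the use of `stop_consuming()` to end the loop is an assumption. The method names `basic_ack`, `basic_nack` and `stop_consuming` match those on a pika channel.

```python
from typing import Callable


class RecordingChannel:
    """Hypothetical stand-in for a pika channel; records calls instead of contacting a broker."""

    def __init__(self) -> None:
        self.calls = []

    def basic_ack(self, delivery_tag: int) -> None:
        self.calls.append(("ack", delivery_tag))

    def basic_nack(self, delivery_tag: int, requeue: bool = True) -> None:
        self.calls.append(("nack", delivery_tag, requeue))

    def stop_consuming(self) -> None:
        self.calls.append(("stop",))


def handle_delivery(channel, delivery_tag: int, body: bytes,
                    callback: Callable[[bytes], None]) -> None:
    """Apply the documented rules to a single delivery (illustrative only)."""
    try:
        callback(body)
    except StopIteration:
        # The message counts as handled: ack it, then stop consuming.
        channel.basic_ack(delivery_tag=delivery_tag)
        channel.stop_consuming()  # assumption: how the adapter ends its loop
    except Exception:
        # Any other failure: hand the message back to the queue and re-raise.
        channel.basic_nack(delivery_tag=delivery_tag, requeue=True)
        raise
    else:
        channel.basic_ack(delivery_tag=delivery_tag)
```

Note that `StopIteration` is itself a subclass of `Exception`, so it has to be handled before the generic branch.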
Configuration/Dependencies
Requires pika.
Requires a RabbitMQ URL suitable for pika.URLParameters, e.g.:
amqp://user:pass@host:5672/vhost
Logging via naas_abi_core.utils.Logger.logger (used at debug level when starting consumption).
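To see how the parts of such a URL break down, the following sketch uses only the standard library. It is not pika's own parser, and `describe_amqp_url` is a hypothetical helper. The point it shows is that the path segment is the virtual host, percent-decoded, so `%2F` refers to the default vhost `/`.

```python
from urllib.parse import unquote, urlsplit


def describe_amqp_url(url: str) -> dict:
    """Split an AMQP URL into its components (illustrative, not pika's parser)."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        # Drop the leading "/" of the path, then percent-decode the rest.
        "vhost": unquote(parts.path[1:]),
    }


if __name__ == "__main__":
    print(describe_amqp_url("amqp://user:pass@host:5672/vhost"))
    print(describe_amqp_url("amqp://guest:guest@localhost:5672/%2F"))
```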
Usage
from naas_abi_core.services.bus.adapters.secondary.RabbitMQAdapter import RabbitMQAdapter

RABBIT_URL = "amqp://guest:guest@localhost:5672/%2F"

def on_message(body: bytes) -> None:
    print("got:", body.decode("utf-8"))
    # raise StopIteration to stop the consumer thread after one message
    raise StopIteration

with RabbitMQAdapter(RABBIT_URL) as bus:
    bus.topic_publish("events", "user.created", b"hello")
    t = bus.topic_consume("events", "user.created", on_message)
    t.join(timeout=5)
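Because the callback runs on the thread returned by topic_consume, a failure inside it is not visible to the code that called join(). One possible pattern is to wrap the callback so errors are copied into a queue.Queue that the caller can inspect. The sketch below is an assumption-laden illustration: `capturing` and `fake_topic_consume` are hypothetical, and the latter only imitates the adapter with a plain thread so the example runs without RabbitMQ.

```python
import queue
import threading
from typing import Callable, Iterable


def capturing(callback: Callable[[bytes], None], errors: queue.Queue):
    """Wrap a callback so unexpected exceptions are also recorded for the caller."""
    def wrapper(body: bytes) -> None:
        try:
            callback(body)
        except StopIteration:
            raise  # the normal stop signal, not an error
        except Exception as exc:
            errors.put(exc)
            raise  # still re-raise so the consumer can nack the message
    return wrapper


def fake_topic_consume(callback: Callable[[bytes], None],
                       bodies: Iterable[bytes]) -> threading.Thread:
    """Hypothetical stand-in for bus.topic_consume: feeds bodies to the callback in a daemon thread."""
    def run() -> None:
        for body in bodies:
            try:
                callback(body)
            except Exception:
                return  # covers StopIteration too; the real adapter would ack or nack here
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


def demo() -> list:
    errors: queue.Queue = queue.Queue()

    def on_message(body: bytes) -> None:
        raise ValueError(f"cannot handle {body!r}")

    t = fake_topic_consume(capturing(on_message, errors), [b"bad"])
    t.join(timeout=5)
    collected = []
    while not errors.empty():
        collected.append(errors.get())
    return collected


if __name__ == "__main__":
    print(demo())
```

With the real adapter, the wrapped callback would be passed as the third argument to topic_consume in place of on_message.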
Caveats
topic_consume(...) runs in a daemon thread; exceptions raised there do not propagate to the calling thread.
The only supported way to stop consuming is to raise StopIteration from the callback.
Queue names are derived from sha256(f"{topic}:{routing_key}"), so each distinct (topic, routing_key) pair gets its own queue. A wildcard routing key is hashed as a literal string like any other; pattern matching is left entirely to RabbitMQ's topic routing.
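The naming rule can be reproduced with hashlib to predict which queue a given pair maps to. This is a sketch under assumptions: `queue_name_for` is a hypothetical helper, and it returns the full hex digest, whereas the adapter may add a prefix or truncate the hash.

```python
import hashlib


def queue_name_for(topic: str, routing_key: str) -> str:
    """Derive a deterministic name from (topic, routing_key).

    Assumption: the full SHA-256 hex digest is used as-is; the adapter's
    exact format (prefix, truncation) is not specified here.
    """
    key = f"{topic}:{routing_key}"
    return hashlib.sha256(key.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    # Same inputs always give the same name; a different key gives a different one.
    print(queue_name_for("events", "user.created"))
    print(queue_name_for("events", "user.*"))
```

Since the name depends only on the pair, two consumers using the same topic and routing key share one queue and compete for its messages.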