BusService
What it is
- A thin service wrapper around an
IBusAdapter that exposes topic-based publish/consume operations.
- Delegates all work to the provided adapter implementation.
Public API
- Class
BusService(adapter: IBusAdapter)
topic_publish(topic: str, routing_key: str, payload: bytes) -> None
- Publish a
payload to a topic using a routing_key.
topic_consume(topic: str, routing_key: str, callback: Callable[[bytes], None]) -> threading.Thread
- Start consuming messages for a
topic and routing_key, invoking callback(payload) for each message.
- Returns the
Thread used by the adapter to run consumption.
Configuration/Dependencies
- Depends on:
naas_abi_core.services.bus.BusPorts.IBusAdapter (must provide topic_publish and topic_consume)
naas_abi_core.services.ServiceBase.ServiceBase (base class)
- Standard library:
threading.Thread
typing.Callable
Usage
from threading import Thread
from naas_abi_core.services.bus.BusService import BusService
from naas_abi_core.services.bus.BusPorts import IBusAdapter
class DummyAdapter(IBusAdapter):
def topic_publish(self, topic: str, routing_key: str, payload: bytes) -> None:
print("publish", topic, routing_key, payload)
def topic_consume(self, topic: str, routing_key: str, callback):
def run():
callback(b"hello")
t = Thread(target=run, daemon=True)
t.start()
return t
bus = BusService(DummyAdapter())
bus.topic_publish("events", "user.created", b'{"id": 1}')
def on_message(payload: bytes) -> None:
print("received", payload)
t = bus.topic_consume("events", "user.*", on_message)
t.join()
Caveats
- No validation or error handling is implemented in
BusService; behavior and threading model are determined by the adapter.
topic_consume returns a Thread; lifecycle management (e.g., stopping the consumer) is adapter-specific and not defined here.