BusService
What it is
BusServiceis a thin service wrapper around anIBusAdapter.- It exposes topic-based publish/consume operations and delegates all work to the injected adapter.
Public API
-
Class
BusService(adapter: IBusAdapter)- Initializes the service with a bus adapter implementation.
-
topic_publish(topic: str, routing_key: str, payload: bytes) -> None- Publishes
payloadto atopicusing the givenrouting_key. - Delegates to
IBusAdapter.topic_publish(...).
- Publishes
-
topic_consume(topic: str, routing_key: str, callback: Callable[[bytes], None]) -> Thread- Starts consuming messages for
topic/routing_key. - Invokes
callback(payload_bytes)for each received message. - Returns a
threading.Threadcreated/managed by the adapter viaIBusAdapter.topic_consume(...).
- Starts consuming messages for
Configuration/Dependencies
- Depends on:
naas_abi_core.services.bus.BusPorts.IBusAdapter(must be provided).naas_abi_core.services.ServiceBase.ServiceBase(base class).
- No configuration is defined in this module; behavior is determined by the provided adapter.
Usage
from threading import Thread
from naas_abi_core.services.bus.BusService import BusService
from naas_abi_core.services.bus.BusPorts import IBusAdapter
class DummyAdapter(IBusAdapter):
def topic_publish(self, topic: str, routing_key: str, payload: bytes) -> None:
print("publish", topic, routing_key, payload)
def topic_consume(self, topic: str, routing_key: str, callback):
def run():
callback(b"example-message")
t = Thread(target=run, daemon=True)
t.start()
return t
bus = BusService(adapter=DummyAdapter())
bus.topic_publish("events", "user.created", b'{"id": 1}')
t = bus.topic_consume("events", "user.*", lambda b: print("received", b))
t.join()
Caveats
topic_consume(...)returns aThreadproduced by the adapter; thread lifecycle (daemon/non-daemon, stopping, error handling) is adapter-defined.payloadisbytes; any serialization/deserialization is the caller’s responsibility.