
PythonQueueAdapter

What it is

  • A process-local message bus adapter built on Python stdlib queue.Queue.
  • Supports topic + routing key filtering with wildcard matching:
    • * matches exactly one dot-separated segment
    • # matches zero or more segments
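
The wildcard rules above can be illustrated with a small standalone matcher. This is a minimal sketch, not the adapter's own `_match_routing_key` implementation; the function name `match_pattern` is hypothetical and chosen for illustration only.

```python
def match_pattern(pattern: str, key: str) -> bool:
    """Illustrative matcher: '*' = exactly one segment, '#' = zero or more."""
    p = pattern.split(".")
    k = key.split(".")

    def rec(i: int, j: int) -> bool:
        # Pattern exhausted: match only if the key is exhausted too.
        if i == len(p):
            return j == len(k)
        if p[i] == "#":
            # Let '#' absorb zero or more of the remaining key segments.
            return any(rec(i + 1, n) for n in range(j, len(k) + 1))
        if j == len(k):
            return False
        if p[i] == "*" or p[i] == k[j]:
            return rec(i + 1, j + 1)
        return False

    return rec(0, 0)


print(match_pattern("created.*", "created.eu"))       # True
print(match_pattern("created.*", "created.eu.west"))  # False
print(match_pattern("created.#", "created"))          # True
print(match_pattern("created.#", "created.eu.west"))  # True
```

Under these rules, `created.*` requires exactly two segments, while `created.#` accepts `created` followed by any number of further segments, including none.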

Public API

  • class PythonQueueAdapter(IBusAdapter)
    • __init__() -> None
      • Initializes internal subscriber registry and lock.
    • topic_publish(topic: str, routing_key: str, payload: bytes) -> None
      • Publishes payload to all subscribers whose topic pattern matches topic and whose routing_key pattern matches routing_key.
    • topic_consume(topic: str, routing_key: str, callback: Callable[[bytes], None]) -> Thread
      • Registers a subscriber for a topic and routing_key pattern.
      • Starts a daemon thread that pulls messages from an internal queue and invokes callback(payload).
      • Returns the started Thread.
    • _match_routing_key(pattern: str, routing_key: str) -> bool (static)
      • Implements dot-separated wildcard matching for topics and routing keys.

Configuration/Dependencies

  • No external dependencies; uses stdlib:
    • queue.Queue, threading.Thread, threading.RLock
  • Depends on naas_abi_core.services.bus.BusPorts.IBusAdapter for interface conformance.

Usage

from naas_abi_core.services.bus.adapters.secondary.PythonQueueAdapter import PythonQueueAdapter

bus = PythonQueueAdapter()

def on_message(payload: bytes) -> None:
    print("got:", payload.decode())

# Subscribe to topic "orders" and routing keys like "created.eu", "created.us", etc.
bus.topic_consume(topic="orders", routing_key="created.*", callback=on_message)

bus.topic_publish(topic="orders", routing_key="created.eu", payload=b"order-123")

Caveats

  • In-process only: no inter-process or network transport.
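  • topic_consume(...) runs callbacks on a daemon thread. Daemon threads do not keep the process alive, so a short script may exit before pending callbacks have run.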
  • To stop a consumer, the callback must raise StopIteration (breaks the consume loop).
  • If the callback raises any other exception:
    • the payload is re-queued for that subscriber
    • the consumer thread then raises and exits (message may be redelivered if a new consumer is created).
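
The stop and error behavior described above can be sketched with a standalone consume loop. This is an illustration built only on the stdlib under the assumptions stated in the caveats, not the adapter's actual code; `consume_loop` and the `b"__stop__"` sentinel payload are hypothetical names introduced for this example.

```python
import queue
import threading
from typing import Callable, List


def consume_loop(q: "queue.Queue[bytes]", callback: Callable[[bytes], None]) -> None:
    """Hypothetical loop mirroring the caveats: StopIteration stops it,
    any other exception re-queues the payload and propagates."""
    while True:
        payload = q.get()
        try:
            callback(payload)
        except StopIteration:
            break  # clean shutdown requested by the callback
        except Exception:
            q.put(payload)  # keep the message for a later consumer
            raise


received: List[bytes] = []


def on_message(payload: bytes) -> None:
    # Assumed convention: a sentinel payload asks the consumer to stop.
    if payload == b"__stop__":
        raise StopIteration
    received.append(payload)


q: "queue.Queue[bytes]" = queue.Queue()
worker = threading.Thread(target=consume_loop, args=(q, on_message), daemon=True)
worker.start()

q.put(b"order-123")
q.put(b"__stop__")
worker.join(timeout=2)  # the loop exits once the sentinel is processed
```

With the real adapter, the same idea would apply to the callback passed to topic_consume(...): publish an agreed sentinel payload and raise StopIteration when the callback sees it. The sentinel value itself is an application-level choice, not something the adapter defines.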