PythonQueueAdapter
What it is
- A process-local bus adapter implementing
IBusAdapterusing SQLite as a durable queue. - Provides at-least-once delivery:
- Messages are persisted before
topic_publishreturns. - Messages can be replayed after restart when using a file-backed SQLite database.
- Messages are persisted before
Public API
class PythonQueueAdapter(IBusAdapter)__init__(persistence_path: str | None = None, journal_mode: str = "WAL", busy_timeout_ms: int = 5000, poll_interval_seconds: float = 0.05, lock_timeout_seconds: float = 1.0)- Creates/opens a SQLite database and ensures the
bus_messagestable exists. persistence_path=Noneuses a shared in-memory SQLite URI.
- Creates/opens a SQLite database and ensures the
topic_publish(topic: str, routing_key: str, payload: bytes) -> None- Persists a message to the queue.
topic_consume(topic: str, routing_key: str, callback: Callable[[bytes], None]) -> threading.Thread- Starts a daemon thread that polls for matching messages and invokes
callback(payload). - Acknowledges messages on successful callback completion.
- If
callbackraisesStopIteration, the message is acknowledged and the consumer stops.
- Starts a daemon thread that polls for matching messages and invokes
Configuration/Dependencies
- Dependencies:
- Standard library:
sqlite3,threading,time,os. naas_abi_core.services.bus.BusPorts.IBusAdapter.
- Standard library:
- SQLite pragmas configured on init:
busy_timeout(frombusy_timeout_ms)journal_mode(default"WAL")synchronous=NORMAL
- Persistence:
persistence_path=None→ shared in-memory DB (file:python_queue_adapter?mode=memory&cache=shared,uri=True).persistence_path="path/to/db.sqlite"→ file-backed DB; parent directory is created if needed.
Usage
import time
from naas_abi_core.services.bus.adapters.secondary.PythonQueueAdapter import PythonQueueAdapter
adapter = PythonQueueAdapter(persistence_path="queue.sqlite")
def on_message(payload: bytes) -> None:
print("got:", payload.decode())
adapter.topic_consume(topic="orders.*", routing_key="created.#", callback=on_message)
adapter.topic_publish(topic="orders.eu", routing_key="created.web", payload=b"hello")
time.sleep(0.2) # allow background consumer to process
Caveats
- Delivery is at-least-once:
- If the callback raises an exception (other than
StopIteration), the message is released (unlock) and will be retried; the consumer thread re-raises the exception.
- If the callback raises an exception (other than
- Routing key matching supports:
*matches exactly one dot-separated token.#matches zero or more tokens (when at end, matches the remainder).
topic_consume(...)returns a daemonThreadbut does not expose the internal stop event; stopping is only supported by raisingStopIterationinside the callback.