# Workers

## What it is
A small threaded worker pool utility for running Python callables asynchronously via Job objects, tracking status, and optionally collecting completed jobs via a results Queue.
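Conceptually, each worker thread pulls jobs off a shared internal queue and runs them until told to stop. The snippet below is only an illustrative sketch of that loop pattern, not the actual `Workers.py` internals; the names `_worker_loop`, `stop_event`, and the 0.1 s poll interval are assumptions.

```python
import queue
import threading


def _worker_loop(job_queue: queue.Queue, stop_event: threading.Event) -> None:
    """Illustrative worker loop: poll the job queue until asked to stop."""
    while not stop_event.is_set():
        try:
            job = job_queue.get(timeout=0.1)  # short timeout so shutdown is noticed promptly
        except queue.Empty:
            continue
        job.execute()          # runs the callable and records the result/exception on the job
        job_queue.task_done()
```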
## Public API

- `JobStatus` (Enum) - `PENDING`, `RUNNING`, `COMPLETED`, `FAILED`, `RESULT_FETCHED`.
- `class Job`
  - `Job(queue: Optional[Queue], func: Callable, *args, **kwargs)` - Creates a unit of work with a UUID id, status tracking, and an optional completion-queue push.
  - `execute()` - Runs the callable, sets the status, captures the result or exception, signals completion, and pushes itself to `queue` if set.
  - `wait(timeout: Optional[float] = None) -> bool` - Blocks until completion (or timeout). Returns `True` if completed.
  - `is_completed() -> bool` - Returns `True` if the status is `COMPLETED` or `FAILED`.
  - `get_result()` - Marks the status as `RESULT_FETCHED`, re-raises the captured exception if any, otherwise returns the result.
- `class WorkerPool`
  - `WorkerPool(num_workers: int)` - Starts `num_workers` daemon threads consuming jobs from an internal queue.
  - `submit(job: Job)` - Enqueues a single job for execution (see the single-job sketch after this list).
  - `submit_all(jobs: List[Job]) -> Queue[Job]` - Enqueues multiple jobs; assigns a shared results queue to jobs whose `job.queue` is `None`. Returns that queue.
  - `shutdown()` - Signals workers to stop and joins their threads.
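For a single job you can skip the results queue and wait on the job directly; a short sketch using only the API described above:

```python
from naas_abi_core.utils.Workers import Job, WorkerPool


def square(x):
    return x * x


pool = WorkerPool(num_workers=1)
job = Job(None, square, 7)   # no completion queue; we wait on the job itself
pool.submit(job)

if job.wait(timeout=5):      # blocks until the job finishes (or the timeout expires)
    print(job.get_result())  # -> 49, and the status becomes RESULT_FETCHED
pool.shutdown()
```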
## Configuration/Dependencies

- Uses standard library modules: `threading`, `queue`, `uuid`, `enum`, `typing`.
- Logs failures and shutdown debug messages via `naas_abi_core.logger`.
## Usage

```python
from queue import Empty

from naas_abi_core.utils.Workers import Job, WorkerPool


def add(a, b):
    return a + b


pool = WorkerPool(num_workers=2)

# Submit multiple jobs and collect them from the returned results queue
jobs = [Job(None, add, i, i) for i in range(3)]
results_q = pool.submit_all(jobs)

completed = []
while len(completed) < len(jobs):
    try:
        job = results_q.get(timeout=2)
        completed.append((job.id, job.get_result()))
    except Empty:
        break

print(completed)
pool.shutdown()
```
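If a job's callable raises, the job ends up `FAILED` and `get_result()` re-raises the captured exception, so wrap the call accordingly; a small sketch based on the API above:

```python
from naas_abi_core.utils.Workers import Job, WorkerPool


def boom():
    raise ValueError("something went wrong")


pool = WorkerPool(num_workers=1)
failing = Job(None, boom)
pool.submit(failing)
failing.wait(timeout=5)

try:
    failing.get_result()  # re-raises the ValueError captured during execute()
except ValueError as exc:
    print(f"job {failing.id} failed: {exc}")

pool.shutdown()
```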
## Caveats

- Worker threads are daemon threads; call `shutdown()` to join them cleanly.
- `submit_all()` only assigns the returned results queue to jobs where `job.queue is None`; jobs constructed with an existing `queue` push their completion to that queue instead (see the sketch after this list).
- `Job.get_result()` updates the status to `RESULT_FETCHED` even when it re-raises a captured exception.
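To make the second caveat concrete, the sketch below (variable names assumed) gives one job its own queue: its completion arrives there, while only the other job shows up on the queue returned by `submit_all()`.

```python
from queue import Queue

from naas_abi_core.utils.Workers import Job, WorkerPool

pool = WorkerPool(num_workers=1)

my_queue: Queue = Queue()
own_queue_job = Job(my_queue, len, "hello")  # keeps my_queue as its completion queue
shared_job = Job(None, len, "world")         # receives the shared results queue

results_q = pool.submit_all([own_queue_job, shared_job])

print(results_q.get(timeout=5).get_result())  # only shared_job arrives here -> 5
print(my_queue.get(timeout=5).get_result())   # own_queue_job arrives on its own queue -> 5
pool.shutdown()
```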