Public-facing controller for managing a fleet of subscriber tasks.
Initializes the AsyncTaskManager.
Source code in fastpubsub/concurrency/manager.py
| def __init__(self) -> None:
"""Initializes the AsyncTaskManager."""
self._tasks: list[PubSubStreamingPullTask] = []
|
create_task
Registers a subscriber configuration to be managed.
Source code in fastpubsub/concurrency/manager.py
| def create_task(self, subscriber: Subscriber) -> None:
"""Registers a subscriber configuration to be managed."""
self._tasks.append(PubSubStreamingPullTask(subscriber))
|
start
async
Starts the subscribers tasks process using a task group.
Source code in fastpubsub/concurrency/manager.py
| async def start(self) -> None:
"""Starts the subscribers tasks process using a task group."""
for task in self._tasks:
await task.start()
|
alive
Checks if the tasks are alive.
| RETURNS |
DESCRIPTION |
dict[str, bool]
|
A dictionary mapping task names to their liveness status.
|
Source code in fastpubsub/concurrency/manager.py
| def alive(self) -> dict[str, bool]:
"""Checks if the tasks are alive.
Returns:
A dictionary mapping task names to their liveness status.
"""
liveness: dict[str, bool] = {}
for pull_task in self._tasks:
liveness[pull_task.subscriber.name] = pull_task.task_alive()
return liveness
|
ready
Checks if the tasks are ready.
| RETURNS |
DESCRIPTION |
dict[str, bool]
|
A dictionary mapping task names to their readiness status.
|
Source code in fastpubsub/concurrency/manager.py
| def ready(self) -> dict[str, bool]:
"""Checks if the tasks are ready.
Returns:
A dictionary mapping task names to their readiness status.
"""
readiness: dict[str, bool] = {}
for task in self._tasks:
readiness[task.subscriber.name] = task.task_ready()
return readiness
|
shutdown
async
Gracefully shuts down all tasks, waiting for message completion.
| PARAMETER |
DESCRIPTION |
timeout
|
Maximum time towait for in-flight messages
per subscription in seconds.
TYPE:
float
DEFAULT:
30.0
|
Source code in fastpubsub/concurrency/manager.py
| async def shutdown(self, timeout: float = 30.0) -> None:
"""Gracefully shuts down all tasks, waiting for message completion.
Args:
timeout: Maximum time towait for in-flight messages
per subscription in seconds.
"""
logger.info(f"Starting graceful shutdown with {timeout}s timeout...")
try:
async with asyncio.timeout(delay=timeout):
async with asyncio.TaskGroup() as tg:
for task in self._tasks:
if task.task_alive():
tg.create_task(task.shutdown(timeout=timeout))
except TimeoutError as e:
logger.warning(
f"A timeout happened while turning of a subscriber {e}"
)
finally:
self._tasks.clear()
await PubSubClientFactory.close_all()
|