Skip to content

AsyncTaskManager class

fastpubsub.concurrency.manager.AsyncTaskManager

AsyncTaskManager()

Public-facing controller for managing a fleet of subscriber tasks.

Initializes the AsyncTaskManager.

Source code in fastpubsub/concurrency/manager.py
def __init__(self) -> None:
    """Create a manager that starts out with no registered pull tasks."""
    # Pull tasks are appended here by create_task(), in registration order.
    self._tasks: list[PubSubStreamingPullTask] = list()

create_task

create_task(subscriber)

Registers a subscriber configuration to be managed.

Source code in fastpubsub/concurrency/manager.py
def create_task(self, subscriber: Subscriber) -> None:
    """Wrap a subscriber in a streaming-pull task and track it.

    Args:
        subscriber: The subscriber configuration to wrap in a
            ``PubSubStreamingPullTask`` and add to the managed list.
    """
    pull_task = PubSubStreamingPullTask(subscriber)
    self._tasks.append(pull_task)

start async

start()

Starts the subscriber tasks one at a time, awaiting each task's start before moving on to the next.

Source code in fastpubsub/concurrency/manager.py
async def start(self) -> None:
    """Start every registered pull task.

    Tasks are visited in registration order and each ``start()`` call is
    awaited before moving on to the next task.
    """
    for pull_task in self._tasks:
        await pull_task.start()

alive

alive()

Checks if the tasks are alive.

RETURNS DESCRIPTION
dict[str, bool]

A dictionary mapping task names to their liveness status.

Source code in fastpubsub/concurrency/manager.py
def alive(self) -> dict[str, bool]:
    """Report the liveness of every managed task.

    Returns:
        A dictionary keyed by each task's subscriber name whose value is
        the result of that task's ``task_alive()`` call. If two tasks
        share a subscriber name, the later task's status wins.
    """
    return {
        pull_task.subscriber.name: pull_task.task_alive()
        for pull_task in self._tasks
    }

ready

ready()

Checks if the tasks are ready.

RETURNS DESCRIPTION
dict[str, bool]

A dictionary mapping task names to their readiness status.

Source code in fastpubsub/concurrency/manager.py
def ready(self) -> dict[str, bool]:
    """Report the readiness of every managed task.

    Returns:
        A dictionary keyed by each task's subscriber name whose value is
        the result of that task's ``task_ready()`` call. If two tasks
        share a subscriber name, the later task's status wins.
    """
    return {
        pull_task.subscriber.name: pull_task.task_ready()
        for pull_task in self._tasks
    }

shutdown async

shutdown(timeout=30.0)

Gracefully shuts down all tasks, waiting for message completion.

PARAMETER DESCRIPTION
timeout

Maximum time, in seconds, to wait for in-flight messages per subscription.

TYPE: float DEFAULT: 30.0

Source code in fastpubsub/concurrency/manager.py
async def shutdown(self, timeout: float = 30.0) -> None:
    """Shut down every live task, then release shared clients.

    The ``shutdown()`` coroutine of each task that reports itself alive
    is scheduled in a single task group, and the whole group runs under
    one ``asyncio.timeout`` deadline. A ``TimeoutError`` from that
    deadline is logged as a warning rather than propagated. Whatever the
    outcome, the task list is cleared and
    ``PubSubClientFactory.close_all()`` is awaited afterwards.

    Args:
        timeout: Number of seconds used both as the overall deadline for
            the task group and as the ``timeout`` argument forwarded to
            each task's ``shutdown()``.
    """
    logger.info(f"Starting graceful shutdown with {timeout}s timeout...")

    try:
        # One statement, same semantics as nesting: the timeout scope is
        # entered first and wraps the task group.
        async with asyncio.timeout(delay=timeout), asyncio.TaskGroup() as group:
            for pull_task in self._tasks:
                if not pull_task.task_alive():
                    # Tasks that are not alive are skipped.
                    continue
                group.create_task(pull_task.shutdown(timeout=timeout))
    except TimeoutError as exc:
        # NOTE(review): "turning of" below looks like a typo for
        # "turning off"; left untouched because it is runtime output.
        logger.warning(
            f"A timeout happened while turning of a subscriber {exc}"
        )
    finally:
        # Cleanup runs on success, timeout, and any other exception.
        self._tasks.clear()
        await PubSubClientFactory.close_all()