Skip to content

FastPubSub class

fastpubsub.FastPubSub

FastPubSub(
    broker,
    *,
    on_startup=None,
    on_shutdown=None,
    after_startup=None,
    after_shutdown=None,
    liveness_url="/consumers/alive",
    readiness_url="/consumers/ready",
    **extras,
)

Bases: FastAPI, Application

A FastAPI integration application for managing Pub/Sub consumers.

Initializes the FastPubSub application.

PARAMETER DESCRIPTION
broker

The PubSubBroker instance.

TYPE: PubSubBroker

on_startup

A sequence of callables to run on startup.

TYPE: Sequence[NoArgAsyncCallable] | None DEFAULT: None

on_shutdown

A sequence of callables to run on shutdown.

TYPE: Sequence[NoArgAsyncCallable] | None DEFAULT: None

after_startup

A sequence of callables to run after startup.

TYPE: Sequence[NoArgAsyncCallable] | None DEFAULT: None

after_shutdown

A sequence of callables to run after shutdown.

TYPE: Sequence[NoArgAsyncCallable] | None DEFAULT: None

liveness_url

A URL path for the liveness endpoint.

TYPE: str DEFAULT: '/consumers/alive'

readiness_url

A url path for the readiness endpoint.

TYPE: str DEFAULT: '/consumers/ready'

**extras

Extra arguments to pass to the FastAPI constructor.

TYPE: Any DEFAULT: {}

Source code in fastpubsub/applications.py
def __init__(
    self,
    broker: PubSubBroker,
    *,
    on_startup: Sequence[NoArgAsyncCallable] | None = None,
    on_shutdown: Sequence[NoArgAsyncCallable] | None = None,
    after_startup: Sequence[NoArgAsyncCallable] | None = None,
    after_shutdown: Sequence[NoArgAsyncCallable] | None = None,
    liveness_url: str = "/consumers/alive",
    readiness_url: str = "/consumers/ready",
    **extras: Any,
):
    """Initializes the FastPubSub application.

    A ``lifespan`` entry in ``extras`` is not forwarded to the FastAPI
    constructor; it is removed and kept on ``self.lifespan_context``, and
    ``self._run`` is installed as the lifespan instead.

    Args:
        broker: The PubSubBroker instance.
        on_startup: A sequence of callables to run on startup.
        on_shutdown: A sequence of callables to run on shutdown.
        after_startup: A sequence of callables to run after startup.
        after_shutdown: A sequence of callables to run after shutdown.
        liveness_url: A URL path for the liveness endpoint.
        readiness_url: A URL path for the readiness endpoint.
        **extras: Extra arguments to pass to the FastAPI constructor.
    """
    # Take the caller's lifespan (if any) out of extras so it does not clash
    # with the explicit ``lifespan=`` keyword passed below.
    # NOTE(review): presumably self._run wraps lifespan_context — confirm.
    self.lifespan_context = extras.pop("lifespan", None)
    super().__init__(
        lifespan=self._run,
        **extras,
    )

    # Resume the MRO lookup after Starlette so the hook sequences and broker
    # reach the next initializer in line.
    # NOTE(review): presumably this resolves to Application.__init__ — confirm.
    super(Starlette, self).__init__(
        broker,
        on_startup=on_startup,
        on_shutdown=on_shutdown,
        after_startup=after_startup,
        after_shutdown=after_shutdown,
    )

    # Expose the liveness and readiness handlers as GET routes at the configured paths.
    self.add_api_route(path=liveness_url, endpoint=self._get_liveness, methods=["GET"])
    self.add_api_route(path=readiness_url, endpoint=self._get_readiness, methods=["GET"])

broker instance-attribute

broker = broker

lifespan_context instance-attribute

lifespan_context = extras.pop('lifespan', None)

on_startup

on_startup(func)

Decorator to register a function to run on startup.

PARAMETER DESCRIPTION
func

The function to run on startup.

TYPE: NoArgAsyncCallable

RETURNS DESCRIPTION
NoArgAsyncCallable

The decorated function.

Source code in fastpubsub/applications.py
@validate_call(config=ConfigDict(strict=True))
def on_startup(self, func: NoArgAsyncCallable) -> NoArgAsyncCallable:
    """Register ``func`` as a startup hook; usable as a decorator.

    Args:
        func: The callable to add to the startup hooks.

    Returns:
        The ``func`` argument, handed back after registration so the
        decorated name stays bound to it.
    """
    # Helper defined elsewhere; presumably rejects non-async callables.
    ensure_async_callable_function(func)
    startup_hooks = self._on_startup
    startup_hooks.append(func)
    return func

on_shutdown

on_shutdown(func)

Decorator to register a function to run on shutdown.

PARAMETER DESCRIPTION
func

The function to run on shutdown.

TYPE: NoArgAsyncCallable

RETURNS DESCRIPTION
NoArgAsyncCallable

The decorated function.

Source code in fastpubsub/applications.py
@validate_call(config=ConfigDict(strict=True))
def on_shutdown(self, func: NoArgAsyncCallable) -> NoArgAsyncCallable:
    """Register ``func`` as a shutdown hook; usable as a decorator.

    Args:
        func: The callable to add to the shutdown hooks.

    Returns:
        The ``func`` argument, handed back after registration so the
        decorated name stays bound to it.
    """
    # Helper defined elsewhere; presumably rejects non-async callables.
    ensure_async_callable_function(func)
    shutdown_hooks = self._on_shutdown
    shutdown_hooks.append(func)
    return func

after_startup

after_startup(func)

Decorator to register a function to run after startup.

PARAMETER DESCRIPTION
func

The function to run after startup.

TYPE: NoArgAsyncCallable

RETURNS DESCRIPTION
NoArgAsyncCallable

The decorated function.

Source code in fastpubsub/applications.py
@validate_call(config=ConfigDict(strict=True))
def after_startup(self, func: NoArgAsyncCallable) -> NoArgAsyncCallable:
    """Register ``func`` as an after-startup hook; usable as a decorator.

    Args:
        func: The callable to add to the after-startup hooks.

    Returns:
        The ``func`` argument, handed back after registration so the
        decorated name stays bound to it.
    """
    # Helper defined elsewhere; presumably rejects non-async callables.
    ensure_async_callable_function(func)
    post_startup_hooks = self._after_startup
    post_startup_hooks.append(func)
    return func

after_shutdown

after_shutdown(func)

Decorator to register a function to run after shutdown.

PARAMETER DESCRIPTION
func

The function to run after shutdown.

TYPE: NoArgAsyncCallable

RETURNS DESCRIPTION
NoArgAsyncCallable

The decorated function.

Source code in fastpubsub/applications.py
@validate_call(config=ConfigDict(strict=True))
def after_shutdown(self, func: NoArgAsyncCallable) -> NoArgAsyncCallable:
    """Register ``func`` as an after-shutdown hook; usable as a decorator.

    Args:
        func: The callable to add to the after-shutdown hooks.

    Returns:
        The ``func`` argument, handed back after registration so the
        decorated name stays bound to it.
    """
    # Helper defined elsewhere; presumably rejects non-async callables.
    ensure_async_callable_function(func)
    post_shutdown_hooks = self._after_shutdown
    post_shutdown_hooks.append(func)
    return func