Skip to content

Concurrency Utils

fastpubsub.concurrency.utils.apply_async async

apply_async(func, *args, **kwargs)

Transforms a blocking sync callable into an async callable.

PARAMETER DESCRIPTION
func

The sync callable to be transformed.

TYPE: Callable[P, T]

*args

The positional arguments used on the callable.

TYPE: args DEFAULT: ()

**kwargs

The keyword arguments used on the callable.

TYPE: kwargs DEFAULT: {}

RETURNS DESCRIPTION
T

The same return value as the callable, obtained after awaiting its computation.

Source code in fastpubsub/concurrency/utils.py
async def apply_async(
    func: Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
    """Run a blocking sync callable from async code and await its result.

    The arguments are bound to ``func`` up front and the resulting
    zero-argument callable is handed to ``anyio.to_thread.run_sync``.

    Args:
        func: The sync callable to execute.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func(*args, **kwargs)`` returns, once the awaited
        call has completed.
    """
    # Bind everything now so the runner only needs a no-argument callable.
    bound_call = functools.partial(func, *args, **kwargs)
    result = await anyio.to_thread.run_sync(
        bound_call,
        abandon_on_cancel=False,
    )
    return result

fastpubsub.concurrency.utils.ensure_async_middleware

ensure_async_middleware(middleware)

Ensures that a middleware is an async middleware.

PARAMETER DESCRIPTION
middleware

The middleware to check.

TYPE: type[BaseMiddleware]

Source code in fastpubsub/concurrency/utils.py
def ensure_async_middleware(middleware: type["BaseMiddleware"]) -> None:
    """Validate that a middleware class exposes async hooks.

    Args:
        middleware: The middleware class to validate.

    Raises:
        TypeError: If ``middleware`` is not a subclass of
            ``BaseMiddleware``, or if its ``on_message`` or
            ``on_publish`` attribute is not a coroutine function.
    """
    # NOTE(review): imported locally, presumably to avoid a circular
    # import with the middlewares package -- confirm before hoisting.
    from fastpubsub.middlewares.base import BaseMiddleware

    is_middleware = issubclass(middleware, BaseMiddleware)
    if not is_middleware:
        message = (
            f"The object {middleware} must be a {BaseMiddleware.__name__}."
        )
        raise TypeError(message)

    # The hooks are checked in order; the first sync one is reported.
    message_hook = middleware.on_message
    if not inspect.iscoroutinefunction(message_hook):
        message = f"The on_message method must be async on {middleware}."
        raise TypeError(message)

    publish_hook = middleware.on_publish
    if not inspect.iscoroutinefunction(publish_hook):
        message = f"The on_publish method must be async on {middleware}."
        raise TypeError(message)

fastpubsub.concurrency.utils.ensure_async_callable_function

ensure_async_callable_function(callable_object)

Ensures that a callable is an async function.

PARAMETER DESCRIPTION
callable_object

The callable to check.

TYPE: Callable[[], Any] | AsyncCallable | AsyncDecoratedCallable

Source code in fastpubsub/concurrency/utils.py
def ensure_async_callable_function(
    callable_object: Callable[[], Any]
    | AsyncCallable
    | AsyncDecoratedCallable,
) -> None:
    """Validate that an object is a plain function defined as async.

    Args:
        callable_object: The object to validate.

    Raises:
        TypeError: If ``callable_object`` is not a ``FunctionType``
            instance, or if it is a function but not a coroutine
            function.
    """
    if isinstance(callable_object, FunctionType):
        if inspect.iscoroutinefunction(callable_object):
            return
        raise TypeError(f"The function {callable_object} must be async.")

    # Anything that is not a plain function is rejected outright.
    raise TypeError(
        f"The object must be a function type but it is {callable_object}."
    )