
Message Filtering

Server-side filtering lets Pub/Sub deliver only the messages whose attributes match a declarative filter expression. Filters are evaluated against message attributes, not the payload. On high-volume topics, this removes the cost of receiving and discarding irrelevant messages in the subscriber.

FastPubSub exposes this behavior through the filter_expression parameter of the subscriber decorator.

Conceptual Model

Filtering is evaluated by Pub/Sub before message delivery to your application process. This has two direct effects:

  • Lower application-side compute, because messages that do not match are never delivered to your subscriber.
  • Clearer subscriber intent, because routing logic is encoded in subscription configuration.

flowchart LR
    A[Published message + attributes] --> B{Pub/Sub filter expression}
    B -->|Match| C[Deliver to subscription]
    B -->|No match| D[Not delivered to subscription]
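
To make the first effect concrete, the sketch below compares how many messages a handler would see with and without a filter. It is a plain-Python model, not FastPubSub or Pub/Sub code: the `delivered` helper and the sample traffic are hypothetical, and the filter is modeled as a single equality check on one attribute.

```python
from typing import List, Mapping, Sequence


def delivered(
    messages: Sequence[Mapping[str, str]], key: str, value: str
) -> List[Mapping[str, str]]:
    """Return the messages an equality filter on one attribute would let through.

    Each message is represented only by its attributes, because the
    payload plays no part in the decision.
    """
    # A message that lacks the attribute yields None, which never equals value.
    return [attrs for attrs in messages if attrs.get(key) == value]


# Hypothetical traffic: one order event for every nine user events.
traffic = [{"event_type": "order"}] + [{"event_type": "user"}] * 9

print(len(traffic))                                   # 10 without a filter
print(len(delivered(traffic, "event_type", "order")))  # 1 with the filter
```

In this model the handler does a tenth of the work, and the nine unmatched messages never cross the network to the application.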

Baseline Configuration

@broker.subscriber(
    alias="order-handler",
    topic_name="events",
    subscription_name="order-events-subscription",
    filter_expression='attributes.event_type = "order"',
    autocreate=True,
)
async def handle_orders(message: Message):
    # Only receives messages where event_type = "order"
    order_data = message.data
    await process_order(order_data)

The expression syntax follows Pub/Sub filter rules and is applied per subscription.

Attribute Discipline

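Filtering works only if publishers set the attributes that filters reference on every message. A message without the expected attribute does not match an equality filter and is silently skipped for that subscription.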

@app.after_startup
async def publish_with_broker() -> None:
    await broker.publish(
        topic_name="events",
        data={"user_id": "123", "action": "login"},
        attributes={"event_type": "user_login", "priority": "high"},
    )

Engineering Implication

Define and version an attribute contract in the same way you define payload schemas. Inconsistent attribute naming is a common source of silent routing failures.
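
One way to make such a contract enforceable is to check attributes against it before publishing. The following is a minimal sketch under stated assumptions: `AttributeContract`, `EVENTS_V1`, and the allowed priority values are hypothetical names chosen for illustration and are not part of FastPubSub.

```python
from dataclasses import dataclass, field
from typing import FrozenSet, List, Mapping


@dataclass(frozen=True)
class AttributeContract:
    """Hypothetical, versioned description of the attributes a topic expects."""

    version: str
    required: FrozenSet[str]
    allowed_values: Mapping[str, FrozenSet[str]] = field(default_factory=dict)

    def violations(self, attributes: Mapping[str, str]) -> List[str]:
        """Return human-readable problems; an empty list means compliant."""
        problems = []
        # Report required keys that the publisher did not set.
        for key in sorted(self.required - set(attributes)):
            problems.append(f"missing required attribute: {key}")
        # Report values outside the agreed vocabulary.
        for key, allowed in self.allowed_values.items():
            if key in attributes and attributes[key] not in allowed:
                problems.append(f"unexpected value for {key}: {attributes[key]!r}")
        return problems


EVENTS_V1 = AttributeContract(
    version="1",
    required=frozenset({"event_type"}),
    allowed_values={"priority": frozenset({"low", "high"})},
)

print(EVENTS_V1.violations({"event_type": "user_login", "priority": "high"}))
print(EVENTS_V1.violations({"eventType": "order", "priority": "urgent"}))
```

The second call reports both the misspelled key (as a missing event_type) and the out-of-vocabulary priority, which are exactly the mistakes that otherwise surface only as messages that never arrive.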

Expression Patterns

Boolean Conjunction (AND)

FILTER_EXP_PREMIUM = (
    'attributes.priority = "high" AND attributes.customer_tier = "premium"'
)


@broker.subscriber(
    alias="premium-urgent",
    topic_name="tickets",
    subscription_name="premium-urgent-subscription",
    filter_expression=FILTER_EXP_PREMIUM,
)
async def handle_premium_urgent(message: Message):
    # Only receives high-priority tickets from premium customers
    await escalate_to_senior_support(message.data)

Boolean Disjunction (OR)

FILTER_EXP_ALERTS = (
    'attributes.severity = "critical" OR attributes.severity = "high"'
)


@broker.subscriber(
    alias="critical-alerts",
    topic_name="alerts",
    subscription_name="critical-alerts-subscription",
    filter_expression=FILTER_EXP_ALERTS,
)
async def handle_critical_alerts(message: Message):
    # Receives both critical and high severity alerts
    await page_on_call_engineer(message.data)
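
Hand-written expression strings are easy to mistype, so some teams build them from small helpers. The sketch below is one possible approach, not a FastPubSub feature: `eq`, `all_of`, and `any_of` are hypothetical helpers. It assumes identifier-like attribute keys, rejects values containing quotes or backslashes rather than modeling escaping, and produces only flat conjunctions or disjunctions (no nesting of AND with OR).

```python
import re

# Conservative assumption: only identifier-like keys are accepted here.
_KEY_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def eq(key: str, value: str) -> str:
    """Build an equality term such as: attributes.priority = "high"."""
    if not _KEY_PATTERN.fullmatch(key):
        raise ValueError(f"unsupported attribute key: {key!r}")
    if '"' in value or "\\" in value:
        # Escaping rules are deliberately not modeled in this sketch.
        raise ValueError(f"unsupported characters in value: {value!r}")
    return f'attributes.{key} = "{value}"'


def all_of(*terms: str) -> str:
    """Join terms with AND (flat conjunction only)."""
    if not terms:
        raise ValueError("at least one term is required")
    return " AND ".join(terms)


def any_of(*terms: str) -> str:
    """Join terms with OR (flat disjunction only)."""
    if not terms:
        raise ValueError("at least one term is required")
    return " OR ".join(terms)


FILTER_EXP_PREMIUM = all_of(eq("priority", "high"), eq("customer_tier", "premium"))
FILTER_EXP_ALERTS = any_of(eq("severity", "critical"), eq("severity", "high"))

print(FILTER_EXP_PREMIUM)
print(FILTER_EXP_ALERTS)
```

The two constants come out identical to the literal strings used in the AND and OR examples, so the helpers can be swapped in without changing what is sent to Pub/Sub.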

Attribute Presence

@broker.subscriber(
    alias="labeled-handler",
    topic_name="labeled-events",
    subscription_name="labeled-subscription",
    # "attributes:KEY" is the Pub/Sub has-key operator: it matches when the key is present
    filter_expression="attributes:label",
)
async def handle_labeled(message: Message):
    # Receives any message that has a "label" attribute
    pass

Multi-Subscriber Fan-Out by Filter

A standard architecture is one topic with multiple subscriptions, each owning a filter.

# Handler for order events
@broker.subscriber(
    alias="order-events-handler",
    topic_name="multi-events",
    subscription_name="order-events-sub",
    filter_expression='attributes.event_type = "order"',
)
async def handle_order_events(message: Message):
    await process_order(message.data)


# Handler for user events
@broker.subscriber(
    alias="user-handler",
    topic_name="multi-events",
    subscription_name="user-events-sub",
    filter_expression='attributes.event_type = "user"',
)
async def handle_users(message: Message):
    await process_user_event(message.data)


# Handler for ALL events (no filter)
@broker.subscriber(
    alias="audit-handler",
    topic_name="multi-events",
    subscription_name="audit-sub",
)
async def audit_all_events(message: Message):
    await log_to_audit_trail(message.data)

This pattern keeps publisher logic simple while allowing independent consumer pipelines.
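
The routing outcome of the three subscriptions above can be modeled in a few lines. This is a plain-Python illustration, not how Pub/Sub or FastPubSub evaluates filters: `SUBSCRIPTIONS` and `receiving_subscriptions` are hypothetical, and each filter is approximated by a predicate over the attribute mapping.

```python
from typing import Callable, Dict, List, Mapping, Optional

Predicate = Callable[[Mapping[str, str]], bool]

# Subscription name -> predicate; None models a subscription without a filter.
SUBSCRIPTIONS: Dict[str, Optional[Predicate]] = {
    "order-events-sub": lambda attrs: attrs.get("event_type") == "order",
    "user-events-sub": lambda attrs: attrs.get("event_type") == "user",
    "audit-sub": None,
}


def receiving_subscriptions(attributes: Mapping[str, str]) -> List[str]:
    """List the subscriptions that would receive a message with these attributes."""
    return [
        name
        for name, predicate in SUBSCRIPTIONS.items()
        if predicate is None or predicate(attributes)
    ]


print(receiving_subscriptions({"event_type": "order"}))
# ['order-events-sub', 'audit-sub']
print(receiving_subscriptions({"event_type": "refund"}))
# ['audit-sub']
```

The second call shows why the unfiltered subscription matters: an event type that no filter anticipates still reaches the audit handler instead of disappearing.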

Comparison Semantics

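Operator   Meaning                       Example
=          Equality                      attributes.type = "order"
!=         Inequality                    attributes.status != "cancelled"
>          Lexicographic greater than    attributes.priority > "5"
<          Lexicographic less than       attributes.priority < "5"
>=         Lexicographic greater/equal   attributes.level >= "warn"
<=         Lexicographic less/equal      attributes.level <= "warn"

Google's Pub/Sub filter documentation lists only =, !=, the key-presence operator (:), and the hasPrefix function. Confirm that the ordering operators (>, <, >=, <=) are accepted in your environment before depending on them.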

Attribute Values Are Strings

Pub/Sub attributes are string-valued. Numeric-looking expressions are still string comparisons unless you normalize values (for example, left-pad numeric strings).
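
The pitfall is easy to reproduce with ordinary Python strings, which also compare lexicographically. The sketch below uses Python only as an analogy for string ordering; `pad` is a hypothetical helper, and the width of 4 is an arbitrary choice that publishers and filters would have to agree on.

```python
def pad(value: int, width: int = 4) -> str:
    """Render a non-negative integer as a fixed-width, zero-padded string."""
    if value < 0:
        raise ValueError("negative values are not supported in this sketch")
    text = str(value).zfill(width)
    if len(text) > width:
        # A wider value would break the ordering guarantee.
        raise ValueError(f"{value} does not fit in {width} digits")
    return text


# Unpadded: "1" sorts before "5", so "10" is considered smaller than "5".
print("10" > "5")        # False

# Padded to a fixed width, string order agrees with numeric order.
print(pad(10), pad(5))   # 0010 0005
print(pad(10) > pad(5))  # True
```

Padding only helps if every publisher applies the same width, which is one more reason to record it in the attribute contract.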

Validation with PubSubTestClient

Filter behavior can be validated in tests by asserting which handlers produced results.

@pytest.mark.asyncio
async def test_filter_expression_routes_expected_messages() -> None:
    async with PubSubTestClient(broker) as client:
        await client.publish(
            {"order_id": "ord-1"},
            topic="events",
            attributes={"event_type": "order"},
        )
        await client.publish(
            {"user_id": "usr-1"},
            topic="events",
            attributes={"event_type": "user"},
        )

        results = client.get_results()

    assert len(results) == 1
    assert results[0].message.attributes["event_type"] == "order"

Reference application fixture:

broker = PubSubBroker(project_id="test-project")
app = FastPubSub(broker)


@broker.subscriber(
    alias="order-handler",
    topic_name="events",
    subscription_name="order-sub",
    filter_expression='attributes.event_type = "order"',
)
async def handle_orders(message: Message) -> str:
    return message.data.decode("utf-8")

Design Recommendations

Use Stable Attribute Names

Maintain a small shared vocabulary (for example, event_type, source, tenant_id, priority). Avoid synonyms for the same business concept.

Keep Expressions Readable

If a filter becomes difficult to reason about, split responsibilities into separate subscriptions. Operationally, two clear filters are preferable to one opaque filter.

Include an Audit Stream When Needed

For compliance or incident analysis, maintain one unfiltered subscriber that captures all traffic for archival. Do this only when volume and retention cost are acceptable.

Common Failure Modes

  • Publishing payload-only messages without required attributes.
  • Drifting attribute conventions between producer teams.
  • Assuming numeric comparison semantics on string values.
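  • Changing filters in production without validating the impact on downstream consumers. Pub/Sub does not allow editing the filter of an existing subscription, so a change means creating a new subscription.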
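
For the last failure mode, one inexpensive check is to replay a sample of recorded attributes through the old and the proposed filter and compare the outcomes. The sketch below assumes both filters can be approximated as Python predicates; `delivery_diff` and the sample data are hypothetical and are not part of FastPubSub.

```python
from typing import Callable, List, Mapping, Sequence, Tuple

Attributes = Mapping[str, str]
Predicate = Callable[[Attributes], bool]


def delivery_diff(
    samples: Sequence[Attributes], old: Predicate, new: Predicate
) -> Tuple[List[Attributes], List[Attributes]]:
    """Return (gained, lost): messages the new filter adds or drops."""
    gained = [s for s in samples if new(s) and not old(s)]
    lost = [s for s in samples if old(s) and not new(s)]
    return gained, lost


# Hypothetical sample of attributes captured from recent traffic.
samples = [{"severity": "critical"}, {"severity": "high"}, {"severity": "low"}]


def old_filter(attrs: Attributes) -> bool:
    return attrs.get("severity") in {"critical", "high"}


def new_filter(attrs: Attributes) -> bool:
    return attrs.get("severity") == "critical"


gained, lost = delivery_diff(samples, old_filter, new_filter)
print(gained)  # []
print(lost)    # [{'severity': 'high'}]
```

Here the narrower filter would stop paging on high-severity alerts, a consequence that is much cheaper to discover in a review than during an incident.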

Recap

  • filter_expression enables server-side selective delivery.
  • Publisher attributes are mandatory inputs for filter correctness.
  • Use simple, explicit expressions and stable attribute contracts.
  • Validate routing behavior early with PubSubTestClient.
  • Treat filtering as a first-class part of subscription design, not a late optimization.