Performance Tuning

Performance tuning in FastPubSub is the process of matching subscription and runtime parameters to workload shape. The objective is not maximum raw throughput in isolation, but stable throughput with predictable latency and bounded resource usage.

Control Surface Overview

Parameter | Scope | Primary Effect
max_messages | Subscriber (client-side) | Concurrency and memory pressure
ack_deadline_seconds | Subscription (server-side) | Processing window before redelivery
min_backoff_delay_secs / max_backoff_delay_secs | Subscription (server-side) | Retry pacing under failure
max_delivery_attempts + dead_letter_topic | Subscription (server-side) | Bounded failure handling
shutdown_timeout | Broker runtime | Graceful drain on shutdown

Tuning Workflow

Apply tuning in a controlled loop:

  1. Measure baseline metrics (latency, throughput, failure rates, memory).
  2. Change one major parameter group at a time.
  3. Re-run under representative load.
  4. Keep changes that improve SLOs and roll back regressions.
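The accept/reject step in the loop above can be sketched as a simple SLO gate. The dataclass fields and threshold values here are illustrative placeholders, not FastPubSub APIs:

```python
from dataclasses import dataclass


@dataclass
class LoadResult:
    """Summary of one load-test run (illustrative fields)."""
    throughput_msgs_s: float
    p99_latency_ms: float
    error_rate: float


def keep_change(baseline: LoadResult, candidate: LoadResult,
                p99_budget_ms: float = 500.0, error_budget: float = 0.01) -> bool:
    """Accept a parameter change only if it beats baseline throughput
    without violating the latency or error budgets."""
    if candidate.p99_latency_ms > p99_budget_ms:
        return False
    if candidate.error_rate > error_budget:
        return False
    return candidate.throughput_msgs_s > baseline.throughput_msgs_s
```

The point of the gate is that a throughput win which breaches a latency or error budget is a regression, not an improvement.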

Concurrency with max_messages

max_messages sets the upper bound of in-flight messages per subscriber task.

@broker.subscriber(
    alias="high-throughput",
    topic_name="high-events",
    subscription_name="high-events-subscription",
    max_messages=500,
)
async def high_throughput_handler(message: Message):
    await fast_async_operation(message.data)

A compact load-probe setup for this parameter:

import asyncio
import logging
import random
from asyncio import TaskGroup

logger = logging.getLogger(__name__)

MAX_MESSAGES = 100  # in-flight ceiling under test


@broker.subscriber(
    "test-alias",
    topic_name="test-topic",
    subscription_name="test-basic-subscription",
    max_messages=MAX_MESSAGES,
)
async def process_message(message: Message) -> None:
    logger.info(f"Processed message: {message}")
    # Simulate variable handler latency between 1 and 5 seconds
    await asyncio.sleep(random.randint(1, 5))


@app.after_startup
async def test_publish() -> None:
    # Publish 5x the in-flight ceiling to keep the subscriber saturated
    async with TaskGroup() as tg:
        for _ in range(MAX_MESSAGES * 5):
            tg.create_task(broker.publish("test-topic", "hi!"))

Workload-Specific Profiles

I/O-Bound Workloads

# High concurrency for I/O-bound tasks (API calls, database queries)
@broker.subscriber(
    alias="api-caller",
    topic_name="api-requests",
    subscription_name="api-requests-subscription",
    max_messages=500,  # High - most time is spent waiting
)
async def call_external_api(message: Message):
    await http_client.post("/api/endpoint", json={"data": message.data})

CPU-Bound Workloads

# Low concurrency for CPU-bound tasks
@broker.subscriber(
    alias="data-processor",
    topic_name="processing-jobs",
    subscription_name="processing-subscription",
    max_messages=10,  # Low - use multiple workers instead
)
async def process_data(message: Message):
    result = compute_heavy_operation(message.data)
    await save_result(result)

Rate-Limited Dependencies

# Match the API rate limit
@broker.subscriber(
    alias="rate-limited-api",
    topic_name="rate-limited-requests",
    subscription_name="rate-limited-subscription",
    max_messages=50,  # Match API's rate limit
)
async def call_rate_limited_api(message: Message):
    await rate_limited_client.call(message.data)

Practical Heuristics

Workload | Typical max_messages Range | Rationale
Async I/O dominant | 100-1000 | Waiting time dominates; concurrency pays off
DB-bound | 50-200 | Align with pool limits and transaction pressure
CPU-heavy | 10-50 | Prefer process scaling over coroutine fan-out
External API quotas | Depends on quota | Prevent downstream throttling cascades
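These ranges can serve as guard rails when picking a value. The workload labels and the clamping policy below are assumptions for illustration, not library behavior:

```python
# Heuristic max_messages ranges by workload class (from the table above)
STARTING_RANGES: dict[str, tuple[int, int]] = {
    "io_bound": (100, 1000),
    "db_bound": (50, 200),
    "cpu_bound": (10, 50),
}


def clamp_max_messages(workload: str, desired: int) -> int:
    """Clamp a desired concurrency into the heuristic range for the workload."""
    low, high = STARTING_RANGES[workload]
    return max(low, min(desired, high))
```

For example, asking for 500 in-flight messages on a CPU-heavy workload clamps down to 50; the same request on an I/O-dominant workload passes through unchanged.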

Acknowledgment Deadline

ack_deadline_seconds must exceed the expected processing time with a safety margin.

@broker.subscriber(
    alias="slow-processor",
    topic_name="heavy-tasks",
    subscription_name="heavy-tasks-subscription",
    ack_deadline_seconds=600,
    max_messages=10,
)
async def slow_handler(message: Message):
    await complex_ml_inference(message.data)

If configured too low, valid in-flight work can be redelivered before completion, creating duplicate processing pressure.

Expected Processing Time | Suggested ack_deadline_seconds
< 10 seconds | 30
10-60 seconds | 60
1-5 minutes | 300
5-10 minutes | 600
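One way to encode the table's intent is a multiplicative safety margin, floored at a minimum and clamped to the 600-second server-side cap. The factor of 3 is an assumption, not a FastPubSub default:

```python
PUBSUB_MAX_ACK_DEADLINE_SECS = 600  # server-side cap


def suggested_ack_deadline(expected_secs: float, safety_factor: float = 3.0) -> int:
    """Deadline = expected processing time times a safety margin,
    floored at 30 s and clamped to the Pub/Sub maximum."""
    deadline = max(30, int(expected_secs * safety_factor))
    return min(deadline, PUBSUB_MAX_ACK_DEADLINE_SECS)
```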

Upper Bound

Pub/Sub caps acknowledgment deadline at 600 seconds. Workloads beyond this window should be decomposed into smaller steps.

Retry Backoff and Failure Pacing

Tune retry policy to the failure mode.

@broker.subscriber(
    alias="api-with-backoff",
    topic_name="api-calls",
    subscription_name="api-calls-subscription",
    min_backoff_delay_secs=10,
    max_backoff_delay_secs=600,
    max_delivery_attempts=10,
    dead_letter_topic="api-calls-dlq",
)
async def call_api_with_backoff(message: Message):
    await external_service.process(message.data)

Short Backoff for Transient Instability

# Short backoff for transient issues (network blips)
@broker.subscriber(
    alias="network-sensitive",
    topic_name="transient-events",
    subscription_name="transient-events-subscription",
    min_backoff_delay_secs=5,
    max_backoff_delay_secs=60,
    max_delivery_attempts=5,
)
async def handle_transient_event(message: Message):
    await send_notification(message.data)

Long Backoff for External Outages

# Longer backoff for external service outages
@broker.subscriber(
    alias="external-api",
    topic_name="external-api-calls",
    subscription_name="external-api-subscription",
    min_backoff_delay_secs=30,
    max_backoff_delay_secs=600,
    max_delivery_attempts=10,
)
async def call_external_service(message: Message):
    await external_service.process(message.data)

Longer backoff reduces retry storms and protects downstream dependencies during partial outages.
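The broker computes the actual retry schedule server-side; a doubling sketch merely illustrates how the two bounds shape the delay curve:

```python
def backoff_curve(min_delay: int, max_delay: int, attempts: int) -> list[int]:
    """Illustrative exponential doubling from min_delay, capped at max_delay.
    The real Pub/Sub schedule may differ; only the bounds are configurable."""
    delays, current = [], min_delay
    for _ in range(attempts):
        delays.append(min(current, max_delay))
        current *= 2
    return delays
```

With min_backoff_delay_secs=30 and max_backoff_delay_secs=600, the curve saturates at the cap after a handful of attempts, spreading retries out instead of hammering a struggling dependency.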

Multi-Process Scaling

For CPU-constrained workloads, scale workers before inflating max_messages.

fastpubsub run myapp:app --workers 4

Effective theoretical concurrency is:

workers x max_messages

Monitor total memory because each worker process has independent state.
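A rough capacity check can be sketched as follows; the per-message and per-worker footprints are placeholders you would measure for your own payloads:

```python
def effective_concurrency(workers: int, max_messages: int) -> int:
    """Theoretical ceiling on simultaneously in-flight messages."""
    return workers * max_messages


def estimated_memory_mb(workers: int, max_messages: int,
                        per_message_mb: float, base_worker_mb: float) -> float:
    """Upper-bound estimate: each worker carries its own base footprint
    plus its full complement of in-flight payloads."""
    return workers * (base_worker_mb + max_messages * per_message_mb)
```

For instance, 4 workers at max_messages=200 yield a ceiling of 800 in-flight messages, so memory should be budgeted against that total, not against a single process.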

Integrated Configuration Example

tuned_broker = PubSubBroker(
    project_id="fastpubsub-pubsub-local",
    shutdown_timeout=30.0,
    middlewares=[Middleware(GZipMiddleware, compresslevel=6)],
)
tuned_app = FastPubSub(tuned_broker)


@tuned_broker.subscriber(
    alias="optimized-processor",
    topic_name="optimized-events",
    subscription_name="optimized-events-subscription",
    # Concurrency
    max_messages=200,
    # Timeouts
    ack_deadline_seconds=120,
    # Retry policy
    min_backoff_delay_secs=10,
    max_backoff_delay_secs=300,
    max_delivery_attempts=5,
    # Error handling
    dead_letter_topic="events-dlq",
    autocreate=True,
)
async def process_optimized_event(message: Message):
    await handle_event(message.data)

This profile combines graceful shutdown, GZipMiddleware, bounded retries, and dead-letter isolation.

Metrics to Observe During Tuning

Track these continuously while iterating:

  • Throughput (messages/s).
  • Latency distribution (p50, p95, p99).
  • Retry rate and retry burst shape.
  • Dead-letter ingress count.
  • Worker CPU and memory saturation.
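The latency distribution above can be summarized from raw samples with the standard library; sample collection wiring is omitted:

```python
import statistics


def latency_summary(samples_ms: list[float]) -> dict[str, float]:
    """Compute p50/p95/p99 from per-message latency samples."""
    qs = statistics.quantiles(samples_ms, n=100)
    return {"p50": qs[49], "p95": qs[94], "p99": qs[98]}
```

Comparing these percentiles before and after each parameter change is what makes the tuning loop a controlled experiment rather than guesswork.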

Validation Approach

For behavior-level checks, PubSubTestClient can validate handler correctness. For performance conclusions, rely on controlled load tests because in-memory tests do not represent broker/network pressure.

Common Failure Modes

  • Increasing max_messages without monitoring memory growth.
  • Setting the acknowledgment deadline below realistic processing time.
  • Applying a CPU-bound profile to an I/O-bound workload (or the inverse).
  • Using aggressive retries with no dead-letter strategy.

Recap

  • Tune for workload characteristics, not generic defaults.
  • Start with max_messages, then align deadlines and retry policy.
  • Use dead-letter and backoff to bound and shape failures.
  • Scale by workers for CPU pressure, not coroutine count alone.
  • Validate behavior with tests and validate performance with load measurement.