Prefect allows you to trigger deployment runs based on events. However, when multiple events occur in quick succession, you may want a single flow run to handle all of them rather than spinning up a separate run for each event. For example, when multiple files are uploaded to an S3 bucket at once, or a flurry of webhook events is received by Prefect, you typically want one run to process everything together. This pattern is called debouncing, and you can implement it using reactive triggers with the within parameter of the trigger and the schedule_after parameter of the run-deployment action.

Why debounce events?

Automations fire in response to a single event and can only pass that event’s context to the triggered deployment. This creates a challenge when multiple events arrive rapidly:
  • Each event would trigger a separate flow run
  • Each run would only have context from one event
  • You’d have multiple runs processing related work simultaneously
Debouncing solves this by:
  • Preventing multiple flow runs from being created for rapid events
  • Scheduling a single run after a time window
  • Enabling a single run to process the work from all events in the burst
Key limitation: Automations can only pass the context from the triggering event to your deployment. Design your flows to query the source system directly (such as listing S3 objects) rather than relying on individual event data.

Use case: Processing S3 file uploads

Consider a scenario where you have a webhook configured to receive S3 ObjectCreated events. When users upload five files in quick succession:
  • Without debouncing: five separate flow runs are triggered, one for each file.
  • With debouncing: one flow run is triggered after all uploads complete, processing all five files together.

Implementing debouncing

Use a reactive trigger with matching within and schedule_after values:

Define in prefect.yaml

deployments:
  - name: process-s3-uploads
    entrypoint: flows/s3_processor.py:process_files
    work_pool:
      name: my-work-pool
    triggers:
      - type: event
        enabled: true
        match:
          prefect.resource.id:
            - "s3-bucket-name/*"
        expect:
          - "aws:s3:ObjectCreated:*"
        for_each:
          - "prefect.resource.id"
        posture: Reactive
        threshold: 1
        within: 60  # 60 seconds
        schedule_after: "PT1M"  # Wait 1 minute before running

Define in Python with .serve

from datetime import timedelta
from prefect import flow
from prefect.events import DeploymentEventTrigger


@flow(log_prints=True)
def process_files():
    """Process all files in the S3 bucket"""
    # Query S3 directly to find all files
    # Your flow logic here to list and process all files
    print("Processing all pending files...")


if __name__ == "__main__":
    process_files.serve(
        name="process-s3-uploads",
        triggers=[
            DeploymentEventTrigger(
                enabled=True,
                match={"prefect.resource.id": "s3-bucket-name/*"},
                expect=["aws:s3:ObjectCreated:*"],
                for_each=["prefect.resource.id"],
                posture="Reactive",
                threshold=1,
                within=60,  # 60 seconds
                schedule_after=timedelta(seconds=60),  # Wait 1 minute
            )
        ],
    )

How it works

When you configure a reactive trigger with both within and schedule_after:
  1. First event arrives: The automation fires and schedules a deployment run
  2. Additional events within the window: These events are recorded but don’t trigger additional runs
  3. Deployment runs after delay: By the time the run starts (after schedule_after), all events from the burst have occurred
  4. Flow processes everything: Your flow queries the source system and processes all available items
The within parameter implements eager debouncing: the automation fires immediately on the first event, then ignores subsequent events for the specified duration. The schedule_after parameter implements late debouncing: it delays the actual flow run so that all events in the burst have arrived before processing begins. Using both parameters together prevents duplicate runs while ensuring all of the work from the burst is visible when your flow runs.
Matching time windows: Set within and schedule_after to the same value. This ensures the deployment run is scheduled after the debounce window closes, so all related work is visible to your flow when it runs.
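You can verify the behavior by simulating a burst of uploads with test events that match the trigger above. This is a minimal sketch using prefect.events.emit_event; the bucket name and object keys are placeholders that mirror the aws:s3:ObjectCreated:* pattern the trigger expects:

from prefect.events import emit_event

# Emit five events in quick succession, as if five files were uploaded at once.
# With within=60 and a one-minute schedule_after, the automation should fire on
# the first event and schedule a single flow run roughly one minute later.
for i in range(5):
    emit_event(
        event="aws:s3:ObjectCreated:Put",
        resource={"prefect.resource.id": f"s3-bucket-name/pending/file-{i}.csv"},
    )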

Choosing the right time window

The appropriate time window depends on your use case:
  • Rapid API events: 30-60 seconds
  • Batch file uploads: 2-5 minutes
  • Large file transfers: 15-30 minutes
Test with your actual event patterns to find the optimal window.
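For example, to debounce batch file uploads over a five-minute window, only the two debounce values in the trigger shown earlier need to change (the values here are illustrative; tune them to your workload):

        within: 300  # 5 minutes
        schedule_after: "PT5M"  # Wait 5 minutes before running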
Time format requirements: The schedule_after parameter accepts:
  • ISO 8601 duration format: "PT1M" (1 minute), "PT30S" (30 seconds), "PT2H" (2 hours)
  • Integer seconds: 60 (60 seconds)
  • Python timedelta objects: timedelta(minutes=1) (in code)
The within parameter accepts integer seconds only.
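In Python, the integer and timedelta forms should be interchangeable for schedule_after; a minimal sketch, assuming the field coerces integers to seconds the same way the YAML configuration does:

from datetime import timedelta
from prefect.events import DeploymentEventTrigger

# schedule_after given as integer seconds...
trigger_a = DeploymentEventTrigger(
    expect=["aws:s3:ObjectCreated:*"],
    posture="Reactive",
    threshold=1,
    within=60,
    schedule_after=60,
)

# ...is equivalent to schedule_after given as a timedelta
trigger_b = DeploymentEventTrigger(
    expect=["aws:s3:ObjectCreated:*"],
    posture="Reactive",
    threshold=1,
    within=60,
    schedule_after=timedelta(minutes=1),
)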

Design flows for batch processing

Since automations can only pass one event’s context, design your flows to discover and process all available work:
import boto3
from prefect import flow


@flow(log_prints=True)
def process_s3_files(bucket_name: str = "my-bucket"):
    """Process all files in the pending prefix of an S3 bucket"""
    s3 = boto3.client('s3')

    # List all objects in the pending prefix
    response = s3.list_objects_v2(
        Bucket=bucket_name,
        Prefix='pending/'
    )

    files = response.get('Contents', [])
    print(f"Found {len(files)} files to process")

    # Process each file
    for file in files:
        key = file['Key']
        print(f"Processing {key}")

        # Your processing logic here
        # ...

        # Move to completed prefix
        s3.copy_object(
            Bucket=bucket_name,
            CopySource={'Bucket': bucket_name, 'Key': key},
            Key=key.replace('pending/', 'completed/')
        )
        s3.delete_object(Bucket=bucket_name, Key=key)

    print(f"Processed {len(files)} files")
Key design principles:
  • Query the source system directly rather than relying on event data
  • Process all available items, not just one
  • Use idempotent operations that can safely handle re-processing (see the sketch below)
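For example, a run can skip files that an earlier or overlapping run already handled. This is a minimal sketch building on the flow above; the pending/ and completed/ prefixes and bucket name are illustrative:

import boto3
from prefect import flow


@flow(log_prints=True)
def process_s3_files_idempotent(bucket_name: str = "my-bucket"):
    """Process pending files, skipping any that were already completed."""
    s3 = boto3.client('s3')

    pending = s3.list_objects_v2(Bucket=bucket_name, Prefix='pending/').get('Contents', [])
    completed = s3.list_objects_v2(Bucket=bucket_name, Prefix='completed/').get('Contents', [])

    # Keys already copied to completed/, mapped back to their pending/ names
    completed_keys = {obj['Key'].replace('completed/', 'pending/', 1) for obj in completed}

    for file in pending:
        key = file['Key']
        if key in completed_keys:
            print(f"Skipping {key}: already processed")
            continue

        print(f"Processing {key}")
        # Your processing logic here
        # ...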

Combining with concurrency limits

For additional control, combine debouncing with deployment concurrency limits to prevent overlapping runs:
deployments:
  - name: process-s3-uploads
    entrypoint: flows/s3_processor.py:process_files
    work_pool:
      name: my-work-pool
    concurrency_limit: 1
    concurrency_options:
      collision_strategy: CANCEL_NEW
    triggers:
      - type: event
        enabled: true
        match:
          prefect.resource.id:
            - "s3-bucket-name/*"
        expect:
          - "aws:s3:ObjectCreated:*"
        posture: Reactive
        threshold: 1
        within: 60
        schedule_after: "PT1M"
This ensures:
  • Only one run executes at a time
  • New runs are cancelled if one is already running
  • Events are debounced to prevent excessive run creation
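If you define the deployment in Python instead of prefect.yaml, the same combination can be expressed with flow.deploy. This is a sketch assuming a recent Prefect 3.x release where flow.deploy accepts a concurrency_limit configuration and triggers; the source repository URL is a placeholder:

from datetime import timedelta

from prefect import flow
from prefect.client.schemas.objects import ConcurrencyLimitConfig, ConcurrencyLimitStrategy
from prefect.events import DeploymentEventTrigger

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-org/your-repo",  # placeholder repository
        entrypoint="flows/s3_processor.py:process_files",
    ).deploy(
        name="process-s3-uploads",
        work_pool_name="my-work-pool",
        # Allow only one run at a time; cancel newly scheduled runs that collide
        concurrency_limit=ConcurrencyLimitConfig(
            limit=1, collision_strategy=ConcurrencyLimitStrategy.CANCEL_NEW
        ),
        triggers=[
            DeploymentEventTrigger(
                enabled=True,
                match={"prefect.resource.id": "s3-bucket-name/*"},
                expect=["aws:s3:ObjectCreated:*"],
                posture="Reactive",
                threshold=1,
                within=60,
                schedule_after=timedelta(seconds=60),
            )
        ],
    )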

What happens to subsequent events?

Events that arrive during the within window are still recorded in Prefect’s event system:
  • You can view them in the Event Feed
  • They can be queried at the start of the flow run
  • They’re tracked for audit and debugging purposes
  • They don’t trigger additional automation actions
The automation system recognizes these as part of the same event burst and doesn’t create additional runs.
