@sojingle
Contributor

@sojingle sojingle commented Nov 19, 2025

Summary

Fix: Respect flush_queue_size in Worker.flush() to prevent payload size errors

Problem

The current flush() implementation uses pull_all() and sends all events in a single HTTP request, completely ignoring the flush_queue_size configuration. This causes:

  1. Payload Too Large errors when flushing large numbers of events
  2. Wasted retry overhead as events are re-queued and retried
  3. Inefficient use of adaptive batching: the SDK has logic to reduce flush_queue_size when payloads are too large, but flush() bypasses it entirely

Solution

Thanks to @dsaxton-1password for raising this issue and submitting PR #63. Building on that PR, I made further optimizations and fixed some problems that the change introduced.

  1. Modified Worker.flush() to batch events
    Fetching all events once with pull_all() and then splitting them into batches reduces how often, and for how long, the storage lock is held.
  2. Added _create_combined_future() helper
    Changing the return type of flush() to a list of futures would be an API change, so this helper merges the batch futures back into a single future.
  3. Fixed exponential flush_queue_size reduction
    When multiple batches from the same flush() fail, the old code would increase the flush divider for EVERY failure, so flush_queue_size shrank far more than a single oversized flush warranted.
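
For readers following along, the first two points can be illustrated with a minimal sketch. This is not the code from the PR: `split_into_batches`, `create_combined_future`, `flush_in_batches`, and the `send` callable are hypothetical stand-ins, and the events are assumed to have already been pulled from storage in a single call.

```python
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, List, Optional, Sequence

logger = logging.getLogger("flush_sketch")


def split_into_batches(events: Sequence[Any], batch_size: int) -> List[List[Any]]:
    """Split events into consecutive chunks of at most batch_size items."""
    size = max(1, batch_size)  # guard against a zero or negative size
    return [list(events[i:i + size]) for i in range(0, len(events), size)]


def create_combined_future(executor: ThreadPoolExecutor,
                           futures: List[Future]) -> Future:
    """Return one future that finishes after every batch future finishes.

    Hypothetical helper: it logs each failure and re-raises the first one,
    so callers still receive a single future as before.
    """
    def wait_for_all() -> None:
        first_error: Optional[BaseException] = None
        for future in futures:
            try:
                future.result()
            except Exception as exc:
                logger.error("Batch flush failed: %s", exc)
                if first_error is None:
                    first_error = exc
        if first_error is not None:
            raise first_error

    # The batch tasks were submitted earlier, so they are ahead of this
    # waiter in the executor's work queue.
    return executor.submit(wait_for_all)


def flush_in_batches(events: Sequence[Any],
                     flush_queue_size: int,
                     send: Callable[[List[Any]], Any],
                     executor: ThreadPoolExecutor) -> Optional[Future]:
    """Send already-pulled events in batches of flush_queue_size."""
    if not events:
        return None  # nothing to flush
    futures = [executor.submit(send, batch)
               for batch in split_into_batches(events, flush_queue_size)]
    return create_combined_future(executor, futures)
```

Under these assumptions, five events with a batch size of 2 are sent as three requests of 2, 2, and 1 events, and the caller waits on a single future for all of them.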

Checklist

  • Does your PR title have the correct title format?
  • Does your PR have a breaking change?: no

Note

Batch flushes by flush_queue_size with a combined future and guard flush-divider increases on payload-too-large; update tests accordingly.

  • Worker (src/amplitude/worker.py):
    • Batch flush() by configuration.flush_queue_size, submitting each batch to the thread pool.
    • Return a single future via _create_combined_future() that waits for all batch futures, logs failures, and raises on error; early-return when no storage or no events.
  • Processor (src/amplitude/processor.py):
    • On PAYLOAD_TOO_LARGE, only call configuration._increase_flush_divider() when len(events) <= configuration.flush_queue_size to avoid multiple reductions; re-queue events as before.
  • Tests (src/test/test_worker.py):
    • Update stop to verify storage is flushed and HTTP sends occur.
    • Add/adjust tests for batched flush() and guarded divider increase under PAYLOAD_TOO_LARGE.
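
The guarded divider increase described for the Processor can be sketched as follows. This is a simplified illustration rather than the SDK's code: `SketchConfig`, its integer-division arithmetic, and `on_payload_too_large` are assumptions made for the example; only the `len(events) <= flush_queue_size` condition comes from the description above.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class SketchConfig:
    """Hypothetical stand-in for the SDK configuration object."""
    base_flush_queue_size: int = 200
    flush_divider: int = 1

    @property
    def flush_queue_size(self) -> int:
        # Assumed arithmetic: effective size is the base size over the divider.
        return max(1, self.base_flush_queue_size // self.flush_divider)

    def increase_flush_divider(self) -> None:
        self.flush_divider += 1


def on_payload_too_large(config: SketchConfig,
                         events: List[Any],
                         requeue: Callable[[List[Any]], None]) -> None:
    """Shrink the batch size at most once per oversized batch size."""
    # A failed batch larger than the current flush_queue_size was built
    # before an earlier failure already shrank the size, so skip it.
    if len(events) <= config.flush_queue_size:
        config.increase_flush_divider()
    requeue(events)  # events are re-queued either way
```

With three failed batches of 200 events from the same flush, the first failure raises the divider from 1 to 2 (effective size 100), and the other two leave it unchanged because 200 is greater than 100. Without the guard, the divider would reach 4.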

Written by Cursor Bugbot for commit 954e288.

@sojingle sojingle requested review from a team, Mercy811 and crleona November 19, 2025 00:47
@Mercy811
Contributor

bugbot run

@cursor
cursor bot commented Nov 19, 2025

Skipping Bugbot: Bugbot is disabled for this repository

Contributor

@Mercy811 Mercy811 left a comment

Thanks @sojingle, LGTM! Let's wait for bugbot run. I'm requesting access at https://amplitude.slack.com/archives/C08PQE45N68/p1763513616629679

@Mercy811
Contributor

bugbot run

@cursor cursor bot left a comment

Comment @cursor review or bugbot run to trigger another review on this PR

@igor-amp igor-amp left a comment

lgtm

@sojingle
Contributor Author

Hi @dsaxton-1password , are you able to test this branch in a real environment? If testing is difficult, I will just proceed with the release.

@dsaxton-1password

> Hi @dsaxton-1password , are you able to test this branch in a real environment? If testing is difficult, I will just proceed with the release.

I could try, but it would likely take a while to actually trigger the issue (it has been happening maybe once a week) and you may not want to wait that long to release.

If you're confident this doesn't introduce breaking changes I would go ahead and release it and we can start running the new version in a staging environment (and then in production if everything runs smoothly).

@sojingle sojingle merged commit d9accf0 into main Nov 19, 2025
10 checks passed
@sojingle sojingle deleted the flush_with_size_limit branch November 19, 2025 18:29
6 participants