Skip to content

Commit 954e288

Browse files
committed
fix: Respect flush_queue_size in Workers.flush() to prevent payload size errors
1 parent 811e947 commit 954e288

File tree

3 files changed

+74
-6
lines changed

3 files changed

+74
-6
lines changed

src/amplitude/processor.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ def process_response(self, res, events):
2222
self.callback(events, res.code, res.error)
2323
self.log(events, res.code, res.error)
2424
else:
25-
self.configuration._increase_flush_divider()
25+
# Only reduce if the batch was at or below the current limit
26+
# This prevents multiple reductions from the same flush operation
27+
if len(events) <= self.configuration.flush_queue_size:
28+
self.configuration._increase_flush_divider()
2629
self.push_to_storage(events, 0, res)
2730
elif res.status == HttpStatus.INVALID_REQUEST:
2831
if res.error.startswith("Invalid API key:"):

src/amplitude/worker.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,51 @@ def stop(self):
3737
self.threads_pool.shutdown()
3838

3939
def flush(self):
    """Flush all pending events from storage, honoring flush_queue_size.

    Events are pulled from storage in one shot and split into batches of
    at most ``configuration.flush_queue_size`` events each, so a single
    flush cannot exceed the server payload limit.  Each batch is sent on
    the worker thread pool.

    Returns:
        None when storage is missing or empty; the batch's Future when a
        single batch suffices; otherwise a combined Future that completes
        once every batch has been sent.
    """
    if not self.storage:
        return None

    events = self.storage.pull_all()
    if not events:
        return None

    # Guard against a degenerate (zero/negative) configured size so the
    # stepped range below can never raise ValueError.
    batch_size = max(1, self.configuration.flush_queue_size)

    # Slice into batches and submit each one directly — no need to
    # materialize an intermediate list of batches first.
    batch_futures = [
        self.threads_pool.submit(self.send, events[i:i + batch_size])
        for i in range(0, len(events), batch_size)
    ]

    if len(batch_futures) == 1:
        return batch_futures[0]
    return self._create_combined_future(batch_futures)
61+
62+
def _create_combined_future(self, batch_futures):
63+
def wait_for_all():
64+
errors = []
65+
66+
for i, future in enumerate(batch_futures):
67+
try:
68+
future.result()
69+
except Exception as e:
70+
self.configuration.logger.error(
71+
f"Flush batch {i+1}/{len(batch_futures)} failed: {e}"
72+
)
73+
errors.append(e)
74+
75+
# If any batches failed, raise the first error
76+
if errors:
77+
raise errors[0]
78+
79+
self.configuration.logger.info(
80+
f"Flush completed: {len(batch_futures)} batches sent"
81+
)
82+
return None
83+
84+
return self.threads_pool.submit(wait_for_all)
4385

4486
def send(self, events):
4587
url = self.configuration.server_url

src/test/test_worker.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,20 @@ def test_worker_initialize_setup_success(self):
3838
self.assertIsNotNone(self.workers.response_processor)
3939

4040
def test_worker_stop_success(self):
41-
self.workers.storage.pull_all = MagicMock()
41+
for i in range(5):
42+
self.workers.storage.push(BaseEvent(f"event_{i}", "test_user"))
43+
44+
success_response = Response(HttpStatus.SUCCESS)
45+
HttpClient.post = MagicMock(return_value=success_response)
46+
4247
self.workers.stop()
4348
self.assertFalse(self.workers.is_active)
4449
self.assertTrue(self.workers.is_started)
45-
self.workers.storage.pull_all.assert_called_once()
50+
51+
# Verify storage was flushed (all events removed)
52+
self.assertEqual(0, self.workers.storage.total_events)
53+
self.assertEqual(0, len(self.workers.storage.ready_queue))
54+
self.assertEqual(0, len(self.workers.storage.buffer_data))
4655

4756
def test_worker_get_payload_success(self):
4857
events = [BaseEvent("test_event1", "test_user"), BaseEvent("test_event2", "test_user")]
@@ -150,11 +159,25 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
150159
success_response = Response(HttpStatus.SUCCESS)
151160
payload_too_large_response = Response(HttpStatus.PAYLOAD_TOO_LARGE)
152161
HttpClient.post = MagicMock()
153-
HttpClient.post.side_effect = [payload_too_large_response, payload_too_large_response, success_response]
162+
# First send gets PAYLOAD_TOO_LARGE (divider 1→2, size 30→15)
163+
# First flush sends 2 batches of 15:
164+
# - Batch 1 (15) fails: len(15) <= 15 → TRUE → increase (divider 2→3, size 15→10)
165+
# - Batch 2 (15) fails: len(15) <= 10 → FALSE → don't increase
166+
# Second flush sends 3 batches of 10, all succeed
167+
HttpClient.post.side_effect = [
168+
payload_too_large_response, # Initial send of 30 events
169+
payload_too_large_response, # First flush batch 1 (15 events)
170+
payload_too_large_response, # First flush batch 2 (15 events)
171+
success_response, # Second flush batch 1 (10 events)
172+
success_response, # Second flush batch 2 (10 events)
173+
success_response # Second flush batch 3 (10 events)
174+
]
154175
self.workers.configuration.flush_queue_size = 30
155176
self.workers.send(events)
156177
self.assertEqual(15, self.workers.configuration.flush_queue_size)
157178
self.workers.flush().result()
179+
# After first flush, only first batch increased divider (15 <= 15)
180+
# Second batch didn't (15 > 10), so divider only went 2→3
158181
self.assertEqual(10, self.workers.configuration.flush_queue_size)
159182
self.workers.flush().result()
160183
self.assertEqual(30, len(self.events_dict[200]))

0 commit comments

Comments
 (0)