Skip to content

Commit d9accf0

Browse files
authored
fix: Respect flush_queue_size in Worker.flush() (#64)
1 parent 811e947 commit d9accf0

File tree

3 files changed

+67
-6
lines changed

3 files changed

+67
-6
lines changed

src/amplitude/processor.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ def process_response(self, res, events):
2222
self.callback(events, res.code, res.error)
2323
self.log(events, res.code, res.error)
2424
else:
25-
self.configuration._increase_flush_divider()
25+
# Reduce the flush queue size only if this batch didn't exceed the current limit (i.e. it was expected to succeed).
26+
# Batches larger than the current limit were built under an older, larger limit that was already deemed too large,
27+
# so failing again provides no new information — skip the reduction to avoid shrinking the limit too far.
28+
if len(events) <= self.configuration.flush_queue_size:
29+
self.configuration._increase_flush_divider()
2630
self.push_to_storage(events, 0, res)
2731
elif res.status == HttpStatus.INVALID_REQUEST:
2832
if res.error.startswith("Invalid API key:"):

src/amplitude/worker.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,43 @@ def stop(self):
3737
self.threads_pool.shutdown()
3838

3939
def flush(self):
40+
if not self.storage:
41+
return None
42+
4043
events = self.storage.pull_all()
41-
if events:
42-
return self.threads_pool.submit(self.send, events)
44+
45+
if not events:
46+
return None
47+
48+
batch_size = self.configuration.flush_queue_size
49+
batches = [events[i:i + batch_size] for i in range(0, len(events), batch_size)]
50+
batch_futures = [self.threads_pool.submit(self.send, batch) for batch in batches]
51+
52+
if len(batch_futures) == 1:
53+
return batch_futures[0]
54+
55+
return self._create_combined_future(batch_futures)
56+
57+
def _create_combined_future(self, batch_futures):
58+
def wait_for_all():
59+
errors = []
60+
61+
for i, future in enumerate(batch_futures):
62+
try:
63+
future.result()
64+
except Exception as e:
65+
self.configuration.logger.error(
66+
f"Flush batch {i+1}/{len(batch_futures)} failed: {e}"
67+
)
68+
errors.append(e)
69+
70+
# If any batches failed, raise the first error
71+
if errors:
72+
raise errors[0]
73+
74+
return None
75+
76+
return self.threads_pool.submit(wait_for_all)
4377

4478
def send(self, events):
4579
url = self.configuration.server_url

src/test/test_worker.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,20 @@ def test_worker_initialize_setup_success(self):
3838
self.assertIsNotNone(self.workers.response_processor)
3939

4040
def test_worker_stop_success(self):
41-
self.workers.storage.pull_all = MagicMock()
41+
for i in range(5):
42+
self.workers.storage.push(BaseEvent(f"event_{i}", "test_user"))
43+
44+
success_response = Response(HttpStatus.SUCCESS)
45+
HttpClient.post = MagicMock(return_value=success_response)
46+
4247
self.workers.stop()
4348
self.assertFalse(self.workers.is_active)
4449
self.assertTrue(self.workers.is_started)
45-
self.workers.storage.pull_all.assert_called_once()
50+
51+
# Verify storage was flushed (all events removed)
52+
self.assertEqual(0, self.workers.storage.total_events)
53+
self.assertEqual(0, len(self.workers.storage.ready_queue))
54+
self.assertEqual(0, len(self.workers.storage.buffer_data))
4655

4756
def test_worker_get_payload_success(self):
4857
events = [BaseEvent("test_event1", "test_user"), BaseEvent("test_event2", "test_user")]
@@ -150,11 +159,25 @@ def test_worker_send_events_with_payload_too_large_response_decrease_flush_queue
150159
success_response = Response(HttpStatus.SUCCESS)
151160
payload_too_large_response = Response(HttpStatus.PAYLOAD_TOO_LARGE)
152161
HttpClient.post = MagicMock()
153-
HttpClient.post.side_effect = [payload_too_large_response, payload_too_large_response, success_response]
162+
# First send gets PAYLOAD_TOO_LARGE (divider 1→2, size 30→15)
163+
# First flush sends 2 batches of 15:
164+
# - Batch 1 (15) fails: len(15) <= 15 → TRUE → increase (divider 2→3, size 15→10)
165+
# - Batch 2 (15) fails: len(15) <= 10 → FALSE → don't increase
166+
# Second flush sends 3 batches of 10, all succeed
167+
HttpClient.post.side_effect = [
168+
payload_too_large_response, # Initial send of 30 events
169+
payload_too_large_response, # First flush batch 1 (15 events)
170+
payload_too_large_response, # First flush batch 2 (15 events)
171+
success_response, # Second flush batch 1 (10 events)
172+
success_response, # Second flush batch 2 (10 events)
173+
success_response # Second flush batch 3 (10 events)
174+
]
154175
self.workers.configuration.flush_queue_size = 30
155176
self.workers.send(events)
156177
self.assertEqual(15, self.workers.configuration.flush_queue_size)
157178
self.workers.flush().result()
179+
# After first flush, only first batch increased divider (15 <= 15)
180+
# Second batch didn't (15 > 10), so divider only went 2→3
158181
self.assertEqual(10, self.workers.configuration.flush_queue_size)
159182
self.workers.flush().result()
160183
self.assertEqual(30, len(self.events_dict[200]))

0 commit comments

Comments
 (0)