Skip to content

Commit 2cce696

Browse files
Respect flush_queue_size when calling Workers.flush
1 parent 811e947 commit 2cce696

File tree

2 files changed

+13
-11
lines changed

2 files changed

+13
-11
lines changed

src/amplitude/timeline.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def flush(self):
4444
destination_futures = []
4545
for destination in self.plugins[PluginType.DESTINATION]:
4646
try:
47-
destination_futures.append(destination.flush())
47+
destination_futures.extend(destination.flush())
4848
except Exception:
4949
self.logger.exception("Error for flush events")
5050
return destination_futures

src/amplitude/worker.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99

1010
class Workers:
11-
1211
def __init__(self):
1312
self.threads_pool = ThreadPoolExecutor(max_workers=16)
1413
self.is_active = True
@@ -37,9 +36,13 @@ def stop(self):
3736
self.threads_pool.shutdown()
3837

3938
def flush(self):
40-
events = self.storage.pull_all()
41-
if events:
42-
return self.threads_pool.submit(self.send, events)
39+
futures = []
40+
while self.storage.total_events:
41+
events = self.storage.pull(self.configuration.flush_queue_size)
42+
if events:
43+
future = self.threads_pool.submit(self.send, events)
44+
futures.append(future)
45+
return futures
4346

4447
def send(self, events):
4548
url = self.configuration.server_url
@@ -51,23 +54,22 @@ def send(self, events):
5154
self.configuration.logger.error("Invalid API Key")
5255

5356
def get_payload(self, events) -> bytes:
54-
payload_body = {
55-
"api_key": self.configuration.api_key,
56-
"events": []
57-
}
57+
payload_body = {"api_key": self.configuration.api_key, "events": []}
5858
for event in events:
5959
event_body = event.get_event_body()
6060
if event_body:
6161
payload_body["events"].append(event_body)
6262
if self.configuration.options:
6363
payload_body["options"] = self.configuration.options
64-
return json.dumps(payload_body, sort_keys=True).encode('utf8')
64+
return json.dumps(payload_body, sort_keys=True).encode("utf8")
6565

6666
def buffer_consumer(self):
6767
try:
6868
if self.is_active:
6969
with self.storage.lock:
70-
self.storage.lock.wait(self.configuration.flush_interval_millis / 1000)
70+
self.storage.lock.wait(
71+
self.configuration.flush_interval_millis / 1000
72+
)
7173
while True:
7274
if not self.storage.total_events:
7375
break

0 commit comments

Comments
 (0)