Commit 46e967d

Refactor GhostTaskHelper and improve performance benchmarking
- Simplified GhostTaskHelper to static class design with per-loop task storage
- Removed complex WeakKeyDictionary-based instance management
- Streamlined eager task factory implementation with fewer parameters
- Fixed benchmark eager_start parameter detection using runtime testing
- Added 2-sigma outlier filtering for more accurate performance measurements
- Corrected min/max statistical aggregation in benchmark results
- Updated documentation to remove misleading claims about eager_start parameter
1 parent 91b39f2 commit 46e967d
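
The per-loop task storage mentioned in the commit message boils down to a plain dict keyed by the event loop, filled in lazily on first use. The sketch below only illustrates that caching pattern; the names (_ghost_tasks, get_ghost_task) are made up for the example, and the real helper is the GhostTaskHelper class visible in the diff further down.

import asyncio
from typing import Any, Callable, Coroutine

# Illustrative per-loop cache (hypothetical names, not the committed code):
# a trivial "ghost" task is created lazily per event loop and then reused
# as the stand-in current task.
_ghost_tasks: dict[asyncio.AbstractEventLoop, asyncio.Task[Any]] = {}


async def _ghost_coro() -> None:
    # Trivial coroutine; the resulting task exists mainly so that
    # asyncio.current_task() has something to return during an eager start.
    pass


def get_ghost_task(
    loop: asyncio.AbstractEventLoop,
    create: Callable[[Coroutine[Any, Any, Any]], asyncio.Task[Any]],
) -> asyncio.Task[Any]:
    try:
        return _ghost_tasks[loop]  # fast path: already cached for this loop
    except KeyError:
        _ghost_tasks[loop] = create(_ghost_coro())
        return _ghost_tasks[loop]

Trading the WeakKeyDictionary for a plain dict removes indirection on every lookup; the cost is that entries are never collected automatically, which the new in-code comment considers acceptable since there are unlikely to be many event loops in practice.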

File tree

3 files changed: +214 -84 lines changed


src/asynkit/coroutine.py

Lines changed: 42 additions & 68 deletions

@@ -6,7 +6,6 @@
 import inspect
 import sys
 import types
-import weakref
 from asyncio import Future, Task
 from collections.abc import (
     AsyncGenerator,
@@ -731,14 +730,11 @@ async def sync_coro():
 
     Notes:
     - This is a different mechanism from Python 3.12's native eager execution
-      feature. Python 3.12 provides `eager_start=True` parameter for
-      `asyncio.create_task()` and `asyncio.eager_task_factory()`. Our
+      feature. Python 3.12 provides `asyncio.eager_task_factory`. Our
       implementation works on all Python versions but may not always create
       a real Task - synchronous coroutines get a TaskLikeFuture instead.
     - All kwargs from asyncio.create_task() are properly forwarded to the
      inner factory when delegation occurs.
-    - This is experimental functionality that modifies global task creation
-      behavior for the entire event loop.
    - If you want to preserve an existing task factory, explicitly pass it
      as inner_factory rather than relying on automatic detection.
 
@@ -770,23 +766,7 @@ def real_task_factory(coro: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
             else:
                 return asyncio.Task(coro, loop=loop, **kwargs)
 
-        ghost_task_getter = get_ghost_task_getter(loop)
-
-        return coro_eager_task_helper(
-            loop, coro, name, context, ghost_task_getter, real_task_factory
-        )
-
-    # cache GhostTaskHelper instances per event loop
-    helpers: weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, GhostTaskHelper] = (
-        weakref.WeakKeyDictionary()
-    )
-
-    def get_ghost_task_getter(
-        loop: asyncio.AbstractEventLoop,
-    ) -> Callable[[], asyncio.Task[Any]]:
-        if loop not in helpers:
-            helpers[loop] = GhostTaskHelper(lambda coro: asyncio.Task(coro, loop=loop))
-        return helpers[loop].get
+        return coro_eager_task_helper(loop, coro, name, context, real_task_factory)
 
     return factory
 
@@ -865,7 +845,6 @@ def real_task_factory(coro_arg: Coroutine[Any, Any, T]) -> asyncio.Task[T]:
         coro,
         name,
         context,
-        default_ghost_task_getter,
         real_task_factory,
     )
 
@@ -889,36 +868,38 @@ class GhostTaskHelper:
     temporary task contexts if creating eager tasks in a non-task context.
     """
 
-    cleanup: set[asyncio.Task[Any]] = set()
-
-    def __init__(
-        self, raw_create: Callable[[Coroutine[Any, Any, Any]], asyncio.Task[Any]]
-    ) -> None:
-        self.raw_create = raw_create
-        self.ghost_task: asyncio.Task[Any] | None = None
+    # this could be a WeakKeyDictionary but we want maximum performance here
+    # and there are unlikely to be many event loops in practice.
+    tasks: dict[asyncio.AbstractEventLoop, asyncio.Task[Any]] = {}
 
-    def get(self) -> asyncio.Task[Any]:
-        if self.ghost_task is None:
-
-            async def ghost_coro() -> None:
-                return None
+    @classmethod
+    def get(
+        cls,
+        loop: asyncio.AbstractEventLoop,
+        raw_create: Callable[[Coroutine[Any, Any, Any]], asyncio.Task[Any]],
+    ) -> asyncio.Task[Any]:
+        """
+        Get the GhostTaskHelper instance for the given event loop.
+        """
+        try:
+            return cls.tasks[loop]
+        except KeyError:
+            cls.tasks[loop] = raw_create(cls.task_coro())
+            return cls.tasks[loop]
 
-            self.ghost_task = self.raw_create(ghost_coro())
-            GhostTaskHelper.cleanup.add(self.ghost_task)
-            self.ghost_task.add_done_callback(GhostTaskHelper.cleanup.discard)
-        return self.ghost_task
+    @classmethod
+    async def task_coro(cls) -> None:
+        pass
 
 
-# default instance for the create_task helper
-default_ghost_task_getter = GhostTaskHelper(_create_task).get
+_get_ghost_task = GhostTaskHelper.get  # for easier access
 
 
 def coro_eager_task_helper(
     loop: asyncio.AbstractEventLoop,
     coro: Coroutine[Any, Any, T],
     name: str | None,
     context: Context | None,
-    get_fake_task: Callable[[], asyncio.Task[Any]],
     real_task_factory: Callable[[Coroutine[Any, Any, T]], asyncio.Task[T]],
 ) -> asyncio.Task[T] | TaskLikeFuture[T]:
     """
@@ -936,38 +917,32 @@ def coro_eager_task_helper(
     """
     # In Python < 3.11, context parameter doesn't exist for create_task()
     # so we ignore any provided context and let CoroStart manage its own
-
     if sys.version_info < (3, 11):
         context = None
 
-    # start the coroutine in the task context
-    def start() -> CoroStart[T]:
-        if context is not None:
-            # Enter the context only for the initial start, then use None for CoroStart
-            # This way the continuation won't try to re-enter the context
-            def start_in_context() -> CoroStart[T]:
-                return CoroStart(coro, context=None)
-
-            cs = context.run(start_in_context)
-        else:
-            # No explicit context - use copy_context() as before
-            cs = CoroStart(coro, context=copy_context())
-        return cs
-
-    current_task = asyncio.current_task(loop)
-    # if there is no current task, then we need a fake task to run it in
-    # this is so that asyncio.get_current_task() returns a valid task during
-    # eager start. This is not the same task as will be created later. This
-    # is purely to satisfy get_current_task() calls during eager start, such
-    # as for anyio that wants to detect the current async framework.
-
     cs: CoroStart[T]
+    current_task = asyncio.current_task(loop)
     if current_task is not None:
-        cs = start()
+        if context is None:
+            cs = CoroStart(coro, context=copy_context())
+        else:
+            # Enter the context only for the initial start, then use None for CoroStart
+            # This way the continuation won't try to re-enter the context
+            cs = context.run(lambda: CoroStart(coro, context=None))
    else:
-        old = swap_current_task(loop, get_fake_task())
+        # if there is no current task, then we need a fake task to run it in
+        # this is so that asyncio.get_current_task() returns a valid task during
+        # eager start. This is not the same task as will be created later. This
+        # is purely to satisfy get_current_task() calls during eager start, such
+        # as for anyio that wants to detect the current async framework.
+        old = swap_current_task(loop, _get_ghost_task(loop, real_task_factory))
         try:
-            cs = start()
+            if context is None:
+                cs = CoroStart(coro, context=copy_context())
+            else:
+                # Enter the context only for the initial start, then use None for CoroStart
+                # This way the continuation won't try to re-enter the context
+                cs = context.run(lambda: CoroStart(coro, context=None))
         finally:
             swap_current_task(loop, old)
 
@@ -976,7 +951,6 @@ def start_in_context() -> CoroStart[T]:
     if not cs.done():
         return real_task_factory(cs.as_coroutine())
     else:
-        # Return a TaskLikeFuture wrapping the result
         return TaskLikeFuture(cs, name=name, context=context)
 

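For comparison, the native mechanism that the updated docstring points to is enabled like this on Python 3.12 and later (plain stdlib asyncio, independent of asynkit's helper):

import asyncio


async def main() -> None:
    # Python 3.12+: install the stdlib eager task factory on the running loop.
    # Tasks created afterwards run synchronously up to their first suspension
    # point, inside the create_task() call itself.
    asyncio.get_running_loop().set_task_factory(asyncio.eager_task_factory)

    async def work() -> str:
        return "done"  # completes without ever suspending

    task = asyncio.create_task(work())
    print(task.done())  # True: the coroutine already finished eagerly
    print(await task)


asyncio.run(main())

The asynkit factory in the diff above differs in one respect the docstring calls out: a coroutine that never suspends yields a TaskLikeFuture rather than a real Task.
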
tests/misc/eager_task_factory_benchmark.py

Lines changed: 78 additions & 16 deletions

@@ -10,7 +10,6 @@
 
 import asyncio
 import contextlib
-import inspect
 import statistics
 import sys
 import time
@@ -19,6 +18,31 @@
 
 import asynkit
 
+
+def filter_outliers(
+    data: list[float], sigma_threshold: float = 4.0
+) -> tuple[list[float], int]:
+    """
+    Filter outliers from data using standard deviation method.
+
+    Returns:
+        tuple: (filtered_data, num_outliers_removed)
+    """
+    if len(data) <= 2:
+        return data, 0
+
+    mean = statistics.mean(data)
+    std_dev = statistics.stdev(data)
+
+    if std_dev == 0:
+        return data, 0
+
+    filtered = [x for x in data if abs(x - mean) <= sigma_threshold * std_dev]
+    num_removed = len(data) - len(filtered)
+
+    return filtered, num_removed
+
+
 # Test parameters
 NUM_TASKS = 100
 NUM_SLEEPS_PER_TASK = 100
@@ -161,13 +185,20 @@ async def run_tests(self) -> dict[str, Any]:
             adjustment_factor = 1000 if self.is_non_eager else 1
             adjusted_latencies = [lat / adjustment_factor for lat in latencies]
 
-            # Calculate statistics for this run
+            # Filter outliers before calculating statistics
+            filtered_latencies, num_outliers = filter_outliers(
+                adjusted_latencies, sigma_threshold=2.0
+            )
+
+            # Calculate statistics for this run (using filtered data)
             run_latency_stats = {
-                "mean": statistics.mean(adjusted_latencies) * 1_000_000,
-                "median": statistics.median(adjusted_latencies) * 1_000_000,
-                "min": min(adjusted_latencies) * 1_000_000,
-                "max": max(adjusted_latencies) * 1_000_000,
-                "std_dev": statistics.stdev(adjusted_latencies) * 1_000_000,
+                "mean": statistics.mean(filtered_latencies) * 1_000_000,
+                "median": statistics.median(filtered_latencies) * 1_000_000,
+                "min": min(filtered_latencies) * 1_000_000,
+                "max": max(filtered_latencies) * 1_000_000,
+                "std_dev": statistics.stdev(filtered_latencies) * 1_000_000,
+                "outliers_removed": num_outliers,
+                "total_samples": len(adjusted_latencies),
             }
 
             # Measure throughput for this run
@@ -176,9 +207,12 @@ async def run_tests(self) -> dict[str, Any]:
             if is_warmup:
                 print(" Warmup run completed (discarded)")
             else:
+                outlier_info = ""
+                if run_latency_stats["outliers_removed"] > 0:
+                    outlier_info = f" ({run_latency_stats['outliers_removed']} outliers filtered)"
                 print(
                     f"latency {run_latency_stats['mean']:.2f}μs, "
-                    f"throughput {throughput:.0f} ops/s"
+                    f"throughput {throughput:.0f} ops/s{outlier_info}"
                 )
             all_latency_results.append(run_latency_stats)
             all_throughput_results.append(throughput)
@@ -200,8 +234,8 @@ async def run_tests(self) -> dict[str, Any]:
             "median_std": statistics.stdev(median_latencies)
             if len(median_latencies) > 1
             else 0,
-            "min": statistics.mean(min_latencies),
-            "max": statistics.mean(max_latencies),
+            "min": min(min_latencies),
+            "max": max(max_latencies),
             "std_dev": statistics.mean(std_dev_latencies),
             "runs": len(all_latency_results),
         }
@@ -214,6 +248,17 @@ async def run_tests(self) -> dict[str, Any]:
             else 0
         )
 
+        # Calculate total outliers filtered
+        total_outliers = sum(
+            result.get("outliers_removed", 0) for result in all_latency_results
+        )
+        total_samples = sum(
+            result.get("total_samples", 0) for result in all_latency_results
+        )
+        outlier_percentage = (
+            (total_outliers / total_samples * 100) if total_samples > 0 else 0
+        )
+
         # Display final results
         print(
             f"\nFinal Results (averaged over {final_latency_stats['runs']} runs):"
@@ -233,6 +278,10 @@ async def run_tests(self) -> dict[str, Any]:
         print(
             f" Throughput: {final_throughput:.0f} ± {throughput_std:.0f} operations/second"
         )
+        if total_outliers > 0:
+            print(
+                f" Outliers filtered: {total_outliers}/{total_samples} ({outlier_percentage:.1f}%) using 2σ threshold"
+            )
 
         return {
             "factory_name": self.factory_name,
@@ -309,15 +358,28 @@ async def measure_throughput(self) -> float:
 
 
 async def compare_eager_start_parameter():
-    """Test Python 3.12's per-task eager_start parameter if available."""
+    """Test Python 3.14's per-task eager_start parameter if available."""
+
+    # Check if eager_start parameter is available by testing it
+    python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
+
+    # Test if eager_start actually works (it may be handled via **kwargs)
+    try:
+
+        async def test_coro():
+            return "test"
+
+        # Try creating a task with eager_start parameter
+        task = asyncio.create_task(test_coro(), eager_start=True)
+        await task
+        eager_start_available = True
+    except TypeError:
+        eager_start_available = False
 
-    # Check if eager_start parameter is available
-    sig = inspect.signature(asyncio.create_task)
-    if "eager_start" not in sig.parameters:
-        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
+    if not eager_start_available:
         print(f"\n=== Python {python_version} eager_start Parameter ===")
         print(" eager_start parameter not available in this Python version")
-        print(" (eager_start was added in Python 3.12.0a7+)")
+        print(" (eager_start was added to asyncio.create_task() in Python 3.14)")
         return
 
     python_version = f"{sys.version_info.major}.{sys.version_info.minor}"

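To make the new 2-sigma filtering concrete, here is a small standalone illustration of the same approach on synthetic latency samples; it is a condensed variant of the filter_outliers helper above, using the 2σ threshold the benchmark actually passes, and not the committed code itself.

import statistics


def filter_outliers(data: list[float], sigma_threshold: float = 2.0) -> tuple[list[float], int]:
    """Drop samples more than sigma_threshold standard deviations from the mean."""
    if len(data) <= 2:
        return data, 0
    mean = statistics.mean(data)
    std_dev = statistics.stdev(data)
    if std_dev == 0:
        return data, 0
    kept = [x for x in data if abs(x - mean) <= sigma_threshold * std_dev]
    return kept, len(data) - len(kept)


# Nine latencies around 100μs plus one 900μs scheduling hiccup (values in seconds).
samples = [100e-6, 101e-6, 99e-6, 102e-6, 98e-6, 100e-6, 101e-6, 99e-6, 100e-6, 900e-6]
kept, removed = filter_outliers(samples, sigma_threshold=2.0)
print(
    f"removed {removed} outlier(s): "
    f"mean {statistics.mean(samples) * 1e6:.1f}μs -> {statistics.mean(kept) * 1e6:.1f}μs"
)

With the hiccup included, the per-run mean nearly doubles (180μs vs 100μs); filtering at 2σ recovers the typical latency, which is why the benchmark now also reports how many samples were discarded.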