Skip to content

Commit a7baa36

Browse files
fix(profiling): do not remove links for just-created Tasks
1 parent 73210fe commit a7baa36

File tree

3 files changed

+80
-13
lines changed

3 files changed

+80
-13
lines changed

ddtrace/internal/datadog/profiling/stack_v2/echion/echion/threads.h

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ ThreadInfo::unwind_tasks()
209209
std::unordered_set<PyObject*> parent_tasks;
210210
std::unordered_map<PyObject*, TaskInfo::Ref> waitee_map; // Indexed by task origin
211211
std::unordered_map<PyObject*, TaskInfo::Ref> origin_map; // Indexed by task origin
212+
static std::unordered_set<PyObject*> previous_task_objects;
212213

213214
auto maybe_all_tasks = get_all_tasks(reinterpret_cast<PyObject*>(asyncio_loop));
214215
if (!maybe_all_tasks) {
@@ -232,14 +233,25 @@ ThreadInfo::unwind_tasks()
232233
if (all_task_origins.find(kv.first) == all_task_origins.end())
233234
to_remove.push_back(kv.first);
234235
}
235-
for (auto key : to_remove)
236-
task_link_map.erase(key);
236+
for (auto key : to_remove) {
237+
// Only remove the link if the Child Task previously existed; otherwise it's a Task that
238+
// has just been created and that wasn't in all_tasks when we took the snapshot.
239+
if (previous_task_objects.find(key) != previous_task_objects.end()) {
240+
task_link_map.erase(key);
241+
}
242+
}
237243

238244
// Determine the parent tasks from the gather links.
239245
std::transform(task_link_map.cbegin(),
240246
task_link_map.cend(),
241247
std::inserter(parent_tasks, parent_tasks.begin()),
242248
[](const std::pair<PyObject*, PyObject*>& kv) { return kv.second; });
249+
250+
// Copy all Task object pointers into previous_task_objects
251+
previous_task_objects.clear();
252+
for (const auto& task : all_tasks) {
253+
previous_task_objects.insert(task->origin);
254+
}
243255
}
244256

245257
for (auto& task : all_tasks) {
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
fixes:
2+
- |
3+
profiling: This fix resolves a race condition leading to incorrect stacks being reported
4+
for asyncio parent/child Tasks (e.g. when using ``asyncio.gather``).

tests/profiling/collector/test_asyncio_as_completed.py

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,12 @@ def test_asyncio_as_completed() -> None:
1515
from sys import version_info as PYVERSION
1616

1717
from ddtrace.internal.datadog.profiling import stack_v2
18+
from ddtrace.internal.logger import get_logger
1819
from ddtrace.profiling import profiler
1920
from tests.profiling.collector import pprof_utils
2021

22+
LOG = get_logger(__name__)
23+
2124
assert stack_v2.is_available, stack_v2.failure_msg
2225

2326
async def other(t: float) -> None:
@@ -30,9 +33,12 @@ async def wait_and_return_delay(t: float) -> float:
3033
async def main() -> None:
3134
# Create a mix of Tasks and Coroutines
3235
futures = [
33-
asyncio.create_task(wait_and_return_delay(i / 10)) if i % 2 == 0 else wait_and_return_delay(i / 10)
34-
for i in range(10)
36+
asyncio.create_task(wait_and_return_delay(float(i) / 10))
37+
if i % 2 == 0
38+
else wait_and_return_delay(float(i) / 10)
39+
for i in range(2, 12)
3540
]
41+
assert len(futures) == 10
3642

3743
# Randomize the order of the futures
3844
random.shuffle(futures)
@@ -61,6 +67,17 @@ async def main() -> None:
6167

6268
profile = pprof_utils.parse_newest_profile(output_filename)
6369

70+
task_names_in_profile = sorted(
71+
set(
72+
[
73+
(profile.string_table[label.str])
74+
for sample in profile.sample
75+
for label in sample.label
76+
if profile.string_table[label.key] == "task name"
77+
]
78+
)
79+
)
80+
6481
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
6582
assert len(samples) > 0
6683

@@ -73,7 +90,7 @@ async def main() -> None:
7390
pprof_utils.StackLocation(
7491
function_name="main",
7592
filename="test_asyncio_as_completed.py",
76-
line_no=main.__code__.co_firstlineno + 14,
93+
line_no=main.__code__.co_firstlineno + 17,
7794
),
7895
]
7996

@@ -91,11 +108,45 @@ async def main() -> None:
91108
),
92109
] + locations
93110

94-
pprof_utils.assert_profile_has_sample(
95-
profile,
96-
samples,
97-
expected_sample=pprof_utils.StackEvent(
98-
thread_name="MainThread",
99-
locations=locations,
100-
),
101-
)
111+
# Now, check that we have seen those locations for each Task we've created.
112+
# (They should be named Task-2 .. Task-11, which are the automatic names asyncio assigns to Tasks, e.g. those created by asyncio.create_task)
113+
# Note: we expect that one Task may not be seen (and thus tolerate at most one failure). The reason
114+
# is that there is a bug in ddtrace that makes one Task (randomly picked) appear "as part of" the Parent Task,
115+
# and this Task thus gets the Parent Task's name and not its own.
116+
seen_all_except_one = True
117+
seen_task_names: set[str] = set()
118+
for i in range(2, 12):
119+
try:
120+
pprof_utils.assert_profile_has_sample(
121+
profile,
122+
samples,
123+
expected_sample=pprof_utils.StackEvent(
124+
task_name=f"Task-{i}",
125+
thread_name="MainThread",
126+
locations=locations,
127+
),
128+
)
129+
130+
seen_task_names.add(f"Task-{i}")
131+
except AssertionError:
132+
if not seen_all_except_one:
133+
LOG.error(
134+
f"More than one Task has not been seen; i = {i} " # noqa: G004
135+
f"seen_task_names = {seen_task_names} "
136+
f"task_names_in_profile = {task_names_in_profile}"
137+
)
138+
raise
139+
140+
# This is the known-bug situation: the first Task found to be missing from the profile.
141+
# Check that we have seen the expected locations for the Parent Task (Task-1)
142+
# If that isn't the case, then something else is broken.
143+
pprof_utils.assert_profile_has_sample(
144+
profile,
145+
samples,
146+
expected_sample=pprof_utils.StackEvent(
147+
task_name="Task-1",
148+
thread_name="MainThread",
149+
locations=locations,
150+
),
151+
)
152+
seen_all_except_one = False

0 commit comments

Comments
 (0)