Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -182,27 +182,26 @@ class TaskInfo
PyObject* origin = NULL;
PyObject* loop = NULL;

GenInfo::Ptr coro = nullptr;

StringTable::Key name;
bool is_on_cpu = false;
GenInfo::Ptr coro = nullptr;

// Information to reconstruct the async stack as best as we can
TaskInfo::Ptr waiter = nullptr;
bool is_on_cpu = false;

[[nodiscard]] static Result<TaskInfo::Ptr> create(TaskObj*);
TaskInfo(PyObject* origin, PyObject* loop, GenInfo::Ptr coro, StringTable::Key name, TaskInfo::Ptr waiter)
: origin(origin)
, loop(loop)
, coro(std::move(coro))
, name(name)
, is_on_cpu(coro && coro->is_running)
, coro(std::move(coro))
, waiter(std::move(waiter))
, is_on_cpu(this->coro && this->coro->is_running)
{
}

[[nodiscard]] static Result<TaskInfo::Ptr> current(PyObject*);
inline size_t unwind(FrameStack&);
inline size_t unwind(FrameStack&, size_t& upper_python_stack_size);
};

inline std::unordered_map<PyObject*, PyObject*> task_link_map;
Expand Down Expand Up @@ -344,7 +343,7 @@ inline std::vector<std::unique_ptr<StackInfo>> current_tasks;
// ----------------------------------------------------------------------------

inline size_t
TaskInfo::unwind(FrameStack& stack)
TaskInfo::unwind(FrameStack& stack, size_t& upper_python_stack_size)
{
// TODO: Check for running task.
std::stack<PyObject*> coro_frames;
Expand All @@ -355,14 +354,31 @@ TaskInfo::unwind(FrameStack& stack)
coro_frames.push(py_coro->frame);
}

int count = 0;
// Total number of frames added to the Stack
size_t count = 0;

// Unwind the coro frames
while (!coro_frames.empty()) {
PyObject* frame = coro_frames.top();
coro_frames.pop();

count += unwind_frame(frame, stack);
auto new_frames = unwind_frame(frame, stack);

// If this is the first Frame being unwound (we have not added any Frames to the Stack yet),
// use the number of Frames added to the Stack to determine the size of the upper Python stack.
if (count == 0) {
// The first Frame is the coroutine Frame, so the Python stack size is the number of Frames - 1
upper_python_stack_size = new_frames - 1;

// Remove the Python Frames from the Stack (they will be added back later)
// We cannot push those Frames now because otherwise they would be added once per Task,
// we only want to add them once per Leaf Task, and on top of all non-leaf Tasks.
for (size_t i = 0; i < upper_python_stack_size; i++) {
stack.pop_back();
}
}

count += new_frames;
}

return count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,31 +264,40 @@ ThreadInfo::unwind_tasks()
}
}

// Make sure the on CPU task is first
std::sort(leaf_tasks.begin(), leaf_tasks.end(), [](const TaskInfo::Ref& a, const TaskInfo::Ref& b) {
return ((a.get().is_on_cpu ? 0 : 1) < (b.get().is_on_cpu ? 0 : 1));
});

// The size of the "pure Python" stack (before asyncio Frames), computed later by TaskInfo::unwind
size_t upper_python_stack_size = 0;
// Dummy out-parameter passed to TaskInfo::unwind for Tasks that are not on CPU, so that they do not overwrite upper_python_stack_size
size_t unused;

bool on_cpu_task_seen = false;
for (auto& leaf_task : leaf_tasks) {
on_cpu_task_seen = on_cpu_task_seen || leaf_task.get().is_on_cpu;

auto stack_info = std::make_unique<StackInfo>(leaf_task.get().name, leaf_task.get().is_on_cpu);
auto& stack = stack_info->stack;

for (auto current_task = leaf_task;;) {
auto& task = current_task.get();

size_t stack_size = task.unwind(stack);

// The task_stack_size includes both the coroutine frames and the "upper" Python synchronous frames
size_t task_stack_size = task.unwind(stack, task.is_on_cpu ? upper_python_stack_size : unused);
if (task.is_on_cpu) {
// Undo the stack unwinding
// TODO[perf]: not super-efficient :(
for (size_t i = 0; i < stack_size; i++)
stack.pop_back();

// Instead we get part of the thread stack
FrameStack temp_stack;
size_t nframes = (python_stack.size() > stack_size) ? python_stack.size() - stack_size : 0;
for (size_t i = 0; i < nframes; i++) {
auto python_frame = python_stack.front();
temp_stack.push_front(python_frame);
python_stack.pop_front();
}
while (!temp_stack.empty()) {
stack.push_front(temp_stack.front());
temp_stack.pop_front();
// Get the "bottom" part of the Python synchronous Stack, that is to say the
// synchronous functions and coroutines called by the Task's outermost coroutine.
// The number of Frames to push is the total number of Frames in the Python stack, from which we
// subtract the number of Frames in the "upper Python stack" (asyncio machinery + sync entrypoint).
// This gives us [outermost coroutine, ... , innermost coroutine, outermost sync function, ... ,
// innermost sync function].
size_t frames_to_push =
(python_stack.size() > task_stack_size) ? python_stack.size() - task_stack_size : 0;
for (size_t i = 0; i < frames_to_push; i++) {
const auto& python_frame = python_stack[frames_to_push - i - 1];
stack.push_front(python_frame);
}
}

Expand Down Expand Up @@ -317,8 +326,15 @@ ThreadInfo::unwind_tasks()
}

// Finish off with the remaining thread stack
for (auto p = python_stack.begin(); p != python_stack.end(); p++)
stack.push_back(*p);
// If we have seen an on-CPU Task, then upper_python_stack_size will be set and will include the sync entry
// point and the asyncio machinery Frames. Otherwise, we are in `select` (idle) and we should push all the
// Frames.
for (size_t i = python_stack.size() - (on_cpu_task_seen ? upper_python_stack_size : python_stack.size());
i < python_stack.size();
i++) {
const auto& python_frame = python_stack[i];
stack.push_back(python_frame);
}

current_tasks.push_back(std::move(stack_info));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
fixes:
- |
profiling: This fix makes stack sampling more accurate for on-CPU asyncio Tasks.
Loading