Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 14 additions & 20 deletions ddtrace/profiling/collector/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,27 @@
from asyncio.locks import Lock
import typing
import asyncio

from .. import collector
from . import _lock


class _ProfiledAsyncioLock(_lock._ProfiledLock):
pass


class _ProfiledAsyncioSemaphore(_lock._ProfiledLock):
pass


class AsyncioLockCollector(_lock.LockCollector):
"""Record asyncio.Lock usage."""

PROFILED_LOCK_CLASS = _ProfiledAsyncioLock
MODULE = asyncio
PATCHED_LOCK_NAME = "Lock"


class AsyncioSemaphoreCollector(_lock.LockCollector):
"""Record asyncio.Semaphore usage."""

def _start_service(self) -> None:
"""Start collecting lock usage."""
try:
import asyncio
except ImportError as e:
raise collector.CollectorUnavailable(e)
self._asyncio_module = asyncio
return super(AsyncioLockCollector, self)._start_service()

def _get_patch_target(self) -> typing.Type[Lock]:
return self._asyncio_module.Lock

def _set_patch_target(
self,
value: typing.Any,
) -> None:
self._asyncio_module.Lock = value # type: ignore[misc]
PROFILED_LOCK_CLASS = _ProfiledAsyncioSemaphore
MODULE = asyncio
PATCHED_LOCK_NAME = "Semaphore"
1 change: 1 addition & 0 deletions ddtrace/profiling/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def start_collector(collector_class: Type[collector.Collector]) -> None:
("threading", lambda _: start_collector(threading.ThreadingSemaphoreCollector)),
("threading", lambda _: start_collector(threading.ThreadingBoundedSemaphoreCollector)),
("asyncio", lambda _: start_collector(asyncio.AsyncioLockCollector)),
("asyncio", lambda _: start_collector(asyncio.AsyncioSemaphoreCollector)),
]

for module, hook in self._collectors_on_import:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
profiling: Add support for ``asyncio.Semaphore`` locking type profiling in Python.
Also refactored asyncio collectors to use base class attributes (``MODULE`` and ``PATCHED_LOCK_NAME``)
for consistency with the threading module collectors.

116 changes: 89 additions & 27 deletions tests/profiling/collector/test_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import glob
import os
import sys
from typing import Type
from typing import Union
import uuid

import pytest

from ddtrace import ext
from ddtrace.internal.datadog.profiling import ddup
from ddtrace.profiling.collector import asyncio as collector_asyncio
from ddtrace.profiling.collector.asyncio import AsyncioLockCollector
from ddtrace.profiling.collector.asyncio import AsyncioSemaphoreCollector
from tests.profiling.collector import pprof_utils
from tests.profiling.collector import test_collector
from tests.profiling.collector.lock_utils import get_lock_linenos
Expand All @@ -20,16 +23,49 @@

PY_311_OR_ABOVE = sys.version_info[:2] >= (3, 11)


def test_repr():
test_collector._test_repr(
collector_asyncio.AsyncioLockCollector,
"AsyncioLockCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501
)
# Type aliases for collector and lock types
CollectorType = Union[
Type[AsyncioLockCollector],
Type[AsyncioSemaphoreCollector],
]
LockType = Union[Type[asyncio.Lock], Type[asyncio.Semaphore]]


@pytest.mark.parametrize(
"collector_class,expected_repr",
[
(
AsyncioLockCollector,
"AsyncioLockCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)",
),
(
AsyncioSemaphoreCollector,
"AsyncioSemaphoreCollector(status=<ServiceStatus.STOPPED: 'stopped'>, capture_pct=1.0, nframes=64, tracer=None)", # noqa: E501
),
],
)
def test_collector_repr(collector_class: CollectorType, expected_repr: str) -> None:
test_collector._test_repr(collector_class, expected_repr)


@pytest.mark.asyncio
class TestAsyncioLockCollector:
class BaseAsyncioLockCollectorTest:
"""Base test class for asyncio lock collectors.

Child classes must implement:
- collector_class: The collector class to test
- lock_class: The asyncio lock class to test
- lock_init_args: Arguments to pass to lock constructor (default: ())
"""

@property
def collector_class(self) -> CollectorType:
raise NotImplementedError("Child classes must implement collector_class")

@property
def lock_class(self) -> LockType:
raise NotImplementedError("Child classes must implement lock_class")

def setup_method(self, method):
self.test_name = method.__qualname__ if PY_311_OR_ABOVE else method.__name__
self.output_prefix = "/tmp" + os.sep + self.test_name
Expand All @@ -51,16 +87,17 @@ def teardown_method(self):
except Exception as e:
print("Error while deleting file: ", e)

async def test_asyncio_lock_events(self):
with collector_asyncio.AsyncioLockCollector(capture_pct=100):
lock = asyncio.Lock() # !CREATE! test_asyncio_lock_events
await lock.acquire() # !ACQUIRE! test_asyncio_lock_events
async def test_lock_events(self):
"""Test basic acquire/release event profiling."""
with self.collector_class(capture_pct=100):
lock = self.lock_class() # !CREATE! asyncio_test_lock_events
await lock.acquire() # !ACQUIRE! asyncio_test_lock_events
assert lock.locked()
lock.release() # !RELEASE! test_asyncio_lock_events
lock.release() # !RELEASE! asyncio_test_lock_events

ddup.upload()

linenos = get_lock_linenos("test_asyncio_lock_events")
linenos = get_lock_linenos("asyncio_test_lock_events")
profile = pprof_utils.parse_newest_profile(self.output_filename)
expected_thread_id = _thread.get_ident()
pprof_utils.assert_lock_events(
Expand All @@ -85,29 +122,30 @@ async def test_asyncio_lock_events(self):
],
)

async def test_asyncio_lock_events_tracer(self, tracer):
async def test_lock_events_tracer(self, tracer):
"""Test event profiling with tracer integration."""
tracer._endpoint_call_counter_span_processor.enable()
resource = str(uuid.uuid4())
span_type = ext.SpanTypes.WEB

with collector_asyncio.AsyncioLockCollector(capture_pct=100, tracer=tracer):
lock = asyncio.Lock() # !CREATE! test_asyncio_lock_events_tracer_1
await lock.acquire() # !ACQUIRE! test_asyncio_lock_events_tracer_1
with self.collector_class(capture_pct=100, tracer=tracer):
lock = self.lock_class() # !CREATE! asyncio_test_lock_events_tracer_1
await lock.acquire() # !ACQUIRE! asyncio_test_lock_events_tracer_1
with tracer.trace("test", resource=resource, span_type=span_type) as t:
lock2 = asyncio.Lock() # !CREATE! test_asyncio_lock_events_tracer_2
await lock2.acquire() # !ACQUIRE! test_asyncio_lock_events_tracer_2
lock.release() # !RELEASE! test_asyncio_lock_events_tracer_1
lock2 = self.lock_class() # !CREATE! asyncio_test_lock_events_tracer_2
await lock2.acquire() # !ACQUIRE! asyncio_test_lock_events_tracer_2
lock.release() # !RELEASE! asyncio_test_lock_events_tracer_1
span_id = t.span_id
lock2.release() # !RELEASE! test_asyncio_lock_events_tracer_2
lock2.release() # !RELEASE! asyncio_test_lock_events_tracer_2

lock_ctx = asyncio.Lock() # !CREATE! test_asyncio_lock_events_tracer_3
async with lock_ctx: # !ACQUIRE! !RELEASE! test_asyncio_lock_events_tracer_3
lock_ctx = self.lock_class() # !CREATE! asyncio_test_lock_events_tracer_3
async with lock_ctx: # !ACQUIRE! !RELEASE! asyncio_test_lock_events_tracer_3
pass
ddup.upload(tracer=tracer)

linenos_1 = get_lock_linenos("test_asyncio_lock_events_tracer_1")
linenos_2 = get_lock_linenos("test_asyncio_lock_events_tracer_2")
linenos_3 = get_lock_linenos("test_asyncio_lock_events_tracer_3", with_stmt=True)
linenos_1 = get_lock_linenos("asyncio_test_lock_events_tracer_1")
linenos_2 = get_lock_linenos("asyncio_test_lock_events_tracer_2")
linenos_3 = get_lock_linenos("asyncio_test_lock_events_tracer_3", with_stmt=True)

profile = pprof_utils.parse_newest_profile(self.output_filename)
expected_thread_id = _thread.get_ident()
Expand Down Expand Up @@ -167,3 +205,27 @@ async def test_asyncio_lock_events_tracer(self, tracer):
),
],
)


class TestAsyncioLockCollector(BaseAsyncioLockCollectorTest):
"""Test asyncio.Lock profiling."""

@property
def collector_class(self):
return AsyncioLockCollector

@property
def lock_class(self):
return asyncio.Lock


class TestAsyncioSemaphoreCollector(BaseAsyncioLockCollectorTest):
"""Test asyncio.Semaphore profiling."""

@property
def collector_class(self):
return AsyncioSemaphoreCollector

@property
def lock_class(self):
return asyncio.Semaphore
1 change: 1 addition & 0 deletions tests/profiling/test_profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def test_default_collectors():
pass
else:
assert any(isinstance(c, asyncio.AsyncioLockCollector) for c in p._profiler._collectors)
assert any(isinstance(c, asyncio.AsyncioSemaphoreCollector) for c in p._profiler._collectors)
p.stop(flush=False)


Expand Down
Loading