Skip to content

Commit c36972c

Browse files
authored
feat: add sampling service #(357)
Signed-off-by: Tsonglew <[email protected]>
1 parent 55dc5ff commit c36972c

File tree

9 files changed

+263
-10
lines changed

9 files changed

+263
-10
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
- Feature:
66
- Drop support for 3.7 (#356)
7+
- Support sampling rate setup. Provide `SW_SAMPLE_N_PER_3_SECS` environment variable to control it (#357)
78

89
- Fixes:
910
- Fix: user/password replacement is not allowed for relative URLs (#349)

docs/en/contribution/CodingStyle.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Since Python 3.5 is end of life, we fully utilize the clarity and performance bo
66
Please do not use other styles - `+`, `%` or `.format` unless f-string is absolutely unfeasible in the context, or
77
it is a logger message, which is [optimized](https://docs.python.org/3/howto/logging.html#optimization) for the `%` style
88

9-
Run `make dev-fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner
9+
Run `make fix` to invoke [flynt](https://github.com/ikamensh/flynt) to convert other formats to f-string, pay **extra care** to possible corner
1010
cases leading to a semantically different conversion.
1111

1212
### Quotes
@@ -23,7 +23,7 @@ foo = f"I'm a string"
2323
bar = f"This repo is called 'skywalking-python'"
2424
```
2525

26-
Run `make dev-fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it.
26+
Run `make fix` to invoke [unify](https://github.com/myint/unify) to deal with your quotes if flake8 complaints about it.
2727

2828
## Debug messages
2929
Please import the `logger_debug_enabled` variable and wrap your debug messages with a check.

docs/en/setup/Configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,3 +97,7 @@ export SW_AGENT_YourConfiguration=YourValue
9797
| plugin_fastapi_collect_http_params | SW_PLUGIN_FASTAPI_COLLECT_HTTP_PARAMS | <class 'bool'> | False | This config item controls that whether the FastAPI plugin should collect the parameters of the request. |
9898
| plugin_bottle_collect_http_params | SW_PLUGIN_BOTTLE_COLLECT_HTTP_PARAMS | <class 'bool'> | False | This config item controls that whether the Bottle plugin should collect the parameters of the request. |
9999
| plugin_celery_parameters_length | SW_PLUGIN_CELERY_PARAMETERS_LENGTH | <class 'int'> | 512 | The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off |
100+
### Sampling Configurations
101+
| Configuration | Environment Variable | Type | Default Value | Description |
102+
| :------------ | :------------ | :------------ | :------------ | :------------ |
103+
| sample_n_per_3_secs | SW_SAMPLE_N_PER_3_SECS | <class 'int'> | 0 | The number of samples to take in every 3 seconds, 0 turns off |

skywalking/agent/__init__.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,16 @@
1515
# limitations under the License.
1616
#
1717

18+
import asyncio
1819
import atexit
1920
import functools
2021
import os
2122
import sys
22-
import asyncio
23-
from queue import Queue, Full
24-
from threading import Thread, Event
23+
from queue import Full, Queue
24+
from threading import Event, Thread
2525
from typing import TYPE_CHECKING, Optional
2626

27-
from skywalking import config, plugins
28-
from skywalking import loggings
29-
from skywalking import meter
30-
from skywalking import profile
27+
from skywalking import config, loggings, meter, plugins, profile, sampling
3128
from skywalking.agent.protocol import Protocol, ProtocolAsync
3229
from skywalking.command import command_service, command_service_async
3330
from skywalking.loggings import logger
@@ -306,6 +303,8 @@ def start(self) -> None:
306303
profile.init()
307304
if config.agent_meter_reporter_active:
308305
meter.init(force=True) # force re-init after fork()
306+
if config.sample_n_per_3_secs > 0:
307+
sampling.init(force=True)
309308

310309
self.__bootstrap() # calls init_threading
311310

@@ -517,6 +516,8 @@ async def __start_event_loop_async(self) -> None:
517516
if config.agent_meter_reporter_active:
518517
# meter.init(force=True)
519518
await meter.init_async()
519+
if config.sample_n_per_3_secs > 0:
520+
await sampling.init_async()
520521

521522
self.__bootstrap() # gather all coroutines
522523

skywalking/config.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import os
3333
import re
3434
import uuid
35-
from typing import List, Pattern
3635
import warnings
36+
from typing import List, Pattern
3737

3838
RE_IGNORE_PATH: Pattern = re.compile('^$')
3939
RE_HTTP_IGNORE_METHOD: Pattern = RE_IGNORE_PATH
@@ -213,6 +213,10 @@
213213
# The maximum length of `celery` functions parameters, longer than this will be truncated, 0 turns off
214214
plugin_celery_parameters_length: int = int(os.getenv('SW_PLUGIN_CELERY_PARAMETERS_LENGTH', '512'))
215215

216+
# BEGIN: Sampling Configurations
217+
# The number of samples to take in every 3 seconds, 0 turns off
218+
sample_n_per_3_secs: int = int(os.getenv('SW_SAMPLE_N_PER_3_SECS', '0'))
219+
216220
# THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!
217221
options = [key for key in globals() if key not in options] # THIS MUST FOLLOW DIRECTLY AFTER LIST OF CONFIG OPTIONS!
218222

skywalking/sampling/__init__.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
import asyncio
18+
from typing import Optional
19+
20+
21+
sampling_service = None
22+
23+
24+
def init(force: bool = False):
25+
"""
26+
If the sampling service is not initialized, initialize it.
27+
if force, we are in a fork(), we force re-initialization
28+
"""
29+
from skywalking.sampling.sampling_service import SamplingService
30+
from skywalking.log import logger
31+
32+
global sampling_service
33+
if sampling_service and not force:
34+
return
35+
36+
logger.debug('Initializing sampling service')
37+
sampling_service = SamplingService()
38+
sampling_service.start()
39+
40+
41+
async def init_async(async_event: Optional[asyncio.Event] = None):
42+
from skywalking.sampling.sampling_service import SamplingServiceAsync
43+
44+
global sampling_service
45+
46+
sampling_service = SamplingServiceAsync()
47+
if async_event is not None:
48+
async_event.set()
49+
task = asyncio.create_task(sampling_service.start())
50+
sampling_service.strong_ref_set.add(task)
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from threading import Lock, Thread
19+
20+
import time
21+
from typing import Set
22+
from skywalking import config
23+
from skywalking.log import logger
24+
25+
import asyncio
26+
27+
28+
class SamplingServiceBase:
29+
30+
def __init__(self):
31+
self.sampling_factor = 0
32+
33+
@property
34+
def reset_sampling_factor_interval(self) -> int:
35+
return 3
36+
37+
@property
38+
def can_sampling(self):
39+
return self.sampling_factor < config.sample_n_per_3_secs
40+
41+
def _try_sampling(self) -> bool:
42+
if self.can_sampling:
43+
self._incr_sampling_factor()
44+
return True
45+
logger.debug('%s try_sampling return false, sampling_factor: %d', self.__class__.__name__, self.sampling_factor)
46+
return False
47+
48+
def _set_sampling_factor(self, val: int):
49+
logger.debug('Set sampling factor to %d', val)
50+
self.sampling_factor = val
51+
52+
def _incr_sampling_factor(self):
53+
self.sampling_factor += 1
54+
55+
56+
class SamplingService(Thread, SamplingServiceBase):
57+
58+
def __init__(self):
59+
Thread.__init__(self, name='SamplingService', daemon=True)
60+
SamplingServiceBase.__init__(self)
61+
self.lock = Lock()
62+
63+
def run(self):
64+
logger.debug('Started sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
65+
while True:
66+
self.reset_sampling_factor()
67+
time.sleep(self.reset_sampling_factor_interval)
68+
69+
def try_sampling(self) -> bool:
70+
with self.lock:
71+
return super()._try_sampling()
72+
73+
def force_sampled(self) -> None:
74+
with self.lock:
75+
super()._incr_sampling_factor()
76+
77+
def reset_sampling_factor(self) -> None:
78+
with self.lock:
79+
super()._set_sampling_factor(0)
80+
81+
82+
class SamplingServiceAsync(SamplingServiceBase):
83+
84+
def __init__(self):
85+
super().__init__()
86+
self.strong_ref_set: Set[asyncio.Task[None]] = set()
87+
88+
async def start(self):
89+
logger.debug('Started async sampling service sampling_n_per_3_secs: %d', config.sample_n_per_3_secs)
90+
while True:
91+
await self.reset_sampling_factor()
92+
await asyncio.sleep(self.reset_sampling_factor_interval)
93+
94+
def try_sampling(self) -> bool:
95+
return super()._try_sampling()
96+
97+
def force_sampled(self):
98+
super()._incr_sampling_factor()
99+
100+
async def reset_sampling_factor(self):
101+
super()._set_sampling_factor(0)

skywalking/trace/context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
from skywalking import profile
2121
from skywalking.agent import agent
2222
from skywalking.profile.profile_status import ProfileStatusReference
23+
from skywalking import sampling
2324
from skywalking.trace import ID
2425
from skywalking.trace.carrier import Carrier
2526
from skywalking.trace.segment import Segment, SegmentRef
@@ -327,4 +328,7 @@ def get_context() -> SpanContext:
327328
if spans:
328329
return spans[-1].context
329330

331+
if sampling.sampling_service and not sampling.sampling_service.try_sampling():
332+
return NoopContext()
333+
330334
return SpanContext()

tests/unit/test_sampling.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import unittest
19+
20+
from skywalking.sampling.sampling_service import SamplingService, SamplingServiceAsync
21+
22+
23+
class TestSampling(unittest.TestCase):
24+
25+
def test_try_sampling(self):
26+
from skywalking import config
27+
28+
config.sample_n_per_3_secs = 2
29+
sampling_service = SamplingService()
30+
assert sampling_service.try_sampling()
31+
assert sampling_service.try_sampling()
32+
assert not sampling_service.try_sampling()
33+
34+
def test_force_sampled(self):
35+
36+
from skywalking import config
37+
38+
config.sample_n_per_3_secs = 1
39+
sampling_service = SamplingService()
40+
assert sampling_service.try_sampling()
41+
sampling_service.force_sampled()
42+
assert sampling_service.sampling_factor == 2
43+
44+
def test_reset_sampling_factor(self):
45+
from skywalking import config
46+
47+
config.sample_n_per_3_secs = 1
48+
sampling_service = SamplingService()
49+
assert sampling_service.try_sampling()
50+
assert not sampling_service.try_sampling()
51+
sampling_service.reset_sampling_factor()
52+
assert sampling_service.try_sampling()
53+
54+
55+
class TestSamplingAsync(unittest.IsolatedAsyncioTestCase):
56+
57+
async def test_try_sampling(self):
58+
from skywalking import config
59+
60+
config.sample_n_per_3_secs = 2
61+
sampling_service = SamplingServiceAsync()
62+
assert sampling_service.try_sampling()
63+
assert sampling_service.try_sampling()
64+
assert not sampling_service.try_sampling()
65+
66+
async def test_force_sampled(self):
67+
68+
from skywalking import config
69+
70+
config.sample_n_per_3_secs = 1
71+
sampling_service = SamplingServiceAsync()
72+
assert sampling_service.try_sampling()
73+
sampling_service.force_sampled()
74+
assert sampling_service.sampling_factor == 2
75+
76+
async def test_reset_sampling_factor(self):
77+
from skywalking import config
78+
79+
config.sample_n_per_3_secs = 1
80+
sampling_service = SamplingServiceAsync()
81+
assert sampling_service.try_sampling()
82+
assert not sampling_service.try_sampling()
83+
await sampling_service.reset_sampling_factor()
84+
assert sampling_service.try_sampling()
85+
86+
87+
if __name__ == '__main__':
88+
unittest.main()

0 commit comments

Comments
 (0)