Skip to content

Commit 863ec29

Browse files
Collect & report task metadata for Celery
This additional task metadata will help us troubleshoot an issue with some queue time spikes that we're investigating. With each queue's metric we'll report information about the oldest task and the timestamps that were used to calculate its queue time, which will hopefully help us identify the source of the queue time spikes.
1 parent f005893 commit 863ec29

File tree

5 files changed

+75
-4
lines changed

5 files changed

+75
-4
lines changed

judoscale/celery/collector.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,19 @@ def collect(self) -> List[Metric]:
154154
for queue in self.queues:
155155
if result := self.oldest_task_and_timestamp(queue):
156156
task, timestamp = result
157+
logger.debug(f"Task: {timestamp} | {task}")
157158
if timestamp:
158-
metrics.append(
159-
Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp)
160-
)
159+
metric = Metric.for_queue(queue_name=queue, oldest_job_ts=timestamp)
160+
metric.report_metadata = {
161+
"task": task.get("headers", {}).get("task"),
162+
"id": task.get("headers", {}).get("id"),
163+
"eta": task.get("headers", {}).get("eta"),
164+
"retries": task.get("headers", {}).get("retries"),
165+
"published_at": task.get("properties", {}).get("published_at"),
166+
"timestamp": timestamp,
167+
}
168+
169+
metrics.append(metric)
161170
else:
162171
task_id = task.get("id", None)
163172
logger.warning(

judoscale/core/metric.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class Metric:
3434
value: int
3535
queue_name: Optional[str] = None
3636
measurement: str = "qt"
37+
report_metadata: Optional[dict] = None
3738

3839
@property
3940
def as_tuple(self) -> Tuple[int, int, str, Optional[str]]:

judoscale/core/reporter.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,11 @@ def _build_report(self, metrics: List[Metric]):
128128
"config": self.config.for_report,
129129
"adapters": dict(adapter.as_tuple for adapter in self.adapters),
130130
"metrics": [metric.as_tuple for metric in metrics],
131+
"metadata": dict(
132+
(metric.queue_name, metric.report_metadata)
133+
for metric in metrics
134+
if metric.report_metadata is not None
135+
),
131136
}
132137

133138

tests/test_collectors.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,43 @@ def mock_lindex(queue, _):
405405
assert metrics[3].queue_name == "foo"
406406
assert metrics[3].value == approx(60000, abs=100)
407407

408+
def test_collect_metadata(self, worker_1, celery):
409+
now = time.time()
410+
celery.connection_for_read().channel().client.scan_iter.return_value = [b"foo"]
411+
celery.connection_for_read().channel().client.lindex.return_value = bytes(
412+
json.dumps(
413+
{
414+
"id": "123abc",
415+
"headers": {
416+
"eta": None,
417+
"retries": 0,
418+
"id": "123abc",
419+
"task": "my.task",
420+
},
421+
"properties": {"published_at": now - 60},
422+
}
423+
),
424+
"utf-8",
425+
)
426+
427+
collector = CeleryMetricsCollector(worker_1, celery)
428+
metrics = collector.collect()
429+
metrics = sorted(metrics, key=lambda m: m.queue_name)
430+
431+
assert len(metrics) == 1
432+
433+
assert metrics[0].measurement == "qt"
434+
assert metrics[0].queue_name == "foo"
435+
assert metrics[0].value == approx(60000, abs=100)
436+
assert metrics[0].report_metadata == {
437+
"id": "123abc",
438+
"task": "my.task",
439+
"eta": None,
440+
"retries": 0,
441+
"published_at": now - 60,
442+
"timestamp": now - 60,
443+
}
444+
408445

409446
class TestRQMetricsCollector:
410447
def test_adapter_config(self, render_worker):

tests/test_reporter.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,29 @@ def test_build_report(self, reporter):
3232
report = reporter._build_report([metric])
3333

3434
assert sorted(list(report.keys())) == sorted(
35-
["adapters", "config", "container", "pid", "metrics"]
35+
["adapters", "config", "container", "pid", "metrics", "metadata"]
3636
)
3737
assert len(report["metrics"]) == 1
3838
assert report["metrics"][0] == (1355314320, 123, "test", None)
39+
assert report["metadata"] == dict()
40+
41+
def test_build_report_metadata(self, reporter):
42+
ts = datetime.fromisoformat("2012-12-12T12:12:00+00:00").timestamp()
43+
44+
metric = Metric(measurement="qt", timestamp=ts, value=123, queue_name="q1")
45+
metric.report_metadata = {"task": "my.task1", "id": "1", "published_at": 999}
46+
47+
metric2 = Metric(measurement="qt", timestamp=ts, value=456, queue_name="q2")
48+
metric2.report_metadata = {"task": "my.task2", "id": "2", "published_at": 987}
49+
50+
metric3 = Metric(measurement="qt", timestamp=ts, value=789, queue_name="nope")
51+
52+
report = reporter._build_report([metric, metric2, metric3])
53+
54+
assert report["metadata"] == {
55+
"q1": {"task": "my.task1", "id": "1", "published_at": 999},
56+
"q2": {"task": "my.task2", "id": "2", "published_at": 987},
57+
}
3958

4059
def test_no_explicit_adapter(self, reporter):
4160
assert len(reporter.adapters) == 1

0 commit comments

Comments
 (0)