
Commit a305a6e

perf(eventstream): Add caching layer, raise sort (#103756)
Two changes:

1. Add a caching layer so that we can avoid hitting the DB for hot groups.
2. We don't need the sort in 95% of cases. Let's raise that into Python and only do it when necessary.
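A rough standalone sketch of those two ideas: cache-aside reads via Django's cache.get_many / cache.set_many, and sorting only when the result exceeds the threshold. Every name and the in-memory data below are hypothetical stand-ins, not Sentry's code.

import django
from django.conf import settings

if not settings.configured:
    # In-memory cache so the sketch runs outside a real Django project.
    settings.configure(
        CACHES={"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}}
    )
    django.setup()

from datetime import UTC, datetime, timedelta

from django.core.cache import cache

# Hypothetical stand-in for the Postgres table: (group_id, date_added) pairs.
FAKE_DB = [(n, datetime.now(UTC) - timedelta(hours=n)) for n in range(1, 6)]


def _key(group_id: int) -> str:
    return f"sketch:redirects:{group_id}"


def merged_ids(group_ids: set[int], threshold: int = 2500) -> set[int]:
    # Pin the inputs with a future timestamp so they always survive the cut,
    # as the commit does.
    data: set[tuple[int, datetime]] = {
        (gid, datetime.now(UTC) + timedelta(minutes=1)) for gid in group_ids
    }

    # Step 1: cache-aside read, one round trip for every input ID.
    keys = {gid: _key(gid) for gid in group_ids}
    hits = cache.get_many(keys.values())
    data.update(*hits.values())
    missed = {gid for gid in group_ids if keys[gid] not in hits}

    # Step 2: fall through to the "database" only for the misses.
    fetched = [(gid, added) for gid, added in FAKE_DB if gid in missed]
    data.update(fetched)

    # Step 3: warm the cache; the commit uses a 5-minute timeout.
    cache.set_many({_key(gid): {(gid, added)} for gid, added in fetched}, timeout=300)

    # Step 4: sort only in the rare case where we exceed the threshold.
    ids = {gid for gid, _ in data}
    if len(ids) <= threshold:
        return ids  # the 95% case: no sort at all
    newest = sorted(data, key=lambda d: d[1], reverse=True)[:threshold]
    return {gid for gid, _ in newest}


print(merged_ids({1, 2}))  # cold: reads FAKE_DB, warms the cache
print(merged_ids({1, 2}))  # warm: served entirely from the cache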
1 parent f644ccf commit a305a6e

File tree

2 files changed: +96 -36 lines changed

src/sentry/services/eventstore/query_preprocessing.py

Lines changed: 79 additions & 27 deletions
@@ -1,6 +1,10 @@
-from collections.abc import Iterable
+from collections import defaultdict
+from collections.abc import Iterable, Mapping
+from datetime import UTC, datetime, timedelta
+from typing import Any
 
 import sentry_sdk
+from django.core.cache import cache
 from django.db.models import Q, QuerySet
 
 from sentry.models.groupredirect import GroupRedirect
@@ -14,44 +18,92 @@
 SIZE_THRESHOLD_FOR_CLICKHOUSE = 2500
 
 
+def _build_group_redirect_by_group_id_cache_key(group_id: str | int) -> str:
+    return f"groupredirectsforgroupid:{group_id}"
+
+
 def _get_all_related_redirects_query(
     group_ids: set[str | int],
-) -> QuerySet[GroupRedirect, tuple[int, int]]:
-    return (
-        GroupRedirect.objects.filter(
-            Q(group_id__in=group_ids) | Q(previous_group_id__in=group_ids)
-        ).values_list("group_id", "previous_group_id")
-        # This order returns the newest redirects first. i.e. we're implicitly dropping
-        # the oldest redirects if we have >THRESHOLD. We choose to drop the oldest
-        # because they're least likely to have data in retention.
-        # Technically id != date_added, but it's a close appx (& much faster).
-        .order_by("-id")
+) -> QuerySet[GroupRedirect, Any]:
+    return GroupRedirect.objects.filter(
+        Q(group_id__in=group_ids) | Q(previous_group_id__in=group_ids)
+    ).values_list("date_added", "group_id", "previous_group_id", named=True)
+
+
+def _try_get_from_cache(
+    group_ids: Iterable[str | int],
+) -> tuple[set[tuple[str | int, datetime]], set[str | int]]:
+    """
+    CACHE STRUCTURE:
+    group_id ==> set[(group_id, date_added)]
+
+    Returns (all merged (group_id, date_added) pairs from cache hits, all uncached input IDs)
+    """
+    # CACHE STRUCTURE:
+    # group_id ==> set[ tuple[group_id, date_added] ]
+    id_to_keys = {
+        group_id: _build_group_redirect_by_group_id_cache_key(group_id) for group_id in group_ids
+    }
+    cache_results: Mapping[str | int, set[tuple[str | int, datetime]]] = cache.get_many(
+        id_to_keys.values()
     )
 
+    cached_data = set().union(*cache_results.values())
+    uncached_group_ids = {
+        group_id for group_id in group_ids if id_to_keys[group_id] not in cache_results.keys()
+    }
+
+    return (cached_data, uncached_group_ids)
+
 
 def get_all_merged_group_ids(
     group_ids: Iterable[str | int], threshold=SIZE_THRESHOLD_FOR_CLICKHOUSE
 ) -> set[str | int]:
     with sentry_sdk.start_span(op="get_all_merged_group_ids") as span:
-        group_id_set = set(group_ids)
-        all_related_rows = _get_all_related_redirects_query(group_id_set)
+        # Initialize all IDs with a future time to ensure they aren't filtered out.
+        running_data = {
+            (group_id, datetime.now(UTC) + timedelta(minutes=1)) for group_id in group_ids
+        }
+
+        # Step 1: Try to get data from cache
+        cached_data, uncached_group_ids = _try_get_from_cache(group_ids)
+        running_data.update(cached_data)
+
+        # Step 2: Get unordered uncached data from Postgres
+        all_related_rows = _get_all_related_redirects_query(uncached_group_ids)
+        id_to_related = defaultdict(set)
 
-        threshold_breaker_set = None
+        for row in all_related_rows:
+            if row.date_added is None:
+                continue
+            running_data.add((row.group_id, row.date_added))
+            running_data.add((row.previous_group_id, row.date_added))
 
-        for r in all_related_rows:
-            group_id_set.update(r)
+            id_to_related[row.group_id].add((row.previous_group_id, row.date_added))
+            id_to_related[row.previous_group_id].add((row.group_id, row.date_added))
 
-            # We only want to set the threshold_breaker the first time that we cross
-            # the threshold.
-            if threshold_breaker_set is None and len(group_id_set) >= threshold:
-                # Because we're incrementing the size of group_id_set by either one or two
-                # each iteration, it's fine if we're a bit over. That's negligible compared
-                # to the scale-of-thousands Clickhouse threshold.
-                threshold_breaker_set = group_id_set.copy()
+        # Step 3: Set cache-missed data into cache
+        cache.set_many(
+            data={
+                _build_group_redirect_by_group_id_cache_key(group_id): id_to_related[group_id]
+                for group_id in uncached_group_ids
+            },
+            timeout=300,  # 5 minutes
+        )
 
-        out = group_id_set if threshold_breaker_set is None else threshold_breaker_set
+        # Step 4: If and only if the result size is greater than threshold, sort by
+        # date_added and only return the newest threshold # of results.
+        output_set = {datum[0] for datum in running_data}
+        span.set_data("true_group_id_len", len(output_set))
 
-        span.set_data("true_group_id_len", len(group_id_set))
-        span.set_data("returned_group_id_len", len(out))
+        if len(output_set) > threshold:
+            # Sort by datetime, decreasing, and then take the first threshold results
+            output_set = {
+                datum[0]
+                for datum in sorted(running_data, key=lambda datum: datum[1], reverse=True)[
+                    :threshold
+                ]
+            }
+        span.set_data("returned_group_id_len", len(output_set))
 
-        return out
+        return output_set
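As a usage sketch of the helpers added above, assuming a configured Django cache backend: the IDs and timestamp here are invented, and the flow mirrors the new test further down.

from datetime import UTC, datetime

from django.core.cache import cache

from sentry.services.eventstore.query_preprocessing import (
    _build_group_redirect_by_group_id_cache_key,
    _try_get_from_cache,
)

now = datetime.now(UTC)

# Seed the cache for group 1 the same way Step 3 above would.
cache.set(_build_group_redirect_by_group_id_cache_key(1), {(2, now)})

cached, uncached = _try_get_from_cache({1, 4})
assert cached == {(2, now)}  # merged pairs recovered with zero DB queries
assert uncached == {4}       # only group 4 falls through to Postgres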

tests/sentry/services/eventstore/test_query_preprocessing.py

Lines changed: 17 additions & 9 deletions
@@ -2,7 +2,8 @@
 
 from sentry.models.groupredirect import GroupRedirect
 from sentry.services.eventstore.query_preprocessing import (
-    _get_all_related_redirects_query,
+    _build_group_redirect_by_group_id_cache_key,
+    _try_get_from_cache,
     get_all_merged_group_ids,
 )
 from sentry.testutils.cases import TestCase
@@ -16,7 +17,7 @@ def setUp(self) -> None:
         self.g1 = self.create_group(id=1)
         self.g2 = self.create_group(id=2)
         self.g3 = self.create_group(id=3)
-        self.create_group(id=4)
+        self.g4 = self.create_group(id=4)
 
         self.gr31 = GroupRedirect.objects.create(
             id=10001,
@@ -33,13 +34,6 @@ def setUp(self) -> None:
             date_added=datetime.now(UTC) - timedelta(hours=1),
         )
 
-    def test_get_all_related_groups_query(self) -> None:
-        """
-        What we want is for this to return the newest redirects first.
-        What we're technically doing is taking the redirects with the highest IDs.
-        """
-        assert _get_all_related_redirects_query({self.g1.id})[0] == (self.g1.id, self.g2.id)
-
     def test_get_all_merged_group_ids(self) -> None:
         assert get_all_merged_group_ids([self.g1.id]) == {self.g1.id, self.g2.id, self.g3.id}
         assert get_all_merged_group_ids([self.g2.id]) == {self.g1.id, self.g2.id}
@@ -67,3 +61,17 @@ def test_threshold(self) -> None:
         )
 
         assert len(get_all_merged_group_ids([group.id], local_threshold)) <= local_threshold + 2
+
+    def test_cache(self) -> None:
+        from django.core.cache import cache
+
+        cache.set(
+            _build_group_redirect_by_group_id_cache_key(self.g1.id),
+            {(self.g2.id, self.gr21.date_added), (self.g3.id, self.gr31.date_added)},
+        )
+
+        res = _try_get_from_cache({self.g1.id, self.g4.id})
+        assert res == (
+            {(self.g2.id, self.gr21.date_added), (self.g3.id, self.gr31.date_added)},
+            {self.g4.id},
+        )
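To exercise just the new cache path locally, something like the following should work in a Sentry dev environment (the exact pytest invocation is an assumption about the repo's test setup):

pytest tests/sentry/services/eventstore/test_query_preprocessing.py -k test_cache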
