Skip to content

Commit 991fec4

Browse files
authored
ref(data-forwarding): More directly use the plugin forwarding (#103936)
Should resolve some issues with data forwarders - The SQS forwarder was using `as_dict` instead of `serialize` - The rate limiting wasn't being used at all - It was hard to compare the plugin implementation to the data forwarding since the methods had their name changed, I reused the existing ones where possible For Splunk, I opted to use the legacy SplunkApiClient, we can migrate off of it later and it's easier to match it for now.
1 parent 9970495 commit 991fec4

File tree

9 files changed

+174
-191
lines changed

9 files changed

+174
-191
lines changed

src/sentry/integrations/data_forwarding/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from collections.abc import Mapping
2+
13
from sentry.integrations.data_forwarding.amazon_sqs.forwarder import AmazonSQSForwarder
4+
from sentry.integrations.data_forwarding.base import BaseDataForwarder
25
from sentry.integrations.data_forwarding.segment.forwarder import SegmentForwarder
36
from sentry.integrations.data_forwarding.splunk.forwarder import SplunkForwarder
47
from sentry.integrations.types import DataForwarderProviderSlug
58

6-
FORWARDER_REGISTRY = {
9+
FORWARDER_REGISTRY: Mapping[str, type[BaseDataForwarder]] = {
710
DataForwarderProviderSlug.SEGMENT.value: SegmentForwarder,
811
DataForwarderProviderSlug.SQS.value: AmazonSQSForwarder,
912
DataForwarderProviderSlug.SPLUNK.value: SplunkForwarder,

src/sentry/integrations/data_forwarding/amazon_sqs/forwarder.py

Lines changed: 13 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -9,36 +9,27 @@
99
from botocore.client import Config
1010
from botocore.exceptions import ClientError, ParamValidationError
1111

12+
from sentry.api.serializers import serialize
1213
from sentry.integrations.data_forwarding.base import BaseDataForwarder
13-
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
1414
from sentry.integrations.types import DataForwarderProviderSlug
15-
from sentry.services.eventstore.models import Event
15+
from sentry.services.eventstore.models import Event, GroupEvent
1616

1717
logger = logging.getLogger(__name__)
1818

1919
# AWS SQS maximum message size limit
2020
AWS_SQS_MAX_MESSAGE_SIZE = 256 * 1024 # 256 KB
2121

22-
DESCRIPTION = """
23-
Send Sentry events to Amazon SQS.
24-
25-
This integration allows you to forward Sentry events to an Amazon SQS queue for further processing.
26-
27-
Amazon SQS is a fully managed message queuing service that enables you to decouple and scale microservices.
28-
"""
29-
3022

3123
class AmazonSQSForwarder(BaseDataForwarder):
3224
provider = DataForwarderProviderSlug.SQS
33-
rate_limit = None
34-
description = DESCRIPTION
25+
rate_limit = (0, 0)
3526

36-
@classmethod
37-
def get_event_payload(cls, event: Event) -> dict[str, Any]:
38-
return event.as_dict()
27+
def get_event_payload(
28+
self, event: Event | GroupEvent, config: dict[str, Any]
29+
) -> dict[str, Any]:
30+
return serialize(event)
3931

40-
@classmethod
41-
def is_unrecoverable_client_error(cls, error: ClientError) -> bool:
32+
def is_unrecoverable_client_error(self, error: ClientError) -> bool:
4233
error_str = str(error)
4334

4435
# Invalid or missing AWS credentials
@@ -70,12 +61,11 @@ def is_unrecoverable_client_error(cls, error: ClientError) -> bool:
7061

7162
return False
7263

73-
@classmethod
74-
def send_payload(
75-
cls,
64+
def forward_event(
65+
self,
66+
event: Event | GroupEvent,
7667
payload: dict[str, Any],
7768
config: dict[str, Any],
78-
event: Event,
7969
) -> bool:
8070
queue_url = config["queue_url"]
8171
region = config["region"]
@@ -125,7 +115,7 @@ def sqs_send_message(message):
125115
payload = {"s3Url": url, "eventID": event.event_id}
126116

127117
except ClientError as e:
128-
if cls.is_unrecoverable_client_error(e):
118+
if self.is_unrecoverable_client_error(e):
129119
return False
130120
raise
131121
except ParamValidationError:
@@ -141,14 +131,8 @@ def sqs_send_message(message):
141131
try:
142132
sqs_send_message(message)
143133
except ClientError as e:
144-
if cls.is_unrecoverable_client_error(e):
134+
if self.is_unrecoverable_client_error(e):
145135
return False
146136
raise
147137

148138
return True
149-
150-
@classmethod
151-
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
152-
config = data_forwarder_project.get_config()
153-
payload = cls.get_event_payload(event)
154-
return cls.send_payload(payload, config, event)
Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,69 @@
1-
from __future__ import annotations
2-
1+
import logging
32
from abc import ABC, abstractmethod
4-
from typing import ClassVar
3+
from typing import Any, ClassVar
54

5+
from sentry import ratelimits
66
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
77
from sentry.integrations.types import DataForwarderProviderSlug
8-
from sentry.services.eventstore.models import Event
8+
from sentry.services.eventstore.models import Event, GroupEvent
9+
10+
logger = logging.getLogger(__name__)
911

1012

1113
class BaseDataForwarder(ABC):
12-
"""Base class for all data forwarders."""
14+
"""
15+
Base class for all data forwarders.
16+
Copied directly from legacy plugin system, unchanged.
17+
"""
1318

1419
provider: ClassVar[DataForwarderProviderSlug]
15-
rate_limit: ClassVar[tuple[int, int] | None] = None
16-
description: ClassVar[str] = ""
20+
rate_limit: ClassVar[tuple[int, int]] = (50, 1)
21+
"""
22+
Tuple of (Number of Requests, Window in Seconds)
23+
"""
24+
25+
def get_rl_key(self, event: Event | GroupEvent) -> str:
26+
return f"{self.provider.value}:{event.project.organization_id}"
27+
28+
def is_ratelimited(self, event: Event | GroupEvent) -> bool:
29+
rl_key = self.get_rl_key(event)
30+
limit, window = self.rate_limit
31+
if limit and window and ratelimits.backend.is_limited(rl_key, limit=limit, window=window):
32+
logger.info(
33+
"data_forwarding.skip_rate_limited",
34+
extra={
35+
"event_id": event.event_id,
36+
"issue_id": event.group_id,
37+
"project_id": event.project_id,
38+
"organization_id": event.project.organization_id,
39+
},
40+
)
41+
return True
42+
return False
43+
44+
def initialize_variables(self, event: Event | GroupEvent, config: dict[str, Any]) -> None:
45+
"""This is only necessary for migrating Splunk plugin, needed for rate limiting"""
46+
return
1747

18-
@classmethod
1948
@abstractmethod
20-
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
21-
pass
49+
def forward_event(
50+
self, event: Event | GroupEvent, payload: dict[str, Any], config: dict[str, Any]
51+
) -> bool:
52+
raise NotImplementedError
53+
54+
@abstractmethod
55+
def get_event_payload(
56+
self, event: Event | GroupEvent, config: dict[str, Any]
57+
) -> dict[str, Any]:
58+
raise NotImplementedError
59+
60+
def post_process(
61+
self, event: Event | GroupEvent, data_forwarder_project: DataForwarderProject
62+
) -> None:
63+
config = data_forwarder_project.get_config()
64+
self.initialize_variables(event, config)
65+
if self.is_ratelimited(event):
66+
return
67+
68+
payload = self.get_event_payload(event=event, config=config)
69+
self.forward_event(event=event, payload=payload, config=config)

src/sentry/integrations/data_forwarding/segment/forwarder.py

Lines changed: 26 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -5,47 +5,20 @@
55

66
from sentry import VERSION, http
77
from sentry.integrations.data_forwarding.base import BaseDataForwarder
8-
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
98
from sentry.integrations.types import DataForwarderProviderSlug
10-
from sentry.services.eventstore.models import Event
9+
from sentry.services.eventstore.models import Event, GroupEvent
1110

1211
logger = logging.getLogger(__name__)
1312

14-
DESCRIPTION = """
15-
Send Sentry events to Segment.
16-
17-
This integration allows you to forward Sentry error events to Segment for unified analytics.
18-
19-
Segment is a customer data platform (CDP) that helps you collect, clean, and control your customer data.
20-
"""
21-
2213

2314
class SegmentForwarder(BaseDataForwarder):
2415
provider = DataForwarderProviderSlug.SEGMENT
2516
rate_limit = (200, 1)
26-
description = DESCRIPTION
2717
endpoint: ClassVar[str] = "https://api.segment.io/v1/track"
2818

29-
@classmethod
30-
def validate_event(cls, event: Event) -> bool:
31-
# we currently only support errors
32-
if event.get_event_type() != "error":
33-
return False
34-
35-
# we avoid instantiating interfaces here as they're only going to be
36-
# used if there's a User present
37-
user_interface = event.interfaces.get("user")
38-
if not user_interface:
39-
return False
40-
41-
# if the user id is not present, we can't forward the event
42-
if not user_interface.id:
43-
return False
44-
45-
return True
46-
47-
@classmethod
48-
def get_event_payload(cls, event: Event) -> dict[str, Any]:
19+
def get_event_payload(
20+
self, event: Event | GroupEvent, config: dict[str, Any]
21+
) -> dict[str, Any]:
4922
context = {"library": {"name": "sentry", "version": VERSION}}
5023

5124
props = {
@@ -95,43 +68,43 @@ def get_event_payload(cls, event: Event) -> dict[str, Any]:
9568
"timestamp": event.datetime.isoformat() + "Z",
9669
}
9770

98-
@classmethod
99-
def send_payload(
100-
cls,
71+
def forward_event(
72+
self,
73+
event: Event | GroupEvent,
10174
payload: dict[str, Any],
10275
config: dict[str, Any],
103-
event: Event,
104-
data_forwarder_project: DataForwarderProject,
10576
) -> bool:
77+
# we currently only support errors
78+
if event.get_event_type() != "error":
79+
return False
80+
81+
# we avoid instantiating interfaces here as they're only going to be
82+
# used if there's a User present
83+
user_interface = event.interfaces.get("user")
84+
if not user_interface:
85+
return False
86+
87+
# if the user id is not present, we can't forward the event
88+
if not user_interface.id:
89+
return False
90+
10691
write_key = config["write_key"]
92+
if not write_key:
93+
return False
10794

10895
try:
10996
with http.build_session() as session:
11097
response = session.post(
111-
cls.endpoint,
98+
self.endpoint,
11299
json=payload,
113100
auth=(write_key, ""),
114101
timeout=10,
115102
)
116103
response.raise_for_status()
117-
return True
118-
119104
except Exception:
120105
logger.exception(
121106
"segment.send_payload.error",
122-
extra={
123-
"event_id": event.event_id,
124-
"project_id": event.project_id,
125-
"data_forwarder_id": data_forwarder_project.data_forwarder_id,
126-
},
107+
extra={"event_id": event.event_id, "project_id": event.project_id},
127108
)
128109
return False
129-
130-
@classmethod
131-
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
132-
if not cls.validate_event(event):
133-
return False
134-
135-
config = data_forwarder_project.get_config()
136-
payload = cls.get_event_payload(event)
137-
return cls.send_payload(payload, config, event, data_forwarder_project)
110+
return True

0 commit comments

Comments
 (0)