Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/sentry/integrations/data_forwarding/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from collections.abc import Mapping

from sentry.integrations.data_forwarding.amazon_sqs.forwarder import AmazonSQSForwarder
from sentry.integrations.data_forwarding.base import BaseDataForwarder
from sentry.integrations.data_forwarding.segment.forwarder import SegmentForwarder
from sentry.integrations.data_forwarding.splunk.forwarder import SplunkForwarder
from sentry.integrations.types import DataForwarderProviderSlug

FORWARDER_REGISTRY = {
FORWARDER_REGISTRY: Mapping[str, type[BaseDataForwarder]] = {
DataForwarderProviderSlug.SEGMENT.value: SegmentForwarder,
DataForwarderProviderSlug.SQS.value: AmazonSQSForwarder,
DataForwarderProviderSlug.SPLUNK.value: SplunkForwarder,
Expand Down
42 changes: 13 additions & 29 deletions src/sentry/integrations/data_forwarding/amazon_sqs/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,27 @@
from botocore.client import Config
from botocore.exceptions import ClientError, ParamValidationError

from sentry.api.serializers import serialize
from sentry.integrations.data_forwarding.base import BaseDataForwarder
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
from sentry.integrations.types import DataForwarderProviderSlug
from sentry.services.eventstore.models import Event
from sentry.services.eventstore.models import Event, GroupEvent

logger = logging.getLogger(__name__)

# AWS SQS maximum message size limit
AWS_SQS_MAX_MESSAGE_SIZE = 256 * 1024 # 256 KB

DESCRIPTION = """
Send Sentry events to Amazon SQS.

This integration allows you to forward Sentry events to an Amazon SQS queue for further processing.

Amazon SQS is a fully managed message queuing service that enables you to decouple and scale microservices.
"""


class AmazonSQSForwarder(BaseDataForwarder):
provider = DataForwarderProviderSlug.SQS
rate_limit = None
description = DESCRIPTION
rate_limit = (0, 0)

@classmethod
def get_event_payload(cls, event: Event) -> dict[str, Any]:
return event.as_dict()
def get_event_payload(
self, event: Event | GroupEvent, config: dict[str, Any]
) -> dict[str, Any]:
return serialize(event)

@classmethod
def is_unrecoverable_client_error(cls, error: ClientError) -> bool:
def is_unrecoverable_client_error(self, error: ClientError) -> bool:
error_str = str(error)

# Invalid or missing AWS credentials
Expand Down Expand Up @@ -70,12 +61,11 @@ def is_unrecoverable_client_error(cls, error: ClientError) -> bool:

return False

@classmethod
def send_payload(
cls,
def forward_event(
self,
event: Event | GroupEvent,
payload: dict[str, Any],
config: dict[str, Any],
event: Event,
) -> bool:
queue_url = config["queue_url"]
region = config["region"]
Expand Down Expand Up @@ -125,7 +115,7 @@ def sqs_send_message(message):
payload = {"s3Url": url, "eventID": event.event_id}

except ClientError as e:
if cls.is_unrecoverable_client_error(e):
if self.is_unrecoverable_client_error(e):
return False
raise
except ParamValidationError:
Expand All @@ -141,14 +131,8 @@ def sqs_send_message(message):
try:
sqs_send_message(message)
except ClientError as e:
if cls.is_unrecoverable_client_error(e):
if self.is_unrecoverable_client_error(e):
return False
raise

return True

@classmethod
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
config = data_forwarder_project.get_config()
payload = cls.get_event_payload(event)
return cls.send_payload(payload, config, event)
68 changes: 58 additions & 10 deletions src/sentry/integrations/data_forwarding/base.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,69 @@
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import ClassVar
from typing import Any, ClassVar

from sentry import ratelimits
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
from sentry.integrations.types import DataForwarderProviderSlug
from sentry.services.eventstore.models import Event
from sentry.services.eventstore.models import Event, GroupEvent

logger = logging.getLogger(__name__)


class BaseDataForwarder(ABC):
"""Base class for all data forwarders."""
"""
Base class for all data forwarders.
Copied directly from legacy plugin system, unchanged.
"""

provider: ClassVar[DataForwarderProviderSlug]
rate_limit: ClassVar[tuple[int, int] | None] = None
description: ClassVar[str] = ""
rate_limit: ClassVar[tuple[int, int]] = (50, 1)
"""
Tuple of (Number of Requests, Window in Seconds)
"""

def get_rl_key(self, event: Event | GroupEvent) -> str:
return f"{self.provider.value}:{event.project.organization_id}"

def is_ratelimited(self, event: Event | GroupEvent) -> bool:
rl_key = self.get_rl_key(event)
limit, window = self.rate_limit
if limit and window and ratelimits.backend.is_limited(rl_key, limit=limit, window=window):
logger.info(
"data_forwarding.skip_rate_limited",
extra={
"event_id": event.event_id,
"issue_id": event.group_id,
"project_id": event.project_id,
"organization_id": event.project.organization_id,
},
)
return True
return False

def initialize_variables(self, event: Event | GroupEvent, config: dict[str, Any]) -> None:
"""This is only necessary for migrating Splunk plugin, needed for rate limiting"""
return

@classmethod
@abstractmethod
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
pass
def forward_event(
self, event: Event | GroupEvent, payload: dict[str, Any], config: dict[str, Any]
) -> bool:
raise NotImplementedError

@abstractmethod
def get_event_payload(
self, event: Event | GroupEvent, config: dict[str, Any]
) -> dict[str, Any]:
raise NotImplementedError

def post_process(
self, event: Event | GroupEvent, data_forwarder_project: DataForwarderProject
) -> None:
config = data_forwarder_project.get_config()
self.initialize_variables(event, config)
if self.is_ratelimited(event):
return

payload = self.get_event_payload(event=event, config=config)
self.forward_event(event=event, payload=payload, config=config)
79 changes: 26 additions & 53 deletions src/sentry/integrations/data_forwarding/segment/forwarder.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,20 @@

from sentry import VERSION, http
from sentry.integrations.data_forwarding.base import BaseDataForwarder
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
from sentry.integrations.types import DataForwarderProviderSlug
from sentry.services.eventstore.models import Event
from sentry.services.eventstore.models import Event, GroupEvent

logger = logging.getLogger(__name__)

DESCRIPTION = """
Send Sentry events to Segment.

This integration allows you to forward Sentry error events to Segment for unified analytics.

Segment is a customer data platform (CDP) that helps you collect, clean, and control your customer data.
"""


class SegmentForwarder(BaseDataForwarder):
provider = DataForwarderProviderSlug.SEGMENT
rate_limit = (200, 1)
description = DESCRIPTION
endpoint: ClassVar[str] = "https://api.segment.io/v1/track"

@classmethod
def validate_event(cls, event: Event) -> bool:
# we currently only support errors
if event.get_event_type() != "error":
return False

# we avoid instantiating interfaces here as they're only going to be
# used if there's a User present
user_interface = event.interfaces.get("user")
if not user_interface:
return False

# if the user id is not present, we can't forward the event
if not user_interface.id:
return False

return True

@classmethod
def get_event_payload(cls, event: Event) -> dict[str, Any]:
def get_event_payload(
self, event: Event | GroupEvent, config: dict[str, Any]
) -> dict[str, Any]:
context = {"library": {"name": "sentry", "version": VERSION}}

props = {
Expand Down Expand Up @@ -95,43 +68,43 @@ def get_event_payload(cls, event: Event) -> dict[str, Any]:
"timestamp": event.datetime.isoformat() + "Z",
}

@classmethod
def send_payload(
cls,
def forward_event(
self,
event: Event | GroupEvent,
payload: dict[str, Any],
config: dict[str, Any],
event: Event,
data_forwarder_project: DataForwarderProject,
) -> bool:
# we currently only support errors
if event.get_event_type() != "error":
return False

# we avoid instantiating interfaces here as they're only going to be
# used if there's a User present
user_interface = event.interfaces.get("user")
if not user_interface:
return False

# if the user id is not present, we can't forward the event
if not user_interface.id:
return False

write_key = config["write_key"]
if not write_key:
return False

try:
with http.build_session() as session:
response = session.post(
cls.endpoint,
self.endpoint,
json=payload,
auth=(write_key, ""),
timeout=10,
)
response.raise_for_status()
return True

except Exception:
logger.exception(
"segment.send_payload.error",
extra={
"event_id": event.event_id,
"project_id": event.project_id,
"data_forwarder_id": data_forwarder_project.data_forwarder_id,
},
extra={"event_id": event.event_id, "project_id": event.project_id},
)
return False

@classmethod
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
if not cls.validate_event(event):
return False

config = data_forwarder_project.get_config()
payload = cls.get_event_payload(event)
return cls.send_payload(payload, config, event, data_forwarder_project)
return True
Loading
Loading