src/sentry/integrations/data_forwarding/__init__.py (5 changes: 4 additions & 1 deletion)
@@ -1,9 +1,12 @@
from collections.abc import Mapping

from sentry.integrations.data_forwarding.amazon_sqs.forwarder import AmazonSQSForwarder
from sentry.integrations.data_forwarding.base import BaseDataForwarder
from sentry.integrations.data_forwarding.segment.forwarder import SegmentForwarder
from sentry.integrations.data_forwarding.splunk.forwarder import SplunkForwarder
from sentry.integrations.types import DataForwarderProviderSlug

FORWARDER_REGISTRY = {
FORWARDER_REGISTRY: Mapping[str, type[BaseDataForwarder]] = {
DataForwarderProviderSlug.SEGMENT.value: SegmentForwarder,
DataForwarderProviderSlug.SQS.value: AmazonSQSForwarder,
DataForwarderProviderSlug.SPLUNK.value: SplunkForwarder,
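Note (not part of the diff): a minimal sketch of how a caller might resolve a forwarder class from the now-typed FORWARDER_REGISTRY. The get_forwarder helper below is hypothetical; it only illustrates that the registry maps provider-slug strings to BaseDataForwarder subclasses, which are instantiated under the new instance-based API.

```python
# Hypothetical usage sketch, not part of this diff.
from sentry.integrations.data_forwarding import FORWARDER_REGISTRY
from sentry.integrations.data_forwarding.base import BaseDataForwarder


def get_forwarder(provider_slug: str) -> BaseDataForwarder:
    # Registry keys are DataForwarderProviderSlug values (plain strings).
    forwarder_cls = FORWARDER_REGISTRY.get(provider_slug)
    if forwarder_cls is None:
        raise ValueError(f"unknown data forwarding provider: {provider_slug}")
    return forwarder_cls()
```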
src/sentry/integrations/data_forwarding/amazon_sqs/forwarder.py (42 changes: 13 additions & 29 deletions)
@@ -9,36 +9,27 @@
from botocore.client import Config
from botocore.exceptions import ClientError, ParamValidationError

from sentry.api.serializers import serialize
from sentry.integrations.data_forwarding.base import BaseDataForwarder
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
from sentry.integrations.types import DataForwarderProviderSlug
from sentry.services.eventstore.models import Event
from sentry.services.eventstore.models import Event, GroupEvent

logger = logging.getLogger(__name__)

# AWS SQS maximum message size limit
AWS_SQS_MAX_MESSAGE_SIZE = 256 * 1024 # 256 KB

DESCRIPTION = """
Send Sentry events to Amazon SQS.

This integration allows you to forward Sentry events to an Amazon SQS queue for further processing.

Amazon SQS is a fully managed message queuing service that enables you to decouple and scale microservices.
"""


class AmazonSQSForwarder(BaseDataForwarder):
provider = DataForwarderProviderSlug.SQS
rate_limit = None
description = DESCRIPTION
rate_limit = (0, 0)

@classmethod
def get_event_payload(cls, event: Event) -> dict[str, Any]:
return event.as_dict()
def get_event_payload(
self, event: Event | GroupEvent, config: dict[str, Any]
) -> dict[str, Any]:
return serialize(event)

@classmethod
def is_unrecoverable_client_error(cls, error: ClientError) -> bool:
def is_unrecoverable_client_error(self, error: ClientError) -> bool:
error_str = str(error)

# Invalid or missing AWS credentials
@@ -70,12 +61,11 @@ def is_unrecoverable_client_error(cls, error: ClientError) -> bool:

return False

@classmethod
def send_payload(
cls,
def forward_event(
self,
event: Event | GroupEvent,
payload: dict[str, Any],
config: dict[str, Any],
event: Event,
) -> bool:
queue_url = config["queue_url"]
region = config["region"]
@@ -125,7 +115,7 @@ def sqs_send_message(message):
payload = {"s3Url": url, "eventID": event.event_id}

except ClientError as e:
if cls.is_unrecoverable_client_error(e):
if self.is_unrecoverable_client_error(e):
return False
raise
except ParamValidationError:
@@ -141,14 +131,8 @@ def sqs_send_message(message):
try:
sqs_send_message(message)
except ClientError as e:
if cls.is_unrecoverable_client_error(e):
if self.is_unrecoverable_client_error(e):
return False
raise

return True

@classmethod
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
config = data_forwarder_project.get_config()
payload = cls.get_event_payload(event)
return cls.send_payload(payload, config, event)
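Note (not part of the diff): the SQS forwarder now sets rate_limit = (0, 0), which the reworked base class treats as "never rate limited" because is_ratelimited short-circuits when either the limit or the window is falsy. A hedged sketch of driving the new instance-based interface directly follows; the queue_url and region config keys appear in this diff, while any other keys are not shown here and are left out.

```python
# Sketch only; assumes the new instance-based API introduced in this PR.
from typing import Any

from sentry.integrations.data_forwarding.amazon_sqs.forwarder import AmazonSQSForwarder
from sentry.services.eventstore.models import Event, GroupEvent


def forward_to_sqs(event: Event | GroupEvent, config: dict[str, Any]) -> bool:
    """Roughly what BaseDataForwarder.post_process does for SQS, minus the
    rate-limit check, which (0, 0) effectively disables for this provider."""
    forwarder = AmazonSQSForwarder()
    payload = forwarder.get_event_payload(event, config)  # serialize(event)
    # config is expected to contain at least "queue_url" and "region" (per this diff).
    return forwarder.forward_event(event=event, payload=payload, config=config)
```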
src/sentry/integrations/data_forwarding/base.py (67 changes: 57 additions & 10 deletions)
@@ -1,21 +1,68 @@
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from typing import ClassVar
from typing import Any, ClassVar

from sentry import ratelimits
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
from sentry.integrations.types import DataForwarderProviderSlug
from sentry.services.eventstore.models import Event
from sentry.services.eventstore.models import Event, GroupEvent

logger = logging.getLogger(__name__)


class BaseDataForwarder(ABC):
"""Base class for all data forwarders."""
"""
Base class for all data forwarders.
Copied directly from legacy plugin system, unchanged.
"""

provider: ClassVar[DataForwarderProviderSlug]
rate_limit: ClassVar[tuple[int, int] | None] = None
description: ClassVar[str] = ""
rate_limit: ClassVar[tuple[int, int]] = (50, 1)
"""
Tuple of (Number of Requests, Window in Seconds)
"""

def get_rl_key(self, event: Event | GroupEvent) -> str:
return f"{self.provider.value}:{event.project.organization_id}"

def is_ratelimited(self, event: Event | GroupEvent) -> bool:
rl_key = self.get_rl_key(event)
limit, window = self.rate_limit
if limit and window and ratelimits.backend.is_limited(rl_key, limit=limit, window=window):
logger.info(
"data_forwarding.skip_rate_limited",
extra={
"event_id": event.event_id,
"issue_id": event.group_id,
"project_id": event.project_id,
"organization_id": event.project.organization_id,
},
)
return True
return False

def initialize_variables(self, event: Event | GroupEvent, config: dict[str, Any]) -> None:
"""This is only necessary for migrating Splunk plugin, needed for rate limiting"""
return

@classmethod
@abstractmethod
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
pass
def forward_event(
self, event: Event | GroupEvent, payload: dict[str, Any], config: dict[str, Any]
) -> bool:
raise NotImplementedError

@abstractmethod
def get_event_payload(
self, event: Event | GroupEvent, config: dict[str, Any]
) -> dict[str, Any]:
raise NotImplementedError

def post_process(
self, event: Event | GroupEvent, data_forwarder_project: DataForwarderProject
) -> None:
config = data_forwarder_project.get_config()
if self.is_ratelimited(event):
return

payload = self.get_event_payload(event=event, config=config)
self.forward_event(event=event, payload=payload, config=config)
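Note (not part of the diff): to make the new contract concrete, here is a minimal hypothetical subclass written against the reworked base class. The provider slug is reused from an existing enum member purely for illustration, and "delivery" is just a log call. post_process is the entry point: it checks the per-organization, per-provider rate limit, builds the payload via get_event_payload, then calls forward_event.

```python
# Hypothetical example subclass, not part of this diff.
import logging
from typing import Any

from sentry.integrations.data_forwarding.base import BaseDataForwarder
from sentry.integrations.types import DataForwarderProviderSlug
from sentry.services.eventstore.models import Event, GroupEvent

logger = logging.getLogger(__name__)


class LoggingForwarder(BaseDataForwarder):
    # A real forwarder would declare its own provider slug; SPLUNK is reused
    # here only so the example satisfies the ClassVar annotation.
    provider = DataForwarderProviderSlug.SPLUNK
    rate_limit = (50, 1)  # 50 requests per 1-second window (the new base default)

    def get_event_payload(
        self, event: Event | GroupEvent, config: dict[str, Any]
    ) -> dict[str, Any]:
        return {"event_id": event.event_id, "project_id": event.project_id}

    def forward_event(
        self, event: Event | GroupEvent, payload: dict[str, Any], config: dict[str, Any]
    ) -> bool:
        logger.info("logging_forwarder.forward", extra=payload)
        return True
```

Callers would then invoke LoggingForwarder().post_process(event, data_forwarder_project) instead of the old classmethod-style forward_event(event, data_forwarder_project).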
src/sentry/integrations/data_forwarding/segment/forwarder.py (79 changes: 26 additions & 53 deletions)
@@ -5,47 +5,20 @@

from sentry import VERSION, http
from sentry.integrations.data_forwarding.base import BaseDataForwarder
from sentry.integrations.models.data_forwarder_project import DataForwarderProject
from sentry.integrations.types import DataForwarderProviderSlug
from sentry.services.eventstore.models import Event
from sentry.services.eventstore.models import Event, GroupEvent

logger = logging.getLogger(__name__)

DESCRIPTION = """
Send Sentry events to Segment.

This integration allows you to forward Sentry error events to Segment for unified analytics.

Segment is a customer data platform (CDP) that helps you collect, clean, and control your customer data.
"""


class SegmentForwarder(BaseDataForwarder):
provider = DataForwarderProviderSlug.SEGMENT
rate_limit = (200, 1)
description = DESCRIPTION
endpoint: ClassVar[str] = "https://api.segment.io/v1/track"

@classmethod
def validate_event(cls, event: Event) -> bool:
# we currently only support errors
if event.get_event_type() != "error":
return False

# we avoid instantiating interfaces here as they're only going to be
# used if there's a User present
user_interface = event.interfaces.get("user")
if not user_interface:
return False

# if the user id is not present, we can't forward the event
if not user_interface.id:
return False

return True

@classmethod
def get_event_payload(cls, event: Event) -> dict[str, Any]:
def get_event_payload(
self, event: Event | GroupEvent, config: dict[str, Any]
) -> dict[str, Any]:
context = {"library": {"name": "sentry", "version": VERSION}}

props = {
@@ -95,43 +68,43 @@ def get_event_payload(cls, event: Event) -> dict[str, Any]:
"timestamp": event.datetime.isoformat() + "Z",
}

@classmethod
def send_payload(
cls,
def forward_event(
self,
event: Event | GroupEvent,
payload: dict[str, Any],
config: dict[str, Any],
event: Event,
data_forwarder_project: DataForwarderProject,
) -> bool:
# we currently only support errors
if event.get_event_type() != "error":
return False

# we avoid instantiating interfaces here as they're only going to be
# used if there's a User present
user_interface = event.interfaces.get("user")
if not user_interface:
return False

# if the user id is not present, we can't forward the event
if not user_interface.id:
return False

write_key = config["write_key"]
if not write_key:
return False

try:
with http.build_session() as session:
response = session.post(
cls.endpoint,
self.endpoint,
json=payload,
auth=(write_key, ""),
timeout=10,
)
response.raise_for_status()
return True

except Exception:
logger.exception(
"segment.send_payload.error",
extra={
"event_id": event.event_id,
"project_id": event.project_id,
"data_forwarder_id": data_forwarder_project.data_forwarder_id,
},
extra={"event_id": event.event_id, "project_id": event.project_id},
)
return False

@classmethod
def forward_event(cls, event: Event, data_forwarder_project: DataForwarderProject) -> bool:
if not cls.validate_event(event):
return False

config = data_forwarder_project.get_config()
payload = cls.get_event_payload(event)
return cls.send_payload(payload, config, event, data_forwarder_project)
return True
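Note (not part of the diff): for readers outside this module, the delivery step performed by the Segment forwarder is an HTTP POST of the payload to Segment's track endpoint, authenticated with the write key as the basic-auth username and an empty password. A standalone sketch is below; it uses requests instead of sentry.http.build_session, which is an assumption made only to keep the example self-contained.

```python
# Standalone sketch of the HTTP call made by SegmentForwarder.forward_event.
from typing import Any

import requests

SEGMENT_TRACK_ENDPOINT = "https://api.segment.io/v1/track"


def send_to_segment(payload: dict[str, Any], write_key: str) -> bool:
    if not write_key:
        return False
    try:
        response = requests.post(
            SEGMENT_TRACK_ENDPOINT,
            json=payload,  # built by SegmentForwarder.get_event_payload
            auth=(write_key, ""),  # write key as username, empty password
            timeout=10,
        )
        response.raise_for_status()
        return True
    except requests.RequestException:
        return False
```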