Skip to content

Commit a74a943

Browse files
authored
feat: add ingestion_metadata field (#36)
1 parent 8d5699e commit a74a943

File tree

6 files changed

+128
-12
lines changed

6 files changed

+128
-12
lines changed

examples/flask_example/flaskapp.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def track_revenue(user_id):
4848
@app.route("/flush")
4949
def flush_event():
5050
amp_client.flush()
51-
return f"<p>All events flushed</p>"
51+
return "<p>All events flushed</p>"
5252

5353

5454
if __name__ == "__main__":

src/amplitude/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
from amplitude.client import Amplitude
55
from amplitude.event import BaseEvent, EventOptions, Identify, Revenue, IdentifyEvent, \
6-
GroupIdentifyEvent, RevenueEvent, Plan
6+
GroupIdentifyEvent, RevenueEvent, Plan, IngestionMetadata
77
from amplitude.config import Config
88
from amplitude.constants import PluginType
99
from amplitude.plugin import EventPlugin, DestinationPlugin

src/amplitude/config.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from typing import Optional, Callable
99

1010
from amplitude import constants
11-
from amplitude.event import BaseEvent, Plan
11+
from amplitude.event import BaseEvent, Plan, IngestionMetadata
1212
from amplitude.storage import InMemoryStorageProvider, StorageProvider, Storage
1313

1414

@@ -33,6 +33,7 @@ class Config:
3333
storage_provider (amplitude.storage.StorageProvider, optional): Default to InMemoryStorageProvider.
3434
Provide storage instance for events buffer.
3535
plan (amplitude.event.Plan, optional): Tracking plan information. Default to None.
36+
ingestion_metadata (amplitude.event.IngestionMetadata, optional): Ingestion metadata. Default to None.
3637
3738
Properties:
3839
options: A dictionary contains minimum id length information. None if min_id_length not set.
@@ -55,7 +56,8 @@ def __init__(self, api_key: str = None,
5556
use_batch: bool = False,
5657
server_url: Optional[str] = None,
5758
storage_provider: StorageProvider = InMemoryStorageProvider(),
58-
plan: Plan = None):
59+
plan: Plan = None,
60+
ingestion_metadata: IngestionMetadata = None):
5961
"""The constructor of Config class"""
6062
self.api_key: str = api_key
6163
self._flush_queue_size: int = flush_queue_size
@@ -71,6 +73,7 @@ def __init__(self, api_key: str = None,
7173
self.storage_provider: StorageProvider = storage_provider
7274
self.opt_out: bool = False
7375
self.plan: Plan = plan
76+
self.ingestion_metadata: IngestionMetadata = ingestion_metadata
7477

7578
def get_storage(self) -> Storage:
7679
"""Use configured StorageProvider to create a Storage instance then return.

src/amplitude/event.py

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
BaseEvent: Basic event class. Subclass of EventOptions.
66
Identify: A class used to create identify and group identify event.
77
IdentifyEvent: A special event class. Used to update user properties without an actual event.
8-
GroupIdentifyEvent: A special event class. Used to update group properties without an actual event
8+
GroupIdentifyEvent: A special event class. Used to update group properties without an actual event.
99
Revenue: A class used to create revenue event.
1010
RevenueEvent: A special event class. Used to record revenue information.
1111
Plan: Tracking plan info includes branch, source, version, version_id.
12+
IngestionMetadata: Ingestion metadata includes source name, source version.
1213
"""
1314

1415
import copy
@@ -71,7 +72,52 @@ def get_plan_body(self):
7172
result[PLAN_KEY_MAPPING[key][0]] = self.__dict__[key]
7273
else:
7374
logger.error(
74-
f"Plan.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
75+
f"{type(self).__name__}.{key} expected {PLAN_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
76+
return result
77+
78+
79+
INGESTION_METADATA_KEY_MAPPING = {
80+
"source_name": ["source_name", str],
81+
"source_version": ["source_version", str],
82+
}
83+
84+
85+
class IngestionMetadata:
86+
"""IngestionMetadata holds metadata information. Instance of IngestionMetadata class can be value of event's `ingestion_metadata` attribute.
87+
88+
Args:
89+
source_name (str, optional): Name of the ingestion source in metadata.
90+
source_version (str, optional): Version of the ingestion source in metadata.
91+
92+
Methods:
93+
get_body(): return a dict object that contains ingestion metadata information.
94+
"""
95+
96+
def __init__(self, source_name: Optional[str] = None, source_version: Optional[str] = None):
97+
"""The constructor for the IngestionMetadata class
98+
99+
Args:
100+
source_name (str, optional): Name of the ingestion source in metadata.
101+
source_version (str, optional): Version of the ingestion source in metadata.
102+
"""
103+
self.source_name: Optional[str] = source_name
104+
self.source_version: Optional[str] = source_version
105+
106+
def get_body(self):
107+
"""Convert this object instance to dict instance
108+
109+
Returns:
110+
A dictionary with data of this object instance
111+
"""
112+
result = {}
113+
for key in INGESTION_METADATA_KEY_MAPPING:
114+
if not self.__dict__[key]:
115+
continue
116+
if isinstance(self.__dict__[key], INGESTION_METADATA_KEY_MAPPING[key][1]):
117+
result[INGESTION_METADATA_KEY_MAPPING[key][0]] = self.__dict__[key]
118+
else:
119+
logger.error(
120+
f"{type(self).__name__}.{key} expected {INGESTION_METADATA_KEY_MAPPING[key][1]} but received {type(self.__dict__[key])}.")
75121
return result
76122

77123

@@ -113,6 +159,7 @@ def get_plan_body(self):
113159
"insert_id": ["insert_id", str],
114160
"library": ["library", str],
115161
"plan": ["plan", Plan],
162+
"ingestion_metadata": ["ingestion_metadata", IngestionMetadata],
116163
"group_properties": ["group_properties", dict],
117164
"partner_id": ["partner_id", str],
118165
"version_name": ["version_name", str]
@@ -158,6 +205,7 @@ class EventOptions:
158205
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
159206
we have already seen before within the past 7 days will be deduplicated.
160207
plan (Plan, optional): Tracking plan properties.
208+
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
161209
partner_id (str, optional): The partner id.
162210
version_name (str, optional): The version name.
163211
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
@@ -167,7 +215,7 @@ class EventOptions:
167215
retry (int): The retry attempt of the event instance.
168216
169217
Methods:
170-
get_event_body(): Retrun a dictionary with data of the event instance
218+
get_event_body(): Return a dictionary with data of the event instance
171219
callback(code, message): Trigger callback method of the event instance.
172220
"""
173221

@@ -203,6 +251,7 @@ def __init__(self, user_id: Optional[str] = None,
203251
session_id: Optional[int] = None,
204252
insert_id: Optional[str] = None,
205253
plan: Optional[Plan] = None,
254+
ingestion_metadata: Optional[IngestionMetadata] = None,
206255
partner_id: Optional[str] = None,
207256
version_name: Optional[str] = None,
208257
callback=None):
@@ -240,6 +289,7 @@ def __init__(self, user_id: Optional[str] = None,
240289
self.insert_id: Optional[str] = None
241290
self.library: Optional[str] = None
242291
self.plan: Optional[Plan] = None
292+
self.ingestion_metadata: Optional[IngestionMetadata] = None
243293
self.partner_id: Optional[str] = None
244294
self.version_name: Optional[str] = None
245295
self["user_id"] = user_id
@@ -274,6 +324,7 @@ def __init__(self, user_id: Optional[str] = None,
274324
self["session_id"] = session_id
275325
self["insert_id"] = insert_id
276326
self["plan"] = plan
327+
self["ingestion_metadata"] = ingestion_metadata
277328
self["partner_id"] = partner_id
278329
self["version_name"] = version_name
279330
self.event_callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = callback
@@ -309,6 +360,8 @@ def get_event_body(self) -> dict:
309360
event_body[value[0]] = self[key]
310361
if "plan" in event_body:
311362
event_body["plan"] = event_body["plan"].get_plan_body()
363+
if "ingestion_metadata" in event_body:
364+
event_body["ingestion_metadata"] = event_body["ingestion_metadata"].get_body()
312365
for properties in ["user_properties", "event_properties", "group_properties"]:
313366
if properties in event_body:
314367
for key, value in event_body[properties].items():
@@ -394,6 +447,7 @@ class BaseEvent(EventOptions):
394447
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
395448
we have already seen before within the past 7 days will be deduplicated.
396449
plan (Plan, optional): Tracking plan properties.
450+
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
397451
partner_id (str, optional): The partner id.
398452
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
399453
parameters: an event instance, an integer code of response status, an optional string message.
@@ -439,6 +493,7 @@ def __init__(self, event_type: str,
439493
session_id: Optional[int] = None,
440494
insert_id: Optional[str] = None,
441495
plan: Optional[Plan] = None,
496+
ingestion_metadata: Optional[IngestionMetadata] = None,
442497
partner_id: Optional[str] = None,
443498
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None):
444499
"""The constructor of the BaseEvent class"""
@@ -474,6 +529,7 @@ def __init__(self, event_type: str,
474529
session_id=session_id,
475530
insert_id=insert_id,
476531
plan=plan,
532+
ingestion_metadata=ingestion_metadata,
477533
partner_id=partner_id,
478534
callback=callback)
479535
self.event_type: str = event_type
@@ -730,6 +786,7 @@ class GroupIdentifyEvent(BaseEvent):
730786
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
731787
we have already seen before within the past 7 days will be deduplicated.
732788
plan (Plan, optional): Tracking plan properties.
789+
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
733790
partner_id (str, optional): The partner id.
734791
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
735792
parameters: an event instance, an integer code of response status, an optional string message.
@@ -772,6 +829,7 @@ def __init__(self, user_id: Optional[str] = None,
772829
session_id: Optional[int] = None,
773830
insert_id: Optional[str] = None,
774831
plan: Optional[Plan] = None,
832+
ingestion_metadata: Optional[IngestionMetadata] = None,
775833
partner_id: Optional[str] = None,
776834
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
777835
identify_obj: Optional[Identify] = None):
@@ -812,6 +870,7 @@ def __init__(self, user_id: Optional[str] = None,
812870
session_id=session_id,
813871
insert_id=insert_id,
814872
plan=plan,
873+
ingestion_metadata=ingestion_metadata,
815874
partner_id=partner_id,
816875
callback=callback)
817876
if identify_obj:
@@ -862,6 +921,7 @@ class IdentifyEvent(BaseEvent):
862921
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
863922
we have already seen before within the past 7 days will be deduplicated.
864923
plan (Plan, optional): Tracking plan properties.
924+
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
865925
partner_id (str, optional): The partner id.
866926
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
867927
parameters: an event instance, an integer code of response status, an optional string message.
@@ -904,6 +964,7 @@ def __init__(self, user_id: Optional[str] = None,
904964
session_id: Optional[int] = None,
905965
insert_id: Optional[str] = None,
906966
plan: Optional[Plan] = None,
967+
ingestion_metadata: Optional[IngestionMetadata] = None,
907968
partner_id: Optional[str] = None,
908969
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
909970
identify_obj: Optional[Identify] = None):
@@ -943,6 +1004,7 @@ def __init__(self, user_id: Optional[str] = None,
9431004
session_id=session_id,
9441005
insert_id=insert_id,
9451006
plan=plan,
1007+
ingestion_metadata=ingestion_metadata,
9461008
partner_id=partner_id,
9471009
callback=callback)
9481010
if identify_obj:
@@ -1082,6 +1144,7 @@ class RevenueEvent(BaseEvent):
10821144
insert_id (str, optional): A unique identifier for the event. Events sent with the same insert_id and device_id
10831145
we have already seen before within the past 7 days will be deduplicated.
10841146
plan (Plan, optional): Tracking plan properties.
1147+
ingestion_metadata (IngestionMetadata, optional): Ingestion metadata.
10851148
partner_id (str, optional): The partner id.
10861149
callback (callable, optional): Event level callback method. Triggered when event is sent or failed. Take three
10871150
parameters: an event instance, an integer code of response status, an optional string message.
@@ -1124,6 +1187,7 @@ def __init__(self, user_id: Optional[str] = None,
11241187
session_id: Optional[int] = None,
11251188
insert_id: Optional[str] = None,
11261189
plan: Optional[Plan] = None,
1190+
ingestion_metadata: Optional[IngestionMetadata] = None,
11271191
partner_id: Optional[str] = None,
11281192
callback: Optional[Callable[[EventOptions, int, Optional[str]], None]] = None,
11291193
revenue_obj: Optional[Revenue] = None):
@@ -1164,6 +1228,7 @@ def __init__(self, user_id: Optional[str] = None,
11641228
session_id=session_id,
11651229
insert_id=insert_id,
11661230
plan=plan,
1231+
ingestion_metadata=ingestion_metadata,
11671232
partner_id=partner_id,
11681233
callback=callback)
11691234
if revenue_obj:

src/amplitude/plugin.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,10 @@ class ContextPlugin(Plugin):
179179
180180
Methods:
181181
apply_context_data(event): Add SDK name and version to event.library.
182-
execute(event): Set event default timestamp and insert_id if not set elsewhere.
183-
Add SDK name and version to event.library.
182+
execute(event):
183+
- Set event default timestamp and insert_id if not set elsewhere.
184+
- Add SDK name and version to event.library.
185+
- Mount plan, ingestion_metadata if not yet.
184186
"""
185187

186188
def __init__(self):
@@ -201,7 +203,10 @@ def apply_context_data(self, event: BaseEvent):
201203
event.library = self.context_string
202204

203205
def execute(self, event: BaseEvent) -> BaseEvent:
204-
"""Set event default timestamp and insert_id if not set elsewhere. Add SDK name and version to event.library.
206+
"""
207+
- Set event default timestamp and insert_id if not set elsewhere.
208+
- Add SDK name and version to event.library.
209+
- Mount plan, ingestion_metadata if not yet.
205210
206211
Args:
207212
event (BaseEvent): The event to be processed.
@@ -212,6 +217,8 @@ def execute(self, event: BaseEvent) -> BaseEvent:
212217
event["insert_id"] = str(uuid.uuid4())
213218
if self.configuration.plan and (not event.plan):
214219
event["plan"] = self.configuration.plan
220+
if self.configuration.ingestion_metadata and (not event.ingestion_metadata):
221+
event["ingestion_metadata"] = self.configuration.ingestion_metadata
215222
self.apply_context_data(event)
216223
return event
217224

0 commit comments

Comments
 (0)