Skip to content

Commit ad844b9

Browse files
authored
Merge pull request #305 from Sanketika-Obsrv/data-ingestion-fix
fix #OBS-I479 ingestion batch structure fix
2 parents b78add1 + ce5b774 commit ad844b9

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

api-service/src/controllers/DataIngestion/DataIngestionController.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,34 +39,34 @@ const dataIn = async (req: Request, res: Response) => {
3939
logger.error({ apiId, message: `Dataset with id ${datasetId} not found in live table`, code: "DATASET_NOT_FOUND" })
4040
return ResponseHandler.errorResponse(errorObject.datasetNotFound, req, res);
4141
}
42-
const { entry_topic, dataset_config, api_version } = dataset
42+
const { entry_topic, dataset_config, extraction_config, api_version } = dataset
4343
const entryTopic = api_version !== "v2" ? _.get(dataset_config, "entry_topic") : entry_topic
4444
if (!entryTopic) {
4545
logger.error({ apiId, message: "Entry topic not found", code: "TOPIC_NOT_FOUND" })
4646
return ResponseHandler.errorResponse(errorObject.topicNotFound, req, res);
4747
}
48-
await send(addMetadataToEvents(datasetId, requestBody), entryTopic)
48+
await send(addMetadataToEvents(datasetId, requestBody, extraction_config), entryTopic)
4949
ResponseHandler.successResponse(req, res, { status: 200, data: { message: "Data ingested successfully" } });
5050

5151
}
5252

53-
const addMetadataToEvents = (datasetId: string, payload: any) => {
53+
const addMetadataToEvents = (datasetId: string, payload: any, extraction_config: any) => {
5454
const validData = _.get(payload, "data");
5555
const now = Date.now();
5656
const mid = _.get(payload, "params.msgid");
5757
const source = { id: "api.data.in", version: config?.version, entry_source: "api" };
5858
const obsrvMeta = { syncts: now, flags: {}, timespans: {}, error: {}, source: source };
5959
if (Array.isArray(validData)) {
60-
const payloadRef = validData.map((event: any) => {
61-
const payload = {
62-
event,
60+
const extraction_key: string = _.get(extraction_config, "extraction_key", 'events');
61+
const dedup_key: string = _.get(extraction_config, "dedup_config.dedup_key", 'id');
62+
const payload: any = {
6363
"obsrv_meta": obsrvMeta,
6464
"dataset": datasetId,
6565
"msgid": mid
66-
}
66+
};
67+
payload[extraction_key] = validData;
68+
payload[dedup_key] = mid
6769
return payload;
68-
})
69-
return payloadRef;
7070
}
7171
else {
7272
return ({

0 commit comments

Comments
 (0)