Skip to content

Commit fe0ef5c

Browse files
authored
feat: Log ingester background tasks (#41759)
1 parent 75597fd commit fe0ef5c

File tree

2 files changed

+31
-24
lines changed

2 files changed

+31
-24
lines changed

plugin-server/src/logs-ingestion/logs-ingestion-consumer.test.ts

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,16 @@ describe('LogsIngestionConsumer', () => {
9999
jest.useRealTimers()
100100
})
101101

102+
const waitForBackgroundTasks = async (
103+
promise: Promise<{
104+
backgroundTask?: Promise<any>
105+
}>
106+
) => {
107+
await (
108+
await promise
109+
).backgroundTask
110+
}
111+
102112
describe('general', () => {
103113
it('should have the correct config', () => {
104114
expect(consumer['name']).toEqual('LogsIngestionConsumer')
@@ -130,7 +140,7 @@ describe('LogsIngestionConsumer', () => {
130140
token: team.api_token,
131141
})
132142

133-
await consumer.processKafkaBatch(messages)
143+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
134144

135145
expect(forSnapshot(mockProducerObserver.getProducedKafkaMessages())).toMatchSnapshot()
136146
})
@@ -145,7 +155,7 @@ describe('LogsIngestionConsumer', () => {
145155
token: team.api_token,
146156
})
147157

148-
await consumer.processKafkaBatch(messages)
158+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
149159

150160
const producedMessages = mockProducerObserver.getProducedKafkaMessages()
151161
expect(producedMessages).toHaveLength(3)
@@ -160,7 +170,7 @@ describe('LogsIngestionConsumer', () => {
160170
// missing token
161171
})
162172

163-
await consumer.processKafkaBatch(messages)
173+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
164174

165175
expect(mockProducerObserver.getProducedKafkaMessages()).toHaveLength(0)
166176
})
@@ -171,7 +181,7 @@ describe('LogsIngestionConsumer', () => {
171181
token: 'invalid-token',
172182
})
173183

174-
await consumer.processKafkaBatch(messages)
184+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
175185

176186
expect(mockProducerObserver.getProducedKafkaMessages()).toHaveLength(0)
177187
})
@@ -183,7 +193,7 @@ describe('LogsIngestionConsumer', () => {
183193
token: team.api_token,
184194
})
185195

186-
await consumer.processKafkaBatch(messages)
196+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
187197

188198
const producedMessages = mockProducerObserver.getProducedKafkaMessages()
189199
expect(forSnapshot(producedMessages)).toMatchSnapshot()
@@ -196,7 +206,7 @@ describe('LogsIngestionConsumer', () => {
196206
team_id: '999',
197207
})
198208

199-
await consumer.processKafkaBatch(messages)
209+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
200210
expect(forSnapshot(mockProducerObserver.getProducedKafkaMessages())).toMatchSnapshot()
201211
})
202212

@@ -214,7 +224,7 @@ describe('LogsIngestionConsumer', () => {
214224
} as Message,
215225
]
216226

217-
await consumer.processKafkaBatch(messages)
227+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
218228

219229
expect(mockProducerObserver.getProducedMessages()).toHaveLength(0)
220230
expect(logMessageDroppedCounterSpy).toHaveBeenCalledWith({ reason: 'team_not_found' })
@@ -225,7 +235,7 @@ describe('LogsIngestionConsumer', () => {
225235
it('should handle empty batch', async () => {
226236
const result = await consumer.processBatch([])
227237

228-
expect(result.backgroundTask).toBeDefined()
238+
expect(result.backgroundTask).toBeUndefined()
229239
expect(result.messages).toEqual([])
230240
await result.backgroundTask
231241
})
@@ -236,7 +246,7 @@ describe('LogsIngestionConsumer', () => {
236246
token: team.api_token,
237247
})
238248

239-
await consumer.processKafkaBatch(messages)
249+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
240250

241251
const producedMessages = mockProducerObserver.getProducedKafkaMessages()
242252
expect(producedMessages).toHaveLength(1)
@@ -253,7 +263,7 @@ describe('LogsIngestionConsumer', () => {
253263
token: team.api_token,
254264
})
255265

256-
await consumer.processKafkaBatch(messages)
266+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
257267

258268
const producedMessages = mockProducerObserver.getProducedMessages()
259269
expect(producedMessages).toHaveLength(1)
@@ -296,7 +306,7 @@ describe('LogsIngestionConsumer', () => {
296306
const originalProduce = consumer['kafkaProducer']!.produce
297307
consumer['kafkaProducer']!.produce = jest.fn().mockRejectedValue(new Error('Producer error'))
298308

299-
await expect(consumer.processKafkaBatch(messages)).rejects.toThrow('Producer error')
309+
await expect(waitForBackgroundTasks(consumer.processKafkaBatch(messages))).rejects.toThrow('Producer error')
300310

301311
// Restore original method
302312
consumer['kafkaProducer']!.produce = originalProduce
@@ -310,7 +320,7 @@ describe('LogsIngestionConsumer', () => {
310320
token: team.api_token,
311321
})
312322

313-
await consumer.processKafkaBatch(messages)
323+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
314324

315325
const producedMessages = mockProducerObserver.getProducedKafkaMessages()
316326
expect(producedMessages).toHaveLength(1)
@@ -330,7 +340,7 @@ describe('LogsIngestionConsumer', () => {
330340
}),
331341
]
332342

333-
await consumer.processKafkaBatch(messages)
343+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
334344

335345
const producedMessages = mockProducerObserver.getProducedKafkaMessages()
336346
expect(producedMessages).toHaveLength(2)
@@ -360,7 +370,7 @@ describe('LogsIngestionConsumer', () => {
360370
token: team.api_token,
361371
})
362372

363-
await consumer.processKafkaBatch(messages)
373+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
364374

365375
const producedMessages = mockProducerObserver.getProducedMessages()
366376
expect(producedMessages).toHaveLength(1)
@@ -383,7 +393,7 @@ describe('LogsIngestionConsumer', () => {
383393
} as Message,
384394
]
385395

386-
await consumer.processKafkaBatch(messages)
396+
await waitForBackgroundTasks(consumer.processKafkaBatch(messages))
387397

388398
const producedMessages = mockProducerObserver.getProducedMessages()
389399
expect(producedMessages).toHaveLength(1)

plugin-server/src/logs-ingestion/logs-ingestion-consumer.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,15 @@ export class LogsIngestionConsumer {
5858

5959
public async processBatch(
6060
messages: LogsIngestionMessage[]
61-
): Promise<{ backgroundTask: Promise<any>; messages: LogsIngestionMessage[] }> {
61+
): Promise<{ backgroundTask?: Promise<any>; messages: LogsIngestionMessage[] }> {
62+
await Promise.resolve() // NOTE: Just to keep the signature consistent as we will need this to be async in the future
6263
if (!messages.length) {
63-
return { backgroundTask: Promise.resolve(), messages: [] }
64+
return { messages: [] }
6465
}
6566

66-
await this.produceValidLogMessages(messages)
67-
6867
return {
6968
// This is all IO so we can set them off in the background and start processing the next batch
70-
backgroundTask: Promise.resolve(),
69+
backgroundTask: this.produceValidLogMessages(messages),
7170
messages,
7271
}
7372
}
@@ -100,9 +99,8 @@ export class LogsIngestionConsumer {
10099
const token = headers.token
101100

102101
if (!token) {
103-
logger.error('missing_token_or_distinct_id')
104-
// Write to DLQ topic maybe?
105-
logMessageDroppedCounter.inc({ reason: 'missing_token_or_distinct_id' })
102+
logger.error('missing_token')
103+
logMessageDroppedCounter.inc({ reason: 'missing_token' })
106104
return
107105
}
108106

@@ -113,7 +111,6 @@ export class LogsIngestionConsumer {
113111
}
114112

115113
if (!team) {
116-
// Write to DLQ topic maybe?
117114
logger.error('team_not_found', { token_with_no_team: token })
118115
logMessageDroppedCounter.inc({ reason: 'team_not_found' })
119116
return

0 commit comments

Comments
 (0)