Skip to content

Commit 809980b

Browse files
jurosmUros Malesevic
andauthored
Elastic connection resurrect on split destroy (#161)
* Add resurrect client connection on split destroy * Remove request id * Detail comment * Use long appropriate variable names * Fix failing tests * Add test for connection pool resurrection Co-authored-by: Uros Malesevic <[email protected]>
1 parent 4df08f6 commit 809980b

File tree

2 files changed

+86
-46
lines changed

2 files changed

+86
-46
lines changed

lib.js

Lines changed: 56 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,60 @@
55
const split = require('split2')
66
const { Client } = require('@elastic/elasticsearch')
77

8+
function initializeBulkHandler (opts, client, splitter) {
9+
const esVersion = Number(opts['es-version']) || 7
10+
const index = opts.index || 'pino'
11+
const buildIndexName = typeof index === 'function' ? index : null
12+
const type = esVersion >= 7 ? undefined : (opts.type || 'log')
13+
const opType = esVersion >= 7 ? opts.op_type : undefined
14+
15+
// Resurrect connection pool on destroy
16+
splitter.destroy = () => {
17+
if (typeof client.connectionPool.resurrect === 'function') {
18+
client.connectionPool.resurrect({ name: 'elasticsearch-js' })
19+
initializeBulkHandler(opts, client, splitter)
20+
}
21+
}
22+
23+
const bulkInsert = client.helpers.bulk({
24+
datasource: splitter,
25+
flushBytes: opts['flush-bytes'] || 1000,
26+
flushInterval: opts['flush-interval'] || 30000,
27+
refreshOnCompletion: getIndexName(),
28+
onDocument (doc) {
29+
const date = doc.time || doc['@timestamp']
30+
if (opType === 'create') {
31+
doc['@timestamp'] = date
32+
}
33+
34+
return {
35+
index: {
36+
_index: getIndexName(date),
37+
_type: type,
38+
op_type: opType
39+
}
40+
}
41+
},
42+
onDrop (doc) {
43+
const error = new Error('Dropped document')
44+
error.document = doc
45+
splitter.emit('insertError', error)
46+
}
47+
})
48+
49+
bulkInsert.then(
50+
(stats) => splitter.emit('insert', stats),
51+
(err) => splitter.emit('error', err)
52+
)
53+
54+
function getIndexName (time = new Date().toISOString()) {
55+
if (buildIndexName) {
56+
return buildIndexName(time)
57+
}
58+
return index.replace('%{DATE}', time.substring(0, 10))
59+
}
60+
}
61+
862
function pinoElasticSearch (opts) {
963
if (opts['bulk-size']) {
1064
process.emitWarning('The "bulk-size" option has been deprecated, "flush-bytes" instead')
@@ -60,58 +114,14 @@ function pinoElasticSearch (opts) {
60114
cloud: opts.cloud,
61115
ssl: { rejectUnauthorized: opts.rejectUnauthorized }
62116
}
117+
63118
if (opts.Connection) {
64119
clientOpts.Connection = opts.Connection
65120
}
66121

67122
const client = new Client(clientOpts)
68123

69-
const esVersion = Number(opts['es-version']) || 7
70-
const index = opts.index || 'pino'
71-
const buildIndexName = typeof index === 'function' ? index : null
72-
const type = esVersion >= 7 ? undefined : (opts.type || 'log')
73-
const opType = esVersion >= 7 ? opts.op_type : undefined
74-
const b = client.helpers.bulk({
75-
datasource: splitter,
76-
flushBytes: opts['flush-bytes'] || 1000,
77-
flushInterval: opts['flush-interval'] || 30000,
78-
refreshOnCompletion: getIndexName(),
79-
onDocument (doc) {
80-
const date = doc.time || doc['@timestamp']
81-
if (opType === 'create') {
82-
doc['@timestamp'] = date
83-
}
84-
85-
return {
86-
index: {
87-
_index: getIndexName(date),
88-
_type: type,
89-
op_type: opType
90-
}
91-
}
92-
},
93-
onDrop (doc) {
94-
const error = new Error('Dropped document')
95-
error.document = doc
96-
splitter.emit('insertError', error)
97-
}
98-
})
99-
100-
b.then(
101-
(stats) => splitter.emit('insert', stats),
102-
(err) => splitter.emit('error', err)
103-
)
104-
105-
splitter._destroy = function (err, cb) {
106-
b.then(() => cb(err), (e2) => cb(e2 || err))
107-
}
108-
109-
function getIndexName (time = new Date().toISOString()) {
110-
if (buildIndexName) {
111-
return buildIndexName(time)
112-
}
113-
return index.replace('%{DATE}', time.substring(0, 10))
114-
}
124+
initializeBulkHandler(opts, client, splitter)
115125

116126
return splitter
117127
}

test/unit.test.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,33 @@ test('make sure `@timestamp` is correctly set when `op_type` is `create`', (t) =
274274
const log = pino(instance)
275275
log.info(['info'], 'abc')
276276
})
277+
278+
test('resurrect client connection pool when datasource split is destroyed', (t) => {
279+
let isResurrected = false
280+
const Client = function (config) {}
281+
282+
Client.prototype.helpers = {
283+
bulk: async function (opts) {
284+
if (!isResurrected) {
285+
opts.datasource.destroy()
286+
}
287+
}
288+
}
289+
290+
Client.prototype.connectionPool = {
291+
resurrect: function () {
292+
isResurrected = true
293+
t.end()
294+
}
295+
}
296+
297+
const elastic = proxyquire('../', {
298+
'@elastic/elasticsearch': { Client }
299+
})
300+
301+
const instance = elastic({ ...options })
302+
const log = pino(instance)
303+
304+
const prettyLog = 'Example of a log'
305+
log.info(['info'], prettyLog)
306+
})

0 commit comments

Comments
 (0)