From ed9bb2726533fe6781b887f71c3cacf7d0da8c8f Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Oct 2025 09:52:49 +0200 Subject: [PATCH 01/13] Add drop_error_types option to not retry after certain error types --- docs/index.asciidoc | 12 +++ .../elasticsearch/api_configs.rb | 4 + .../plugin_mixins/elasticsearch/common.rb | 4 +- spec/unit/outputs/elasticsearch_spec.rb | 73 +++++++++++++++++++ 4 files changed, 92 insertions(+), 1 deletion(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 1cb062d1..780ee380 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -644,6 +644,18 @@ If you don't set a value for this option: - for elasticsearch clusters 8.x: no value will be used; - for elasticsearch clusters 7.x: the value of '_doc' will be used. +[id="plugins-{type}s-{plugin}-drop_error_types"] +===== `drop_error_types` + + * Value type is <> + * Default value is `[]` + +In most cases, a bulk request is retried when Elasticsearch returns an error. +This config option defines a list of error types for which the bulk request will not be retried. +A warning message will be logged indicating that the request had failed, unless the error type is +listed in the <> config option. +Note that the events are not added to the Dead Letter Queue (DLQ), regardless of whether one is enabled. + [id="plugins-{type}s-{plugin}-ecs_compatibility"] ===== `ecs_compatibility` diff --git a/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb b/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb index 29cd4fb8..0d5f95d6 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb @@ -204,6 +204,10 @@ module APIConfigs # if enabled, failed index name interpolation events go into dead letter queue. :dlq_on_failed_indexname_interpolation => { :validate => :boolean, :default => true }, + # The bulk request will not be retried for these error types; the events will be dropped. + # The events won't be added to the DLQ either. + :drop_error_types => { :validate => :string, :list => true, :default => [] }, + # Obsolete Settings :ssl => { :obsolete => "Set 'ssl_enabled' instead." }, :ssl_certificate_verification => { :obsolete => "Set 'ssl_verification_mode' instead." }, diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 977ef204..0ba00533 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -278,16 +278,18 @@ def submit(actions) status = action_props["status"] error = action_props["error"] + type = error["type"] if error action = actions[idx] # Retry logic: If it is success, we move on. If it is a failure, we have 3 paths: # - For 409, we log and drop. there is nothing we can do + # - For any error types set in the 'drop_error_types' config, log and drop. # - For a mapping error, we send to dead letter queue for a human to intervene at a later point. # - For everything else there's mastercard. Yep, and we retry indefinitely. This should fix #572 and other transient network issues if DOC_SUCCESS_CODES.include?(status) @document_level_metrics.increment(:successes) next - elsif DOC_CONFLICT_CODE == status + elsif DOC_CONFLICT_CODE == status || @drop_error_types.include?(type) @document_level_metrics.increment(:non_retryable_failures) @logger.warn "Failed action", status: status, action: action, response: response if log_failure_type?(error) next diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 5d1d6628..babf4dd5 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -1499,6 +1499,79 @@ end end + context 'drop_error_types config option' do + + let(:options) { super().merge('drop_error_types' => ['role_restriction_exception']) } + + let(:events) { [ LogStash::Event.new("foo" => "bar") ] } + + let(:dlq_writer) { subject.instance_variable_get(:@dlq_writer) } + + let(:error_code) { 403 } + + let(:bulk_response) do + { + "took"=>1, "ingest_took"=>11, "errors"=>true, "items"=> + [{ + "index"=>{"_index"=>"bar", "_type"=>"_doc", "_id"=>'bar', "status" => error_code, + "error"=>{"type" => "role_restriction_exception", "reason" => "TEST" } + } + }] + } + end + + before(:each) do + allow(subject.client).to receive(:bulk_send).and_return(bulk_response) + end + + context 'DLQ is enabled' do + + it 'does not write the event to the DLQ' do + allow(subject).to receive(:dlq_enabled?).and_return(true) + expect(dlq_writer).not_to receive(:write) + + event_action_tuples = subject.map_events(events) + subject.send(:submit, event_action_tuples) + end + end + + context 'DLQ is not enabled' do + it 'does not write the event to the DLQ' do + allow(subject).to receive(:dlq_enabled?).and_return(false) + expect(dlq_writer).not_to receive(:write) + + event_action_tuples = subject.map_events(events) + subject.send(:submit, event_action_tuples) + end + end + + context 'the error type is not in `silence_errors_in_log`' do + + let(:logger) { subject.logger } + + it 'logs the error' do + expect(logger).to receive(:warn).with(a_string_including("Failed action"), anything).and_call_original + + event_action_tuples = subject.map_events(events) + subject.send(:submit, event_action_tuples) + end + end + + context 'the error type is in `silence_errors_in_log`' do + + let(:logger) { subject.logger } + + let(:options) { super().merge('silence_errors_in_log' => ['role_restriction_exception']) } + + it 'does not log the error' do + expect(logger).not_to receive(:warn) + + event_action_tuples = subject.map_events(events) + subject.send(:submit, event_action_tuples) + end + end + end + describe "custom headers" do let(:manticore_options) { subject.client.pool.adapter.manticore.instance_variable_get(:@options) } From 0d2f8f38b24c983279d0cedc98599607821b0b51 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Oct 2025 09:58:52 +0200 Subject: [PATCH 02/13] Add code example to docs --- docs/index.asciidoc | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 780ee380..ad24039f 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -656,6 +656,13 @@ A warning message will be logged indicating that the request had failed, unless listed in the <> config option. Note that the events are not added to the Dead Letter Queue (DLQ), regardless of whether one is enabled. +[source,ruby] + output { + elasticsearch { + drop_error_types => ["role_restriction_exception"] + } + } + [id="plugins-{type}s-{plugin}-ecs_compatibility"] ===== `ecs_compatibility` From 05bd467a5d832fb2e4e502e5bbda0ff74060dd64 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Oct 2025 10:15:58 +0200 Subject: [PATCH 03/13] Use variable for error type in specs --- spec/unit/outputs/elasticsearch_spec.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index babf4dd5..9edc9e01 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -1501,7 +1501,9 @@ context 'drop_error_types config option' do - let(:options) { super().merge('drop_error_types' => ['role_restriction_exception']) } + let(:error_type) { 'role_restriction_exception' } + + let(:options) { super().merge('drop_error_types' => [error_type]) } let(:events) { [ LogStash::Event.new("foo" => "bar") ] } @@ -1514,7 +1516,7 @@ "took"=>1, "ingest_took"=>11, "errors"=>true, "items"=> [{ "index"=>{"_index"=>"bar", "_type"=>"_doc", "_id"=>'bar', "status" => error_code, - "error"=>{"type" => "role_restriction_exception", "reason" => "TEST" } + "error"=>{"type" => error_type, "reason" => "TEST" } } }] } @@ -1536,6 +1538,7 @@ end context 'DLQ is not enabled' do + it 'does not write the event to the DLQ' do allow(subject).to receive(:dlq_enabled?).and_return(false) expect(dlq_writer).not_to receive(:write) @@ -1561,7 +1564,7 @@ let(:logger) { subject.logger } - let(:options) { super().merge('silence_errors_in_log' => ['role_restriction_exception']) } + let(:options) { super().merge('silence_errors_in_log' => [error_type]) } it 'does not log the error' do expect(logger).not_to receive(:warn) From 36517f845c5f7400403d9821aa2bb91579c73515 Mon Sep 17 00:00:00 2001 From: Emily S Date: Wed, 1 Oct 2025 16:58:11 +0200 Subject: [PATCH 04/13] Update lib/logstash/plugin_mixins/elasticsearch/api_configs.rb MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- lib/logstash/plugin_mixins/elasticsearch/api_configs.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb b/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb index 0d5f95d6..b8773b2e 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/api_configs.rb @@ -204,7 +204,7 @@ module APIConfigs # if enabled, failed index name interpolation events go into dead letter queue. :dlq_on_failed_indexname_interpolation => { :validate => :boolean, :default => true }, - # The bulk request will not be retried for these error types; the events will be dropped. + # Failures on actions from a bulk request will not be retried for these error types; the events will be dropped. # The events won't be added to the DLQ either. :drop_error_types => { :validate => :string, :list => true, :default => [] }, From 7ab12e166fd625afffdbef40307d4f3a6b005c8c Mon Sep 17 00:00:00 2001 From: Emily S Date: Wed, 1 Oct 2025 16:58:27 +0200 Subject: [PATCH 05/13] Update docs/index.asciidoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- docs/index.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index ad24039f..3363ec1b 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -659,7 +659,7 @@ Note that the events are not added to the Dead Letter Queue (DLQ), regardless of [source,ruby] output { elasticsearch { - drop_error_types => ["role_restriction_exception"] + drop_error_types => ["index_closed_exception"] } } From 98d27780c0f8d23ded9b3608d8b3e1ba56e90bd3 Mon Sep 17 00:00:00 2001 From: Emily S Date: Wed, 1 Oct 2025 17:00:26 +0200 Subject: [PATCH 06/13] Update docs/index.asciidoc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- docs/index.asciidoc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 3363ec1b..3df69bb8 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -650,11 +650,10 @@ If you don't set a value for this option: * Value type is <> * Default value is `[]` -In most cases, a bulk request is retried when Elasticsearch returns an error. -This config option defines a list of error types for which the bulk request will not be retried. -A warning message will be logged indicating that the request had failed, unless the error type is +Lists the set of error types for which individual bulk request actions will not be retried. Unless an individual - document level - action returns 409 or an error from this list, failures will be retried indefinitely. +A warning message will be logged indicating that the action failed, unless the error type is listed in the <> config option. -Note that the events are not added to the Dead Letter Queue (DLQ), regardless of whether one is enabled. +Note that the events are not added to the Dead Letter Queue (DLQ), regardless of whether it is enabled. [source,ruby] output { From 8c8219d329576977d9793c6628493db20d7899b6 Mon Sep 17 00:00:00 2001 From: Emily S Date: Wed, 1 Oct 2025 17:00:58 +0200 Subject: [PATCH 07/13] Update lib/logstash/plugin_mixins/elasticsearch/common.rb MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 0ba00533..8e7f9240 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -281,7 +281,7 @@ def submit(actions) type = error["type"] if error action = actions[idx] - # Retry logic: If it is success, we move on. If it is a failure, we have 3 paths: + # Retry logic: If it is success, we move on. If it is a failure, we have the following paths: # - For 409, we log and drop. there is nothing we can do # - For any error types set in the 'drop_error_types' config, log and drop. # - For a mapping error, we send to dead letter queue for a human to intervene at a later point. From 38ddf9994e8058a05e8bad01b9e20b8e764ef856 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Oct 2025 17:10:24 +0200 Subject: [PATCH 08/13] Add drop_error_types to the table of options --- docs/index.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 3df69bb8..3611c04b 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -373,6 +373,7 @@ Please check out <> for details. | <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> | <>|No | <> |<>|No | <> |<>|No From c5a71016315075446f3a6bbfe1ecd2ab4f23eebd Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Wed, 1 Oct 2025 17:21:40 +0200 Subject: [PATCH 09/13] Refactor specs after code review --- spec/unit/outputs/elasticsearch_spec.rb | 30 ++++++++++++------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 9edc9e01..c81adba6 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -1499,9 +1499,9 @@ end end - context 'drop_error_types config option' do + describe 'drop_error_types' do - let(:error_type) { 'role_restriction_exception' } + let(:error_type) { 'index_closed_exception' } let(:options) { super().merge('drop_error_types' => [error_type]) } @@ -1511,6 +1511,10 @@ let(:error_code) { 403 } + let(:event_action_tuples) { subject.map_events(events) } + + let(:logger) { subject.logger } + let(:bulk_response) do { "took"=>1, "ingest_took"=>11, "errors"=>true, "items"=> @@ -1528,48 +1532,42 @@ context 'DLQ is enabled' do - it 'does not write the event to the DLQ' do + before(:each) do allow(subject).to receive(:dlq_enabled?).and_return(true) - expect(dlq_writer).not_to receive(:write) + end - event_action_tuples = subject.map_events(events) + it 'does not write the event to the DLQ' do + expect(dlq_writer).not_to receive(:write) subject.send(:submit, event_action_tuples) end end context 'DLQ is not enabled' do - it 'does not write the event to the DLQ' do + before(:each) do allow(subject).to receive(:dlq_enabled?).and_return(false) - expect(dlq_writer).not_to receive(:write) + end - event_action_tuples = subject.map_events(events) + it 'does not write the event to the DLQ' do + expect(dlq_writer).not_to receive(:write) subject.send(:submit, event_action_tuples) end end context 'the error type is not in `silence_errors_in_log`' do - let(:logger) { subject.logger } - it 'logs the error' do expect(logger).to receive(:warn).with(a_string_including("Failed action"), anything).and_call_original - - event_action_tuples = subject.map_events(events) subject.send(:submit, event_action_tuples) end end context 'the error type is in `silence_errors_in_log`' do - let(:logger) { subject.logger } - let(:options) { super().merge('silence_errors_in_log' => [error_type]) } it 'does not log the error' do expect(logger).not_to receive(:warn) - - event_action_tuples = subject.map_events(events) subject.send(:submit, event_action_tuples) end end From d91af0c75eb2b0e5cfaff27ee0ce8f77f686dda0 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Thu, 2 Oct 2025 14:05:07 +0200 Subject: [PATCH 10/13] Test more specific code paths --- spec/unit/outputs/elasticsearch_spec.rb | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index c81adba6..7a71e2e8 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -1513,8 +1513,6 @@ let(:event_action_tuples) { subject.map_events(events) } - let(:logger) { subject.logger } - let(:bulk_response) do { "took"=>1, "ingest_took"=>11, "errors"=>true, "items"=> @@ -1532,9 +1530,7 @@ context 'DLQ is enabled' do - before(:each) do - allow(subject).to receive(:dlq_enabled?).and_return(true) - end + let(:options) { super().merge("dlq_custom_codes" => [403]) } it 'does not write the event to the DLQ' do expect(dlq_writer).not_to receive(:write) @@ -1548,16 +1544,16 @@ allow(subject).to receive(:dlq_enabled?).and_return(false) end - it 'does not write the event to the DLQ' do - expect(dlq_writer).not_to receive(:write) - subject.send(:submit, event_action_tuples) + it 'does not retry indexing the event' do + expect(subject).to receive(:submit).with(event_action_tuples).once.and_call_original + subject.send(:retrying_submit, event_action_tuples) end end context 'the error type is not in `silence_errors_in_log`' do it 'logs the error' do - expect(logger).to receive(:warn).with(a_string_including("Failed action"), anything).and_call_original + expect(subject.logger).to receive(:warn).with(a_string_including("Failed action"), anything) subject.send(:submit, event_action_tuples) end end @@ -1566,8 +1562,12 @@ let(:options) { super().merge('silence_errors_in_log' => [error_type]) } + before(:each) do + # ensure that neither warn nor info is called on the logger by using a test double + subject.instance_variable_set("@logger", double('logger')) + end + it 'does not log the error' do - expect(logger).not_to receive(:warn) subject.send(:submit, event_action_tuples) end end From bc410107c3411bb9b46533783725894959e083c9 Mon Sep 17 00:00:00 2001 From: Emily Stolfo Date: Mon, 6 Oct 2025 08:58:40 +0200 Subject: [PATCH 11/13] Update CHANGELOG and bump version in gemspec --- CHANGELOG.md | 3 +++ logstash-output-elasticsearch.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88779ffc..0887c05e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 12.0.8 +- Add drop_error_types config option to not retry after certain error types [#1228](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1228) + ## 12.0.7 - Support both, encoded and non encoded api-key formats on plugin configuration [#1223](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1223) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 8569ae72..6c3a1bb9 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '12.0.7' + s.version = '12.0.8' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 06cfbd66805a030501fac4f97be2b1920c0573e0 Mon Sep 17 00:00:00 2001 From: Emily S Date: Mon, 6 Oct 2025 10:15:16 +0200 Subject: [PATCH 12/13] Update CHANGELOG.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0887c05e..cac004e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,4 @@ -## 12.0.8 +## 12.1.0 - Add drop_error_types config option to not retry after certain error types [#1228](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1228) ## 12.0.7 From 0d163d5a0fa0583ed88464e22a49c470910cf1b3 Mon Sep 17 00:00:00 2001 From: Emily S Date: Mon, 6 Oct 2025 10:15:26 +0200 Subject: [PATCH 13/13] Update logstash-output-elasticsearch.gemspec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: João Duarte --- logstash-output-elasticsearch.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 6c3a1bb9..f884bdeb 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '12.0.8' + s.version = '12.1.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"