From db211536085769dd7c912d08f650ebb3b29181cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20C=C3=A1mara?= Date: Thu, 30 Oct 2025 09:09:26 +0100 Subject: [PATCH 1/4] CI-Fix: Improve thread safety in integration tests to avoid race conditions during CI execution --- lib/logstash/inputs/rabbitmq.rb | 2 +- spec/inputs/rabbitmq_spec.rb | 32 +++++++++++++++++++------------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/lib/logstash/inputs/rabbitmq.rb b/lib/logstash/inputs/rabbitmq.rb index 1654692..0ec4a6e 100644 --- a/lib/logstash/inputs/rabbitmq.rb +++ b/lib/logstash/inputs/rabbitmq.rb @@ -302,7 +302,7 @@ def decorate(event, metadata, data) end def stop - @internal_queue.put(INTERNAL_QUEUE_POISON) + @internal_queue&.put(INTERNAL_QUEUE_POISON) shutdown_consumer close_connection end diff --git a/spec/inputs/rabbitmq_spec.rb b/spec/inputs/rabbitmq_spec.rb index 1736970..0d6ff26 100644 --- a/spec/inputs/rabbitmq_spec.rb +++ b/spec/inputs/rabbitmq_spec.rb @@ -33,7 +33,6 @@ let(:connection) { double("MarchHare Connection") } let(:channel) { double("Channel") } let(:exchange) { double("Exchange") } - let(:channel) { double("Channel") } let(:queue) { double("queue") } # Doing this in a before block doesn't give us enough control over scope @@ -384,24 +383,30 @@ let(:hare_info) { instance.instance_variable_get(:@hare_info) } let(:output_queue) { Queue.new } - # Spawn a connection in the bg and wait up (n) seconds + # Spawn a connection and waits for it to be ready. def spawn_and_wait(instance) instance.register output_queue # materialize in this thread - Thread.new { + @consumer_thread = Thread.new { instance.run(output_queue) } - 20.times do - instance.send(:connection_open?) ? break : sleep(0.1) + # Ensure that the connection and channel are fully started. + Timeout.timeout(2, Timeout::Error, "Timeout waiting for connection to open and channel to be available") do + until instance.send(:connection_open?) && instance.instance_variable_get(:@hare_info)&.channel + sleep 0.05 + end end - # Extra time to make sure the consumer can attach - # Without this there's a chance the shutdown code will execute - # before consumption begins. This is tricky to do more elegantly - sleep 4 + # Ensure that the consumer is fully started. + hare_info = instance.instance_variable_get(:@hare_info) + Timeout.timeout(10, Timeout::Error, "Timeout waiting for channel to have consumers.") do + until hare_info.channel.consumers && !hare_info.channel.consumers.empty? + sleep 0.05 + end + end end let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) } @@ -418,6 +423,11 @@ def spawn_and_wait(instance) after do instance.stop() + # Stop the thread gracefully before tearing down connections. + @consumer_thread.join + # Exchange deletion needs to be placed after thread is finished, + # as the exchange may be dynamically created during the test context initialization. + exchange&.delete if defined?(exchange) test_channel.close test_connection.close end @@ -440,10 +450,6 @@ def spawn_and_wait(instance) #let(:queue) { test_channel.queue(queue_name, :auto_delete => true) } let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" } - after do - exchange.delete - end - context "when the message has a payload but no message headers" do before do exchange.publish(message) From d8ebe28ae9818c1994f2d279a3f83de700576a84 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20C=C3=A1mara?= Date: Thu, 30 Oct 2025 17:09:22 +0100 Subject: [PATCH 2/4] Add sync mechanism between stop function and consumer thread to avoid race conditions --- lib/logstash/inputs/rabbitmq.rb | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/lib/logstash/inputs/rabbitmq.rb b/lib/logstash/inputs/rabbitmq.rb index 0ec4a6e..b644010 100644 --- a/lib/logstash/inputs/rabbitmq.rb +++ b/lib/logstash/inputs/rabbitmq.rb @@ -59,6 +59,7 @@ module Inputs class RabbitMQ < LogStash::Inputs::Threadable java_import java.util.concurrent.TimeUnit + java_import java.util.concurrent.CountDownLatch include ::LogStash::PluginMixins::RabbitMQConnection @@ -189,6 +190,7 @@ def setup! connect! declare_queue! bind_exchange! + @terminated = CountDownLatch.new(1) @hare_info.channel.prefetch = @prefetch_count rescue => e # when encountering an exception during shut-down, @@ -268,7 +270,11 @@ def internal_queue_consume! next end - break if payload == INTERNAL_QUEUE_POISON + if payload == INTERNAL_QUEUE_POISON + @logger.info("RabbitMQ consumer thread received shutdown signal, exiting") + @terminated.countDown + break + end metadata, data = payload @codec.decode(data) do |event| @@ -302,11 +308,22 @@ def decorate(event, metadata, data) end def stop - @internal_queue&.put(INTERNAL_QUEUE_POISON) + stop_consumer_thread shutdown_consumer close_connection end + def stop_consumer_thread + # After sending the poison pill, we need to wait for the consumer thread to actually exit. + # Closing channel right away may lead to messages not being acked properly or a race condition + # where the consumer thread tries to ack a message on a closed channel. + @internal_queue.put(INTERNAL_QUEUE_POISON) + timed_out = !@terminated.await(2, TimeUnit::SECONDS) + if timed_out + @logger.warn("Timeout waiting for RabbitMQ consumer thread to terminate") + end + end + def shutdown_consumer # There are two possible flows to shutdown consumers. When the plugin is the one shutting down, it should send a channel # cancellation message by invoking channel.basic_cancel(consumer_tag) and waiting for the consumer to terminate From 6cc9369b21783e2edc806653ec9cdba39812c7d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20C=C3=A1mara?= Date: Fri, 31 Oct 2025 12:51:43 +0100 Subject: [PATCH 3/4] Update CHANGELOG.md and gemspec. Also added manual signaling to stop function in utests --- CHANGELOG.md | 3 +++ logstash-integration-rabbitmq.gemspec | 2 +- spec/inputs/rabbitmq_spec.rb | 10 ++++++---- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df73b68..d2f0cd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 7.4.1 + - Improve thread safety to avoid race conditions during shutdown and integration tests. [#67](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/66) + ## 7.4.0 - Removed obsolete `verify_ssl` and `debug` options [#60](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/60) diff --git a/logstash-integration-rabbitmq.gemspec b/logstash-integration-rabbitmq.gemspec index 8ff60d7..16b2c9c 100644 --- a/logstash-integration-rabbitmq.gemspec +++ b/logstash-integration-rabbitmq.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-rabbitmq' - s.version = '7.4.0' + s.version = '7.4.1' s.licenses = ['Apache License (2.0)'] s.summary = "Integration with RabbitMQ - input and output plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+ diff --git a/spec/inputs/rabbitmq_spec.rb b/spec/inputs/rabbitmq_spec.rb index 0d6ff26..a8a2656 100644 --- a/spec/inputs/rabbitmq_spec.rb +++ b/spec/inputs/rabbitmq_spec.rb @@ -75,6 +75,12 @@ allow(consumer).to receive(:consumer_tag).and_return(consumer_tag) end + after do + # This unit tests don't initialize consumer thread, so sync signal should be manually sent. + instance.instance_variable_get(:@terminated).countDown + instance.stop + end + context "with a cancelled consumer" do before do allow(consumer).to receive(:cancelled?).and_return(true) @@ -83,7 +89,6 @@ it "should not call basic_cancel" do expect(channel).to_not receive(:basic_cancel) - instance.stop end end @@ -95,7 +100,6 @@ it "should not call basic_cancel" do expect(channel).to_not receive(:basic_cancel) - instance.stop end end @@ -107,13 +111,11 @@ it "should call basic_cancel" do expect(channel).to receive(:basic_cancel).with(consumer_tag) - instance.stop end it "should log terminating info" do allow(channel).to receive(:basic_cancel).with(consumer_tag) expect(instance.logger).to receive(:info).with(/Waiting for RabbitMQ consumer to terminate before stopping/, anything) - instance.stop end end end From 3abd6692af145c923898f39818a27784f0542e4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81lex=20C=C3=A1mara?= Date: Mon, 3 Nov 2025 09:50:40 +0100 Subject: [PATCH 4/4] Renamed CountDown variable --- lib/logstash/inputs/rabbitmq.rb | 6 +++--- spec/inputs/rabbitmq_spec.rb | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/logstash/inputs/rabbitmq.rb b/lib/logstash/inputs/rabbitmq.rb index b644010..e6982b5 100644 --- a/lib/logstash/inputs/rabbitmq.rb +++ b/lib/logstash/inputs/rabbitmq.rb @@ -190,7 +190,7 @@ def setup! connect! declare_queue! bind_exchange! - @terminated = CountDownLatch.new(1) + @poison_latch = CountDownLatch.new(1) @hare_info.channel.prefetch = @prefetch_count rescue => e # when encountering an exception during shut-down, @@ -272,7 +272,7 @@ def internal_queue_consume! if payload == INTERNAL_QUEUE_POISON @logger.info("RabbitMQ consumer thread received shutdown signal, exiting") - @terminated.countDown + @poison_latch.countDown break end @@ -318,7 +318,7 @@ def stop_consumer_thread # Closing channel right away may lead to messages not being acked properly or a race condition # where the consumer thread tries to ack a message on a closed channel. @internal_queue.put(INTERNAL_QUEUE_POISON) - timed_out = !@terminated.await(2, TimeUnit::SECONDS) + timed_out = !@poison_latch.await(2, TimeUnit::SECONDS) if timed_out @logger.warn("Timeout waiting for RabbitMQ consumer thread to terminate") end diff --git a/spec/inputs/rabbitmq_spec.rb b/spec/inputs/rabbitmq_spec.rb index a8a2656..5e2a2df 100644 --- a/spec/inputs/rabbitmq_spec.rb +++ b/spec/inputs/rabbitmq_spec.rb @@ -77,7 +77,7 @@ after do # This unit tests don't initialize consumer thread, so sync signal should be manually sent. - instance.instance_variable_get(:@terminated).countDown + instance.instance_variable_get(:@poison_latch).countDown instance.stop end