diff --git a/CHANGELOG.md b/CHANGELOG.md index df73b68..d2f0cd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 7.4.1 + - Improve thread safety to avoid race conditions during shutdown and integration tests. [#67](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/66) + ## 7.4.0 - Removed obsolete `verify_ssl` and `debug` options [#60](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/60) diff --git a/lib/logstash/inputs/rabbitmq.rb b/lib/logstash/inputs/rabbitmq.rb index 1654692..e6982b5 100644 --- a/lib/logstash/inputs/rabbitmq.rb +++ b/lib/logstash/inputs/rabbitmq.rb @@ -59,6 +59,7 @@ module Inputs class RabbitMQ < LogStash::Inputs::Threadable java_import java.util.concurrent.TimeUnit + java_import java.util.concurrent.CountDownLatch include ::LogStash::PluginMixins::RabbitMQConnection @@ -189,6 +190,7 @@ def setup! connect! declare_queue! bind_exchange! + @poison_latch = CountDownLatch.new(1) @hare_info.channel.prefetch = @prefetch_count rescue => e # when encountering an exception during shut-down, @@ -268,7 +270,11 @@ def internal_queue_consume! next end - break if payload == INTERNAL_QUEUE_POISON + if payload == INTERNAL_QUEUE_POISON + @logger.info("RabbitMQ consumer thread received shutdown signal, exiting") + @poison_latch.countDown + break + end metadata, data = payload @codec.decode(data) do |event| @@ -302,11 +308,22 @@ def decorate(event, metadata, data) end def stop - @internal_queue.put(INTERNAL_QUEUE_POISON) + stop_consumer_thread shutdown_consumer close_connection end + def stop_consumer_thread + # After sending the poison pill, we need to wait for the consumer thread to actually exit. + # Closing channel right away may lead to messages not being acked properly or a race condition + # where the consumer thread tries to ack a message on a closed channel. + @internal_queue.put(INTERNAL_QUEUE_POISON) + timed_out = !@poison_latch.await(2, TimeUnit::SECONDS) + if timed_out + @logger.warn("Timeout waiting for RabbitMQ consumer thread to terminate") + end + end + def shutdown_consumer # There are two possible flows to shutdown consumers. When the plugin is the one shutting down, it should send a channel # cancellation message by invoking channel.basic_cancel(consumer_tag) and waiting for the consumer to terminate diff --git a/logstash-integration-rabbitmq.gemspec b/logstash-integration-rabbitmq.gemspec index 8ff60d7..16b2c9c 100644 --- a/logstash-integration-rabbitmq.gemspec +++ b/logstash-integration-rabbitmq.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-rabbitmq' - s.version = '7.4.0' + s.version = '7.4.1' s.licenses = ['Apache License (2.0)'] s.summary = "Integration with RabbitMQ - input and output plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+ diff --git a/spec/inputs/rabbitmq_spec.rb b/spec/inputs/rabbitmq_spec.rb index 1736970..5e2a2df 100644 --- a/spec/inputs/rabbitmq_spec.rb +++ b/spec/inputs/rabbitmq_spec.rb @@ -33,7 +33,6 @@ let(:connection) { double("MarchHare Connection") } let(:channel) { double("Channel") } let(:exchange) { double("Exchange") } - let(:channel) { double("Channel") } let(:queue) { double("queue") } # Doing this in a before block doesn't give us enough control over scope @@ -76,6 +75,12 @@ allow(consumer).to receive(:consumer_tag).and_return(consumer_tag) end + after do + # This unit tests don't initialize consumer thread, so sync signal should be manually sent. + instance.instance_variable_get(:@poison_latch).countDown + instance.stop + end + context "with a cancelled consumer" do before do allow(consumer).to receive(:cancelled?).and_return(true) @@ -84,7 +89,6 @@ it "should not call basic_cancel" do expect(channel).to_not receive(:basic_cancel) - instance.stop end end @@ -96,7 +100,6 @@ it "should not call basic_cancel" do expect(channel).to_not receive(:basic_cancel) - instance.stop end end @@ -108,13 +111,11 @@ it "should call basic_cancel" do expect(channel).to receive(:basic_cancel).with(consumer_tag) - instance.stop end it "should log terminating info" do allow(channel).to receive(:basic_cancel).with(consumer_tag) expect(instance.logger).to receive(:info).with(/Waiting for RabbitMQ consumer to terminate before stopping/, anything) - instance.stop end end end @@ -384,24 +385,30 @@ let(:hare_info) { instance.instance_variable_get(:@hare_info) } let(:output_queue) { Queue.new } - # Spawn a connection in the bg and wait up (n) seconds + # Spawn a connection and waits for it to be ready. def spawn_and_wait(instance) instance.register output_queue # materialize in this thread - Thread.new { + @consumer_thread = Thread.new { instance.run(output_queue) } - 20.times do - instance.send(:connection_open?) ? break : sleep(0.1) + # Ensure that the connection and channel are fully started. + Timeout.timeout(2, Timeout::Error, "Timeout waiting for connection to open and channel to be available") do + until instance.send(:connection_open?) && instance.instance_variable_get(:@hare_info)&.channel + sleep 0.05 + end end - # Extra time to make sure the consumer can attach - # Without this there's a chance the shutdown code will execute - # before consumption begins. This is tricky to do more elegantly - sleep 4 + # Ensure that the consumer is fully started. + hare_info = instance.instance_variable_get(:@hare_info) + Timeout.timeout(10, Timeout::Error, "Timeout waiting for channel to have consumers.") do + until hare_info.channel.consumers && !hare_info.channel.consumers.empty? + sleep 0.05 + end + end end let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) } @@ -418,6 +425,11 @@ def spawn_and_wait(instance) after do instance.stop() + # Stop the thread gracefully before tearing down connections. + @consumer_thread.join + # Exchange deletion needs to be placed after thread is finished, + # as the exchange may be dynamically created during the test context initialization. + exchange&.delete if defined?(exchange) test_channel.close test_connection.close end @@ -440,10 +452,6 @@ def spawn_and_wait(instance) #let(:queue) { test_channel.queue(queue_name, :auto_delete => true) } let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" } - after do - exchange.delete - end - context "when the message has a payload but no message headers" do before do exchange.publish(message)