Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 7.4.1
- Improve thread safety to avoid race conditions during shutdown and integration tests. [#67](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/66)

## 7.4.0
- Removed obsolete `verify_ssl` and `debug` options [#60](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/60)

Expand Down
21 changes: 19 additions & 2 deletions lib/logstash/inputs/rabbitmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ module Inputs
class RabbitMQ < LogStash::Inputs::Threadable

java_import java.util.concurrent.TimeUnit
java_import java.util.concurrent.CountDownLatch

include ::LogStash::PluginMixins::RabbitMQConnection

Expand Down Expand Up @@ -189,6 +190,7 @@ def setup!
connect!
declare_queue!
bind_exchange!
@poison_latch = CountDownLatch.new(1)
@hare_info.channel.prefetch = @prefetch_count
rescue => e
# when encountering an exception during shut-down,
Expand Down Expand Up @@ -268,7 +270,11 @@ def internal_queue_consume!
next
end

break if payload == INTERNAL_QUEUE_POISON
if payload == INTERNAL_QUEUE_POISON
@logger.info("RabbitMQ consumer thread received shutdown signal, exiting")
@poison_latch.countDown
break
end

metadata, data = payload
@codec.decode(data) do |event|
Expand Down Expand Up @@ -302,11 +308,22 @@ def decorate(event, metadata, data)
end

def stop
@internal_queue.put(INTERNAL_QUEUE_POISON)
stop_consumer_thread
shutdown_consumer
close_connection
end

def stop_consumer_thread
# After sending the poison pill, we need to wait for the consumer thread to actually exit.
# Closing channel right away may lead to messages not being acked properly or a race condition
# where the consumer thread tries to ack a message on a closed channel.
@internal_queue.put(INTERNAL_QUEUE_POISON)
timed_out = !@poison_latch.await(2, TimeUnit::SECONDS)
if timed_out
@logger.warn("Timeout waiting for RabbitMQ consumer thread to terminate")
end
end

def shutdown_consumer
# There are two possible flows to shutdown consumers. When the plugin is the one shutting down, it should send a channel
# cancellation message by invoking channel.basic_cancel(consumer_tag) and waiting for the consumer to terminate
Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-rabbitmq.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-rabbitmq'
s.version = '7.4.0'
s.version = '7.4.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with RabbitMQ - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
Expand Down
42 changes: 25 additions & 17 deletions spec/inputs/rabbitmq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
let(:connection) { double("MarchHare Connection") }
let(:channel) { double("Channel") }
let(:exchange) { double("Exchange") }
let(:channel) { double("Channel") }
let(:queue) { double("queue") }

# Doing this in a before block doesn't give us enough control over scope
Expand Down Expand Up @@ -76,6 +75,12 @@
allow(consumer).to receive(:consumer_tag).and_return(consumer_tag)
end

after do
# This unit tests don't initialize consumer thread, so sync signal should be manually sent.
instance.instance_variable_get(:@poison_latch).countDown
instance.stop
end

context "with a cancelled consumer" do
before do
allow(consumer).to receive(:cancelled?).and_return(true)
Expand All @@ -84,7 +89,6 @@

it "should not call basic_cancel" do
expect(channel).to_not receive(:basic_cancel)
instance.stop
end
end

Expand All @@ -96,7 +100,6 @@

it "should not call basic_cancel" do
expect(channel).to_not receive(:basic_cancel)
instance.stop
end
end

Expand All @@ -108,13 +111,11 @@

it "should call basic_cancel" do
expect(channel).to receive(:basic_cancel).with(consumer_tag)
instance.stop
end

it "should log terminating info" do
allow(channel).to receive(:basic_cancel).with(consumer_tag)
expect(instance.logger).to receive(:info).with(/Waiting for RabbitMQ consumer to terminate before stopping/, anything)
instance.stop
end
end
end
Expand Down Expand Up @@ -384,24 +385,30 @@
let(:hare_info) { instance.instance_variable_get(:@hare_info) }
let(:output_queue) { Queue.new }

# Spawn a connection in the bg and wait up (n) seconds
# Spawn a connection and waits for it to be ready.
def spawn_and_wait(instance)
instance.register

output_queue # materialize in this thread

Thread.new {
@consumer_thread = Thread.new {
instance.run(output_queue)
}

20.times do
instance.send(:connection_open?) ? break : sleep(0.1)
# Ensure that the connection and channel are fully started.
Timeout.timeout(2, Timeout::Error, "Timeout waiting for connection to open and channel to be available") do
until instance.send(:connection_open?) && instance.instance_variable_get(:@hare_info)&.channel
sleep 0.05
end
end

# Extra time to make sure the consumer can attach
# Without this there's a chance the shutdown code will execute
# before consumption begins. This is tricky to do more elegantly
sleep 4
# Ensure that the consumer is fully started.
hare_info = instance.instance_variable_get(:@hare_info)
Timeout.timeout(10, Timeout::Error, "Timeout waiting for channel to have consumers.") do
until hare_info.channel.consumers && !hare_info.channel.consumers.empty?
sleep 0.05
end
end
end

let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) }
Expand All @@ -418,6 +425,11 @@ def spawn_and_wait(instance)

after do
instance.stop()
# Stop the thread gracefully before tearing down connections.
@consumer_thread.join
# Exchange deletion needs to be placed after thread is finished,
# as the exchange may be dynamically created during the test context initialization.
exchange&.delete if defined?(exchange)
test_channel.close
test_connection.close
end
Expand All @@ -440,10 +452,6 @@ def spawn_and_wait(instance)
#let(:queue) { test_channel.queue(queue_name, :auto_delete => true) }
let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" }

after do
exchange.delete
end

context "when the message has a payload but no message headers" do
before do
exchange.publish(message)
Expand Down