Skip to content

Commit 7ae6dcd

Browse files
authored
Improve thread safety to avoid race conditions during shutdown and integration tests. (#66)
* CI-Fix: Improve thread safety in integration tests to avoid race conditions during CI execution * Add sync mechanism between stop function and consumer thread to avoid race conditions * Update CHANGELOG.md and gemspec. Also added manual signaling to stop function in utests * Renamed CountDown variable
1 parent 532fe37 commit 7ae6dcd

File tree

4 files changed

+48
-20
lines changed

4 files changed

+48
-20
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 7.4.1
2+
- Improve thread safety to avoid race conditions during shutdown and integration tests. [#66](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/66)
3+
14
## 7.4.0
25
- Removed obsolete `verify_ssl` and `debug` options [#60](https://github.com/logstash-plugins/logstash-integration-rabbitmq/pull/60)
36

lib/logstash/inputs/rabbitmq.rb

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ module Inputs
5959
class RabbitMQ < LogStash::Inputs::Threadable
6060

6161
java_import java.util.concurrent.TimeUnit
62+
java_import java.util.concurrent.CountDownLatch
6263

6364
include ::LogStash::PluginMixins::RabbitMQConnection
6465

@@ -189,6 +190,7 @@ def setup!
189190
connect!
190191
declare_queue!
191192
bind_exchange!
193+
@poison_latch = CountDownLatch.new(1)
192194
@hare_info.channel.prefetch = @prefetch_count
193195
rescue => e
194196
# when encountering an exception during shut-down,
@@ -268,7 +270,11 @@ def internal_queue_consume!
268270
next
269271
end
270272

271-
break if payload == INTERNAL_QUEUE_POISON
273+
if payload == INTERNAL_QUEUE_POISON
274+
@logger.info("RabbitMQ consumer thread received shutdown signal, exiting")
275+
@poison_latch.countDown
276+
break
277+
end
272278

273279
metadata, data = payload
274280
@codec.decode(data) do |event|
@@ -302,11 +308,22 @@ def decorate(event, metadata, data)
302308
end
303309

304310
def stop
305-
@internal_queue.put(INTERNAL_QUEUE_POISON)
311+
stop_consumer_thread
306312
shutdown_consumer
307313
close_connection
308314
end
309315

316+
def stop_consumer_thread
317+
# After sending the poison pill, we need to wait for the consumer thread to actually exit.
318+
# Closing channel right away may lead to messages not being acked properly or a race condition
319+
# where the consumer thread tries to ack a message on a closed channel.
320+
@internal_queue.put(INTERNAL_QUEUE_POISON)
321+
timed_out = !@poison_latch.await(2, TimeUnit::SECONDS)
322+
if timed_out
323+
@logger.warn("Timeout waiting for RabbitMQ consumer thread to terminate")
324+
end
325+
end
326+
310327
def shutdown_consumer
311328
# There are two possible flows to shutdown consumers. When the plugin is the one shutting down, it should send a channel
312329
# cancellation message by invoking channel.basic_cancel(consumer_tag) and waiting for the consumer to terminate

logstash-integration-rabbitmq.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-rabbitmq'
3-
s.version = '7.4.0'
3+
s.version = '7.4.1'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Integration with RabbitMQ - input and output plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+

spec/inputs/rabbitmq_spec.rb

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
let(:connection) { double("MarchHare Connection") }
3434
let(:channel) { double("Channel") }
3535
let(:exchange) { double("Exchange") }
36-
let(:channel) { double("Channel") }
3736
let(:queue) { double("queue") }
3837

3938
# Doing this in a before block doesn't give us enough control over scope
@@ -76,6 +75,12 @@
7675
allow(consumer).to receive(:consumer_tag).and_return(consumer_tag)
7776
end
7877

78+
after do
79+
# These unit tests don't initialize the consumer thread, so the sync signal must be sent manually.
80+
instance.instance_variable_get(:@poison_latch).countDown
81+
instance.stop
82+
end
83+
7984
context "with a cancelled consumer" do
8085
before do
8186
allow(consumer).to receive(:cancelled?).and_return(true)
@@ -84,7 +89,6 @@
8489

8590
it "should not call basic_cancel" do
8691
expect(channel).to_not receive(:basic_cancel)
87-
instance.stop
8892
end
8993
end
9094

@@ -96,7 +100,6 @@
96100

97101
it "should not call basic_cancel" do
98102
expect(channel).to_not receive(:basic_cancel)
99-
instance.stop
100103
end
101104
end
102105

@@ -108,13 +111,11 @@
108111

109112
it "should call basic_cancel" do
110113
expect(channel).to receive(:basic_cancel).with(consumer_tag)
111-
instance.stop
112114
end
113115

114116
it "should log terminating info" do
115117
allow(channel).to receive(:basic_cancel).with(consumer_tag)
116118
expect(instance.logger).to receive(:info).with(/Waiting for RabbitMQ consumer to terminate before stopping/, anything)
117-
instance.stop
118119
end
119120
end
120121
end
@@ -384,24 +385,30 @@
384385
let(:hare_info) { instance.instance_variable_get(:@hare_info) }
385386
let(:output_queue) { Queue.new }
386387

387-
# Spawn a connection in the bg and wait up (n) seconds
388+
# Spawn a connection and wait for it to be ready.
388389
def spawn_and_wait(instance)
389390
instance.register
390391

391392
output_queue # materialize in this thread
392393

393-
Thread.new {
394+
@consumer_thread = Thread.new {
394395
instance.run(output_queue)
395396
}
396397

397-
20.times do
398-
instance.send(:connection_open?) ? break : sleep(0.1)
398+
# Ensure that the connection and channel are fully started.
399+
Timeout.timeout(2, Timeout::Error, "Timeout waiting for connection to open and channel to be available") do
400+
until instance.send(:connection_open?) && instance.instance_variable_get(:@hare_info)&.channel
401+
sleep 0.05
402+
end
399403
end
400404

401-
# Extra time to make sure the consumer can attach
402-
# Without this there's a chance the shutdown code will execute
403-
# before consumption begins. This is tricky to do more elegantly
404-
sleep 4
405+
# Ensure that the consumer is fully started.
406+
hare_info = instance.instance_variable_get(:@hare_info)
407+
Timeout.timeout(10, Timeout::Error, "Timeout waiting for channel to have consumers.") do
408+
until hare_info.channel.consumers && !hare_info.channel.consumers.empty?
409+
sleep 0.05
410+
end
411+
end
405412
end
406413

407414
let(:test_connection) { MarchHare.connect(instance.send(:rabbitmq_settings)) }
@@ -418,6 +425,11 @@ def spawn_and_wait(instance)
418425

419426
after do
420427
instance.stop()
428+
# Stop the thread gracefully before tearing down connections.
429+
@consumer_thread.join
430+
# Exchange deletion needs to be placed after thread is finished,
431+
# as the exchange may be dynamically created during the test context initialization.
432+
exchange&.delete if defined?(exchange)
421433
test_channel.close
422434
test_connection.close
423435
end
@@ -440,10 +452,6 @@ def spawn_and_wait(instance)
440452
#let(:queue) { test_channel.queue(queue_name, :auto_delete => true) }
441453
let(:queue_name) { "logstash-input-rabbitmq-#{rand(0xFFFFFFFF)}" }
442454

443-
after do
444-
exchange.delete
445-
end
446-
447455
context "when the message has a payload but no message headers" do
448456
before do
449457
exchange.publish(message)

0 commit comments

Comments
 (0)