Skip to content

Commit 3fd7053

Browse files
authored
Merge pull request #104 from logstash-plugins/fix-advanced-config-mode-params
Advanced mode uses individual configs properly.
2 parents 4e49756 + ac2f23c commit 3fd7053

File tree

4 files changed

+41
-13
lines changed

4 files changed

+41
-13
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 1.5.3
2+
- Fix: With `config_mode => 'advanced'`, event hub-specific settings (`initial_position`, `max_batch_size`, `prefetch_count`, `receive_timeout`, `initial_position_look_back`) were being ignored and replaced with global defaults. These settings are now correctly applied per event hub [#104](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/104)
3+
14
## 1.5.2
25
- Updated JWT dependency [#101](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/101)
36

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.5.2
1+
1.5.3

lib/logstash/inputs/azure_event_hubs.rb

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -417,12 +417,12 @@ def run(queue)
417417
event_processor_host = create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
418418
end
419419
options = EventProcessorOptions.new
420-
options.setMaxBatchSize(max_batch_size)
421-
options.setPrefetchCount(prefetch_count)
422-
options.setReceiveTimeOut(Duration.ofSeconds(receive_timeout))
420+
options.setMaxBatchSize(event_hub['max_batch_size'])
421+
options.setPrefetchCount(event_hub['prefetch_count'])
422+
options.setReceiveTimeOut(Duration.ofSeconds(event_hub['receive_timeout']))
423423

424424
options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)
425-
case @initial_position
425+
case event_hub['initial_position']
426426
when 'beginning'
427427
msg = "Configuring Event Hub #{event_hub_name} to read events all events."
428428
@logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
@@ -434,10 +434,10 @@ def run(queue)
434434
@logger.info(msg) unless event_hub['storage_connection']
435435
options.setInitialPositionProvider(EventProcessorOptions::EndOfStreamInitialPositionProvider.new(options))
436436
when 'look_back'
437-
msg = "Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{@initial_position_look_back}' seconds."
437+
msg = "Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{event_hub['initial_position_look_back']}' seconds."
438438
@logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
439439
@logger.info(msg) unless event_hub['storage_connection']
440-
options.setInitialPositionProvider(LogStash::Inputs::Azure::LookBackPositionProvider.new(@initial_position_look_back))
440+
options.setInitialPositionProvider(LogStash::Inputs::Azure::LookBackPositionProvider.new(event_hub['initial_position_look_back']))
441441
end
442442
event_processor_host.registerEventProcessorFactory(LogStash::Inputs::Azure::ProcessorFactory.new(queue, event_hub['codec'], event_hub['checkpoint_interval'], self.method(:decorate), event_hub['decorate_events']), options)
443443
.when_complete(lambda {|x, e|

spec/inputs/azure_event_hub_spec.rb

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,34 +175,59 @@
175175
# same named event hub with different configuration is allowed
176176
{'event_hub_name0' => {
177177
'event_hub_connection' => 'Endpoint=sb://...',
178+
'storage_connection' => 'DefaultEndpointsProtocol=https;AccountName=...',
178179
'consumer_group' => 'ls'}}
179180
],
180181
'codec' => 'plain',
181182
'consumer_group' => 'default_consumer_group',
182183
'max_batch_size' => 21,
184+
'prefetch_count' => 250,
185+
'receive_timeout' => 90,
186+
'initial_position' => 'beginning',
187+
'initial_position_look_back' => 7200,
188+
'checkpoint_interval' => 15,
189+
'decorate_events' => false,
183190
'threads' => 9
184191
}
185192
end
186193
it_behaves_like "an exploded Event Hub config", 1
187-
it "it explodes the 2cnd advanced config event hub correctly" do
194+
195+
it "it explodes the second advanced config event hub correctly (with individual and inherited settings)" do
188196
exploded_config = input.event_hubs_exploded
189197
expect(exploded_config[1]['event_hubs'].size).to be == 1 #always 1 in the exploded form
190198
expect(exploded_config[1]['event_hubs'][0]).to eql('event_hub_name1')
191199
expect(exploded_config[1]['event_hub_connections'][0].value).to eql('1Endpoint=sb://...')
192200
expect(exploded_config[1]['storage_connection'].value).to eql('1DefaultEndpointsProtocol=https;AccountName=...')
193201
expect(exploded_config[1]['threads']).to be == 9
194-
expect(exploded_config[1]['codec'].class.to_s).to eq("LogStash::Codecs::JSON") # different between configs
195-
expect(exploded_config[1]['consumer_group']).to eql('cg1') # override global
196-
expect(exploded_config[1]['max_batch_size']).to be == 21 # filled from global
197-
expect(exploded_config[1]['prefetch_count']).to be == 300 # default
202+
expect(exploded_config[1]['codec'].class.to_s).to eq("LogStash::Codecs::JSON")
203+
expect(exploded_config[1]['consumer_group']).to eql('cg1')
204+
expect(exploded_config[1]['max_batch_size']).to be == 21
205+
expect(exploded_config[1]['prefetch_count']).to be == 250
198206
expect(exploded_config[1]['receive_timeout']).to be == 41
199207
expect(exploded_config[1]['initial_position']).to eql('end')
200-
expect(exploded_config[1]['initial_position_look_back']).to be == 86400 # default
208+
expect(exploded_config[1]['initial_position_look_back']).to be == 7200
201209
expect(exploded_config[1]['checkpoint_interval']).to be == 61
202210
expect(exploded_config[1]['decorate_events']).to be_falsy
203211
expect(exploded_config[1]['storage_container']).to eq('alt_container')
204212
end
205213

214+
it "it explodes the third advanced config event hub correctly (mostly inherited settings)" do
215+
exploded_config = input.event_hubs_exploded
216+
expect(exploded_config[2]['event_hubs'].size).to be == 1
217+
expect(exploded_config[2]['event_hubs'][0]).to eql('event_hub_name0')
218+
expect(exploded_config[2]['event_hub_connections'][0].value).to eql('Endpoint=sb://...')
219+
expect(exploded_config[2]['storage_connection'].value).to eql('DefaultEndpointsProtocol=https;AccountName=...')
220+
expect(exploded_config[2]['threads']).to be == 9
221+
expect(exploded_config[2]['codec'].class.to_s).to eq("LogStash::Codecs::Plain")
222+
expect(exploded_config[2]['consumer_group']).to eql('ls')
223+
expect(exploded_config[2]['max_batch_size']).to be == 21
224+
expect(exploded_config[2]['prefetch_count']).to be == 250
225+
expect(exploded_config[2]['receive_timeout']).to be == 90
226+
expect(exploded_config[2]['initial_position']).to eql('beginning')
227+
expect(exploded_config[2]['initial_position_look_back']).to be == 7200
228+
expect(exploded_config[2]['checkpoint_interval']).to be == 15
229+
end
230+
206231
it "it runs the Event Processor Host" do
207232
mock_queue = double("queue")
208233
mock_host = double("event_processor_host")

0 commit comments

Comments
 (0)