Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.5.3
- Fix: With `config_mode => 'advanced'`, event hub-specific settings (`initial_position`, `max_batch_size`, `prefetch_count`, `receive_timeout`, `initial_position_look_back`) were being ignored and replaced with global defaults. These settings are now correctly applied per event hub [#104](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/104)

## 1.5.2
- Updated JWT dependency [#101](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/101)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.2
1.5.3
12 changes: 6 additions & 6 deletions lib/logstash/inputs/azure_event_hubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,12 @@ def run(queue)
event_processor_host = create_in_memory_event_processor_host(event_hub, event_hub_name, scheduled_executor_service)
end
options = EventProcessorOptions.new
options.setMaxBatchSize(max_batch_size)
options.setPrefetchCount(prefetch_count)
options.setReceiveTimeOut(Duration.ofSeconds(receive_timeout))
options.setMaxBatchSize(event_hub['max_batch_size'])
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

options.setPrefetchCount(event_hub['prefetch_count'])
options.setReceiveTimeOut(Duration.ofSeconds(event_hub['receive_timeout']))

options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)
case @initial_position
case event_hub['initial_position']
when 'beginning'
msg = "Configuring Event Hub #{event_hub_name} to read events all events."
@logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
Expand All @@ -434,10 +434,10 @@ def run(queue)
@logger.info(msg) unless event_hub['storage_connection']
options.setInitialPositionProvider(EventProcessorOptions::EndOfStreamInitialPositionProvider.new(options))
when 'look_back'
msg = "Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{@initial_position_look_back}' seconds."
msg = "Configuring Event Hub #{event_hub_name} to read events starting at 'now - #{event_hub['initial_position_look_back']}' seconds."
@logger.debug("If this is the initial read... " + msg) if event_hub['storage_connection']
@logger.info(msg) unless event_hub['storage_connection']
options.setInitialPositionProvider(LogStash::Inputs::Azure::LookBackPositionProvider.new(@initial_position_look_back))
options.setInitialPositionProvider(LogStash::Inputs::Azure::LookBackPositionProvider.new(event_hub['initial_position_look_back']))
end
event_processor_host.registerEventProcessorFactory(LogStash::Inputs::Azure::ProcessorFactory.new(queue, event_hub['codec'], event_hub['checkpoint_interval'], self.method(:decorate), event_hub['decorate_events']), options)
.when_complete(lambda {|x, e|
Expand Down
37 changes: 31 additions & 6 deletions spec/inputs/azure_event_hub_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,34 +175,59 @@
# same named event hub with different configuration is allowed
{'event_hub_name0' => {
'event_hub_connection' => 'Endpoint=sb://...',
'storage_connection' => 'DefaultEndpointsProtocol=https;AccountName=...',
'consumer_group' => 'ls'}}
],
'codec' => 'plain',
'consumer_group' => 'default_consumer_group',
'max_batch_size' => 21,
'prefetch_count' => 250,
'receive_timeout' => 90,
'initial_position' => 'beginning',
'initial_position_look_back' => 7200,
'checkpoint_interval' => 15,
'decorate_events' => false,
'threads' => 9
}
end
it_behaves_like "an exploded Event Hub config", 1
it "it explodes the 2cnd advanced config event hub correctly" do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why there are 3 sets of event hubs defined here, but only 2 verified?

I think this is leading to missing testing some of the override settings, as the tests are done against the "no overriden settings" event hub and the "partially overriden settings" event hub - eg prefetch_count and initial_position_lookback only ever test the top-level setting, and not an individual event hub override.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why there are 3 sets of event hubs defined here, but only 2 verified?

First entry (index is 0) is verified in it_behaves_like "an exploded Event Hub config", 1

prefetch_count (30) and initial_position_look_back (50) are different in first entry. 2nd and 3rd entries are using global 'prefetch_count' => 250 and 'initial_position_look_back' => 7200 settings.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thank you. Missed that completely


it "it explodes the second advanced config event hub correctly (with individual and inherited settings)" do
exploded_config = input.event_hubs_exploded
expect(exploded_config[1]['event_hubs'].size).to be == 1 #always 1 in the exploded form
expect(exploded_config[1]['event_hubs'][0]).to eql('event_hub_name1')
expect(exploded_config[1]['event_hub_connections'][0].value).to eql('1Endpoint=sb://...')
expect(exploded_config[1]['storage_connection'].value).to eql('1DefaultEndpointsProtocol=https;AccountName=...')
expect(exploded_config[1]['threads']).to be == 9
expect(exploded_config[1]['codec'].class.to_s).to eq("LogStash::Codecs::JSON") # different between configs
expect(exploded_config[1]['consumer_group']).to eql('cg1') # override global
expect(exploded_config[1]['max_batch_size']).to be == 21 # filled from global
expect(exploded_config[1]['prefetch_count']).to be == 300 # default
expect(exploded_config[1]['codec'].class.to_s).to eq("LogStash::Codecs::JSON")
expect(exploded_config[1]['consumer_group']).to eql('cg1')
expect(exploded_config[1]['max_batch_size']).to be == 21
expect(exploded_config[1]['prefetch_count']).to be == 250
expect(exploded_config[1]['receive_timeout']).to be == 41
expect(exploded_config[1]['initial_position']).to eql('end')
expect(exploded_config[1]['initial_position_look_back']).to be == 86400 # default
expect(exploded_config[1]['initial_position_look_back']).to be == 7200
expect(exploded_config[1]['checkpoint_interval']).to be == 61
expect(exploded_config[1]['decorate_events']).to be_falsy
expect(exploded_config[1]['storage_container']).to eq('alt_container')
end

it "it explodes the third advanced config event hub correctly (mostly inherited settings)" do
exploded_config = input.event_hubs_exploded
expect(exploded_config[2]['event_hubs'].size).to be == 1
expect(exploded_config[2]['event_hubs'][0]).to eql('event_hub_name0')
expect(exploded_config[2]['event_hub_connections'][0].value).to eql('Endpoint=sb://...')
expect(exploded_config[2]['storage_connection'].value).to eql('DefaultEndpointsProtocol=https;AccountName=...')
expect(exploded_config[2]['threads']).to be == 9
expect(exploded_config[2]['codec'].class.to_s).to eq("LogStash::Codecs::Plain")
expect(exploded_config[2]['consumer_group']).to eql('ls')
expect(exploded_config[2]['max_batch_size']).to be == 21
expect(exploded_config[2]['prefetch_count']).to be == 250
expect(exploded_config[2]['receive_timeout']).to be == 90
expect(exploded_config[2]['initial_position']).to eql('beginning')
expect(exploded_config[2]['initial_position_look_back']).to be == 7200
expect(exploded_config[2]['checkpoint_interval']).to be == 15
end

it "it runs the Event Processor Host" do
mock_queue = double("queue")
mock_host = double("event_processor_host")
Expand Down