Skip to content

Commit

Permalink
Actually set prefetch count and receive timeout values (#61)
Browse files Browse the repository at this point in the history
* Actually set prefetch count and receive timeout values

Prefetch count and receive timeout configurations are mentioned in
part of the source code, and treated as advanced configuration options,
but not actually set in the EventProcessorOptions.
Exposing these options allows advanced EventHub users to tweak configuration
settings to potentially improve performance.

This commit allows these values to be set, but maintains the existing
policy of not documenting these values.
  • Loading branch information
robbavey committed Nov 17, 2020
1 parent 8be5b23 commit 322e47a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.2.3
- Fixed missing configuration of `prefetch_count` and `receive_timeout` [#61](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/61)

## 1.2.2
- Refactor: scope and review global java_imports [#57](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/57)

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.2.2
1.2.3
6 changes: 5 additions & 1 deletion lib/logstash/inputs/azure_event_hubs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class LogStash::Inputs::AzureEventHubs < LogStash::Inputs::Base
java_import com.microsoft.azure.eventhubs.ConnectionStringBuilder
java_import java.util.concurrent.Executors
java_import java.util.concurrent.TimeUnit
java_import java.time.Duration

config_name "azure_event_hubs"

Expand Down Expand Up @@ -290,7 +291,7 @@ class LogStash::Inputs::AzureEventHubs < LogStash::Inputs::Base
# }
config :decorate_events, :validate => :boolean, :default => false

attr_reader :count, :pre_count
attr_reader :count

def initialize(params)

Expand Down Expand Up @@ -409,6 +410,9 @@ def run(queue)
end
options = EventProcessorOptions.new
options.setMaxBatchSize(max_batch_size)
options.setPrefetchCount(prefetch_count)
options.setReceiveTimeOut(Duration.ofSeconds(receive_timeout))

options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)
case @initial_position
when 'beginning'
Expand Down

0 comments on commit 322e47a

Please sign in to comment.