From 322e47af06a6f24e51aa4f81f844199f67f9376d Mon Sep 17 00:00:00 2001 From: Rob Bavey Date: Tue, 17 Nov 2020 10:32:39 -0500 Subject: [PATCH] Actually set prefetch count and receive timeout values (#61) * Actually set prefetch count and receive timeout values Prefetch count and receive timeout configurations are mentioned in part of the source code, and treated as advanced configuration options, but not actually set in the EventProcessorOptions. Exposing these options allows advanced EventHub users to tweak configuration settings to potentially improve performance. This commit allows these values to be set, but maintains the existing policy of not documenting these values. --- CHANGELOG.md | 3 +++ VERSION | 2 +- lib/logstash/inputs/azure_event_hubs.rb | 6 +++++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66d92bd..91d0150 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 1.2.3 + - Fixed missing configuration of `prefetch_count` and `receive_timeout` [#61](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/61) + ## 1.2.2 - Refactor: scope and review global java_imports [#57](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/57) diff --git a/VERSION b/VERSION index 23aa839..0495c4a 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.2.2 +1.2.3 diff --git a/lib/logstash/inputs/azure_event_hubs.rb b/lib/logstash/inputs/azure_event_hubs.rb index b4251aa..baa542e 100644 --- a/lib/logstash/inputs/azure_event_hubs.rb +++ b/lib/logstash/inputs/azure_event_hubs.rb @@ -18,6 +18,7 @@ class LogStash::Inputs::AzureEventHubs < LogStash::Inputs::Base java_import com.microsoft.azure.eventhubs.ConnectionStringBuilder java_import java.util.concurrent.Executors java_import java.util.concurrent.TimeUnit + java_import java.time.Duration config_name "azure_event_hubs" @@ -290,7 +291,7 @@ class LogStash::Inputs::AzureEventHubs < LogStash::Inputs::Base # } config :decorate_events, :validate => :boolean, :default => false - attr_reader :count, :pre_count + attr_reader :count def initialize(params) @@ -409,6 +410,9 @@ def run(queue) end options = EventProcessorOptions.new options.setMaxBatchSize(max_batch_size) + options.setPrefetchCount(prefetch_count) + options.setReceiveTimeOut(Duration.ofSeconds(receive_timeout)) + options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new) case @initial_position when 'beginning'