Actually set maxBatchSize on EventProcessor (#52)
* Actually set maxBatchSize on EventProcessor

The `max_batch_size` option was ignored when creating the event
processor, so batches defaulted to a size of 10 instead of the
intended 125, and users could not change their batch size.

As a result, completed event batches could be checkpointed much more
often than intended, potentially causing performance issues.

* Update changelog and bump version

Co-Authored-By: João Duarte <jsvd@users.noreply.github.com>
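
To put the batch sizes from the message above in perspective, here is a rough back-of-the-envelope sketch. It assumes one checkpoint is written per completed batch, which is a simplification for illustration; the helper `checkpoints_for` and the event volume are hypothetical and not part of the plugin.

```ruby
# Hypothetical helper (not part of the plugin): number of checkpoints
# written if every completed batch triggers exactly one checkpoint.
def checkpoints_for(total_events, batch_size)
  total_events.fdiv(batch_size).ceil
end

TOTAL_EVENTS = 10_000 # illustrative volume, not taken from the commit

default_checkpoints  = checkpoints_for(TOTAL_EVENTS, 10)  # batch size actually used before the fix
intended_checkpoints = checkpoints_for(TOTAL_EVENTS, 125) # batch size that was intended

puts "batch size 10:  #{default_checkpoints} checkpoints"
puts "batch size 125: #{intended_checkpoints} checkpoints"
```

Under that assumption, the smaller batch size produces 12.5 times as many checkpoints for the same number of events.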
robbavey and jsvd committed Feb 28, 2020
1 parent 3079449 commit 69d7b9e
Showing 4 changed files with 6 additions and 2 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
+## 1.1.4
+  - Fixed missing configuration of the `max_batch_size`setting [#52](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/52)
+
 ## 1.1.3
   - [DOC] Added clarification for threads parameter [#50](https://github.com/logstash-plugins/logstash-input-azure_event_hubs/pull/50)

2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-1.1.3
+1.1.4
1 change: 1 addition & 0 deletions lib/logstash/inputs/azure_event_hubs.rb
@@ -409,6 +409,7 @@ def run(queue)
 checkpoint_manager.java_send :initialize, [HostContext], event_processor_host.getHostContext
 end
 options = EventProcessorOptions.new
+options.setMaxBatchSize(max_batch_size)
 options.setExceptionNotification(LogStash::Inputs::Azure::ErrorNotificationHandler.new)
 case @initial_position
 when 'beginning'
2 changes: 1 addition & 1 deletion lib/logstash/inputs/processor.rb
@@ -26,7 +26,7 @@ def onClose(context, reason)
 end

 def onEvents(context, batch)
-@logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} is processing a batch.") if @logger.debug?
+@logger.debug("Event Hub: #{context.getEventHubPath.to_s}, Partition: #{context.getPartitionId.to_s} is processing a batch of size #{batch.size}.") if @logger.debug?
 last_payload = nil
 batch_size = 0
 batch.each do |payload|
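
The one-line fix in `azure_event_hubs.rb` follows a common pattern: an options object keeps its built-in default unless the configured value is explicitly forwarded to it. A minimal sketch in plain Ruby is shown below. `FakeProcessorOptions` and both `build_options_*` methods are hypothetical stand-ins for illustration only; they are not the real `EventProcessorOptions` Java class or the plugin's code, and the default of 10 is taken from the commit message.

```ruby
# Hypothetical stand-in for the Java EventProcessorOptions class.
# Only the behaviour relevant to this commit is modelled.
class FakeProcessorOptions
  DEFAULT_MAX_BATCH_SIZE = 10 # default value named in the commit message

  attr_reader :max_batch_size

  def initialize
    @max_batch_size = DEFAULT_MAX_BATCH_SIZE
  end

  # camelCase setter name kept to mirror the call in the diff
  def setMaxBatchSize(size)
    @max_batch_size = size
  end
end

# Before the fix: the configured value is accepted but never forwarded,
# so the options object silently keeps its default.
def build_options_before_fix(_configured_max_batch_size)
  FakeProcessorOptions.new
end

# After the fix: the configured value is passed to the options object.
def build_options_after_fix(configured_max_batch_size)
  options = FakeProcessorOptions.new
  options.setMaxBatchSize(configured_max_batch_size)
  options
end

puts build_options_before_fix(125).max_batch_size
puts build_options_after_fix(125).max_batch_size
```

With the debug message updated in `processor.rb`, the effective batch size also appears in the debug log, which makes this kind of silently ignored setting easier to spot.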