Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ env

set -ex

export KAFKA_VERSION=3.3.1
export KAFKA_VERSION=3.9.1
./kafka_test_setup.sh

bundle exec rspec -fd
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 11.8.0
- Deprecate partitioner `default` and `uniform_sticky` options [#206](https://github.com/logstash-plugins/logstash-integration-kafka/pull/206)
Both options are deprecated in Kafka client 3 and will be removed in the plugin 12.0.0.

## 11.7.0
- Add `reconnect_backoff_max_ms` option for configuring kafka client [#204](https://github.com/logstash-plugins/logstash-integration-kafka/pull/204)

Expand Down
11 changes: 6 additions & 5 deletions docs/output-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,15 @@ The max time in milliseconds before a metadata refresh is forced.
* Value type is <<string,string>>
* There is no default value for this setting.

The default behavior is to hash the `message_key` of an event to get the partition.
When no message key is present, the plugin picks a partition in a round-robin fashion.
By not setting this value, the plugin uses the built-in partitioning strategy provided by the Kafka client. Read more about the "partitioner.class" on the Kafka documentation.

Available options for choosing a partitioning strategy are as follows:
Available options are as follows:

* `default` use the default partitioner as described above
* `default` hashes the `message_key` of an event to get the partition. When no message key is present, the plugin picks a partition in a round-robin fashion. Please note that this is a different strategy than the one used when `partitioner` is left unset.
* `round_robin` distributes writes to all partitions equally, regardless of `message_key`
* `uniform_sticky` sticks to a partition for the duration of a batch than randomly picks a new one
* `uniform_sticky` hashes the `message_key` of an event to get the partition. When no message key is present, the plugin sticks to a partition for the duration of a batch than randomly picks a new one.

NOTE: `default` and `uniform_sticky` are deprecated and will be removed in `12.0.0`.

[id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
===== `receive_buffer_bytes`
Expand Down
4 changes: 2 additions & 2 deletions kafka_test_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set -ex
if [ -n "${KAFKA_VERSION+1}" ]; then
echo "KAFKA_VERSION is $KAFKA_VERSION"
else
KAFKA_VERSION=3.4.1
KAFKA_VERSION=3.9.1
fi

export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"
Expand All @@ -17,7 +17,7 @@ mkdir build
echo "Setup Kafka version $KAFKA_VERSION"
if [ ! -e "kafka_2.13-$KAFKA_VERSION.tgz" ]; then
echo "Kafka not present locally, downloading"
curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://downloads.apache.org/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
fi
cp kafka_2.13-$KAFKA_VERSION.tgz build/kafka.tgz
mkdir build/kafka && tar xzf build/kafka.tgz -C build/kafka --strip-components 1
Expand Down
8 changes: 8 additions & 0 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -409,8 +409,10 @@ def partitioner_class
when 'round_robin'
'org.apache.kafka.clients.producer.RoundRobinPartitioner'
when 'uniform_sticky'
log_partitioner_warning(partitioner, 'UniformStickyPartitioner')
'org.apache.kafka.clients.producer.UniformStickyPartitioner'
when 'default'
log_partitioner_warning(partitioner, 'DefaultPartitioner')
'org.apache.kafka.clients.producer.internals.DefaultPartitioner'
else
unless partitioner.index('.')
Expand All @@ -420,4 +422,10 @@ def partitioner_class
end
end

def log_partitioner_warning(partitioner, class_name)
deprecation_logger.deprecated("Producer `partitioner` is configured with the deprecated option `#{partitioner}`. " \
"#{class_name} is removed in kafka-client 4.0 and the `#{partitioner}` option will be removed in the plugin 12.0.0. "\
'Please update your configuration to use `round_robin` or unset the option to use the build-in partitioning strategy. ')
end

end #class LogStash::Outputs::Kafka
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '11.7.0'
s.version = '11.8.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
Expand Down