This repository was archived by the owner on Jan 24, 2024. It is now read-only.

[BUG] Failed with librdkafka when enable idempotence option #2008

@NiuBlibing

Describe the bug
When enable.idempotence is set to true, librdkafka fails with the following fatal error:

% Running producer loop. Press Ctrl-C to exit
%0|1695637496.333|FATAL|rdkafka#producer-1| [thrd:main]: Fatal error: Local: Required feature not supported by broker: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% Failed to produce to topic persistent://test/test-ns/testtopic: Local: Fatal error
% Flushing outstanding messages..
% Error: _FATAL: Fatal error: Local: Required feature not supported by broker: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% FATAL ERROR: _UNSUPPORTED_FEATURE: Idempotent producer not supported by any of the 1 connected broker(s): requires Apache Kafka broker version >= 0.11.0
% Terminating on fatal error
% Message delivery failed: Local: Purged in queue
% 1 message(s) produced, 0 delivered, 1 failed
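
The error text suggests that the client concluded, from the API versions advertised by the broker, that the idempotent producer is unavailable. Below is a minimal sketch of such a check, assuming the client requires the InitProducerId API (Kafka protocol API key 22, introduced in Kafka 0.11.0) with a version in the range 0 to 1. The struct and function names are hypothetical and this is not librdkafka's actual code. Whether KoP 3.0.0.4 omits this API from its ApiVersions response is not confirmed by this report.

```c
#include <stdbool.h>
#include <stddef.h>

/* Kafka protocol API key for InitProducerId (added in Kafka 0.11.0). */
#define API_KEY_INIT_PRODUCER_ID 22

/* One entry of a broker's ApiVersions response (hypothetical struct). */
struct api_version {
        int api_key;
        int min_ver;
        int max_ver;
};

/* Return true if the broker advertises `api_key` with a version range
 * that overlaps [want_min, want_max]. */
static bool broker_supports_api(const struct api_version *apis, size_t cnt,
                                int api_key, int want_min, int want_max) {
        for (size_t i = 0; i < cnt; i++) {
                if (apis[i].api_key != api_key)
                        continue;
                return apis[i].min_ver <= want_max &&
                       apis[i].max_ver >= want_min;
        }
        return false; /* API key not advertised at all */
}

/* Hypothetical stand-in for the client's idempotence feature check.
 * Assumption: InitProducerId versions 0..1 are what the client needs. */
static bool idempotence_supported(const struct api_version *apis,
                                  size_t cnt) {
        return broker_supports_api(apis, cnt, API_KEY_INIT_PRODUCER_ID, 0, 1);
}
```

Under this model, a broker that handles Produce and Metadata requests but does not list API key 22 would be treated as older than 0.11.0, which would match the message in the log above.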

To Reproduce
Steps to reproduce the behavior:

  1. Take librdkafka's example idempotent_producer.c and set the SASL username and password (I use SASL with KoP); see idempotent_producer.patch below.
  2. Run idempotent_producer against the KoP broker.

Expected behavior
KoP should support the enable.idempotence option.

Additional context
Pulsar version: 3.0.0
KoP version: 3.0.0.4
librdkafka: v1.9.2
broker.conf:

messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
kafkaListeners=PLAINTEXT://localhost:9092
kafkaAdvertisedListeners=PLAINTEXT://localhost:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false
kafkaTransactionCoordinatorEnabled=true
brokerDeduplicationEnabled=true
kafkaBrokerId=1
saslAllowedMechanisms=PLAIN
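
To double-check that the flags listed above are really set in the file the broker loads, a small standalone check can be sketched as follows. The helper name conf_flag_enabled is hypothetical, and the parsing is deliberately simple: it matches only exact key=value lines with no surrounding whitespace, and the last occurrence of a key wins. Whether kafkaTransactionCoordinatorEnabled and brokerDeduplicationEnabled are sufficient for idempotent producers in KoP is not established by this report.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Return true if the last `key=...` line in `conf_text` is `key=true`.
 * Lines starting with '#' never match, since the key must begin the line. */
static bool conf_flag_enabled(const char *conf_text, const char *key) {
        size_t klen = strlen(key);
        const char *line = conf_text;
        bool enabled = false;

        while (line != NULL && *line != '\0') {
                const char *eol = strchr(line, '\n');
                size_t len = eol ? (size_t)(eol - line) : strlen(line);

                /* Tolerate Windows line endings. */
                if (len > 0 && line[len - 1] == '\r')
                        len--;

                if (len > klen && strncmp(line, key, klen) == 0 &&
                    line[klen] == '=') {
                        size_t vlen = len - klen - 1;
                        enabled = vlen == 4 &&
                                  strncmp(line + klen + 1, "true", 4) == 0;
                }
                line = eol ? eol + 1 : NULL;
        }
        return enabled;
}
```

For the configuration above, conf_flag_enabled(text, "brokerDeduplicationEnabled") would return true, while the same call for "brokerDeleteInactiveTopicsEnabled" would return false.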

idempotent_producer.patch

diff --git a/examples/idempotent_producer.c b/examples/idempotent_producer.c
index 1e799eaf..7c14fb6f 100644
--- a/examples/idempotent_producer.c
+++ b/examples/idempotent_producer.c
@@ -197,6 +197,35 @@ int main(int argc, char **argv) {
                 return 1;
         }
 
+        if (rd_kafka_conf_set(conf, "security.protocol", "SASL_PLAINTEXT",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+        if (rd_kafka_conf_set(conf, "sasl.mechanisms", "PLAIN",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+        if (rd_kafka_conf_set(conf, "sasl.username", "test/test-ns",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+        if (rd_kafka_conf_set(conf, "sasl.password", "token:123456",
+                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
+        {
+                fprintf(stderr, "%s\n", errstr);
+                rd_kafka_conf_destroy(conf);
+                return 1;
+        }
+
         /* Set the delivery report callback.
          * This callback will be called once per message to inform
          * the application if delivery succeeded or failed.
