Skip to content

Commit 188b86f

Browse files
authored
KAFKA-19793 Adding allow topic creation false for global and restore consumer (#20723)
Added auto topic creation false for both restore and global consumers. Reviewers: Nikita Shupletsov <[email protected]>, Matthias J. Sax <[email protected]>
1 parent d9d9fcb commit 188b86f

File tree

2 files changed

+89
-14
lines changed

2 files changed

+89
-14
lines changed

streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ public class StreamsConfig extends AbstractConfig {
826826
private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day";
827827

828828
private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS =
829-
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG};
829+
new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.GROUP_PROTOCOL_CONFIG, ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG};
830830
private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS =
831831
new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
832832
private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS =
@@ -1265,7 +1265,8 @@ public class StreamsConfig extends AbstractConfig {
12651265
ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000",
12661266
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
12671267
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false",
1268-
ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic"
1268+
ConsumerConfig.GROUP_PROTOCOL_CONFIG, "classic",
1269+
ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"
12691270
);
12701271

12711272
private static final Map<String, Object> CONSUMER_EOS_OVERRIDES;
@@ -1640,8 +1641,8 @@ private Map<String, Object> getCommonConsumerConfigs() {
16401641

16411642
clientProvidedProps.remove(GROUP_PROTOCOL_CONFIG);
16421643

1643-
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
1644-
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
1644+
checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
1645+
checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
16451646

16461647
final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
16471648
if (StreamsConfigUtils.eosEnabled(this)) {
@@ -1656,12 +1657,12 @@ private Map<String, Object> getCommonConsumerConfigs() {
16561657
return consumerProps;
16571658
}
16581659

1659-
private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Object> clientProvidedProps,
1660-
final String[] nonConfigurableConfigs) {
1661-
// Streams does not allow users to configure certain consumer/producer configurations, for example,
1662-
// enable.auto.commit. In cases where user tries to override such non-configurable
1663-
// consumer/producer configurations, log a warning and remove the user defined value from the Map.
1664-
// Thus, the default values for these consumer/producer configurations that are suitable for
1660+
private void checkIfUnexpectedUserSpecifiedClientConfig(final Map<String, Object> clientProvidedProps,
1661+
final String[] nonConfigurableConfigs) {
1662+
// Streams does not allow users to configure certain client configurations (consumer/producer),
1663+
// for example, enable.auto.commit or transactional.id. In cases where user tries to override
1664+
// such non-configurable client configurations, log a warning and remove the user defined value
1665+
// from the Map. Thus, the default values for these client configurations that are suitable for
16651666
// Streams will be used instead.
16661667

16671668
final String nonConfigurableConfigMessage = "Unexpected user-specified {} config '{}' found. {} setting ({}) will be ignored and the Streams default setting ({}) will be used.";
@@ -1769,6 +1770,7 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
17691770

17701771
// Get main consumer override configs
17711772
final Map<String, Object> mainConsumerProps = originalsWithPrefix(MAIN_CONSUMER_PREFIX);
1773+
checkIfUnexpectedUserSpecifiedClientConfig(mainConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
17721774
consumerProps.putAll(mainConsumerProps);
17731775

17741776
// this is a hack to work around StreamsConfig constructor inside StreamsPartitionAssignor to avoid casting
@@ -1799,9 +1801,6 @@ public Map<String, Object> getMainConsumerConfigs(final String groupId, final St
17991801
consumerProps.put(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, getInt(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG));
18001802
consumerProps.put(TASK_ASSIGNOR_CLASS_CONFIG, getString(TASK_ASSIGNOR_CLASS_CONFIG));
18011803

1802-
// disable auto topic creation
1803-
consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
1804-
18051804
// verify that producer batch config is no larger than segment size, then add topic configs required for creating topics
18061805
final Map<String, Object> topicProps = originalsWithPrefix(TOPIC_PREFIX, false);
18071806
final Map<String, Object> producerProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
@@ -1844,6 +1843,7 @@ public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
18441843

18451844
// Get restore consumer override configs
18461845
final Map<String, Object> restoreConsumerProps = originalsWithPrefix(RESTORE_CONSUMER_PREFIX);
1846+
checkIfUnexpectedUserSpecifiedClientConfig(restoreConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
18471847
baseConsumerProps.putAll(restoreConsumerProps);
18481848

18491849
// no need to set group id for a restore consumer
@@ -1877,6 +1877,7 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
18771877

18781878
// Get global consumer override configs
18791879
final Map<String, Object> globalConsumerProps = originalsWithPrefix(GLOBAL_CONSUMER_PREFIX);
1880+
checkIfUnexpectedUserSpecifiedClientConfig(globalConsumerProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
18801881
baseConsumerProps.putAll(globalConsumerProps);
18811882

18821883
// no need to set group id for a global consumer
@@ -1887,6 +1888,7 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
18871888
// add client id with stream client id prefix
18881889
baseConsumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-global-consumer");
18891890
baseConsumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
1891+
18901892
return baseConsumerProps;
18911893
}
18921894

@@ -1903,7 +1905,7 @@ public Map<String, Object> getGlobalConsumerConfigs(final String clientId) {
19031905
public Map<String, Object> getProducerConfigs(final String clientId) {
19041906
final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX, ProducerConfig.configNames());
19051907

1906-
checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
1908+
checkIfUnexpectedUserSpecifiedClientConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
19071909

19081910
// generate producer configs from original properties and overridden maps
19091911
final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES);

streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -442,6 +442,79 @@ public void shouldOverrideAdminDefaultAdminClientEnableTelemetry() {
442442
assertTrue((boolean) returnedProps.get(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG));
443443
}
444444

445+
@Test
446+
public void shouldNotAllowAutoCreateTopicsForConsumers_WithCommonConsumerPrefix() {
447+
// Test with generic consumer.* prefix (affects all consumer types)
448+
props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
449+
450+
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
451+
appender.setClassLogger(StreamsConfig.class, Level.ERROR);
452+
453+
final StreamsConfig streamsConfig = new StreamsConfig(props);
454+
455+
// Main consumer - verify override is ignored
456+
final Map<String, Object> mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0);
457+
assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
458+
"Main consumer should not allow auto topic creation with consumer.* override");
459+
460+
// Restore consumer - verify override is ignored
461+
final Map<String, Object> restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client");
462+
assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
463+
"Restore consumer should not allow auto topic creation with consumer.* override");
464+
465+
// Global consumer - verify override is ignored
466+
final Map<String, Object> globalConfigs = streamsConfig.getGlobalConsumerConfigs("client");
467+
assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
468+
"Global consumer should not allow auto topic creation with consumer.* override");
469+
470+
// Verify exactly 1 error is logged (consumer.* prefix is validated once in getCommonConsumerConfigs for each type of consumer)
471+
final List<String> errorMessages = appender.getMessages();
472+
final long errorCount = errorMessages.stream()
473+
.filter(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found"))
474+
.count();
475+
assertEquals(3, errorCount,
476+
"Should log exactly 3 error for consumer.* prefix");
477+
}
478+
}
479+
480+
@Test
481+
public void shouldNotAllowAutoCreateTopicsForConsumers_WithSpecificConsumerPrefixes() {
482+
// Test with specific prefixes for each consumer type
483+
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
484+
props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
485+
props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG), "true");
486+
487+
try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
488+
appender.setClassLogger(StreamsConfig.class, Level.ERROR);
489+
490+
final StreamsConfig streamsConfig = new StreamsConfig(props);
491+
492+
// Main consumer - verify override is ignored
493+
final Map<String, Object> mainConfigs = streamsConfig.getMainConsumerConfigs("group", "client", 0);
494+
assertEquals("false", mainConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
495+
"Main consumer should not allow auto topic creation with main.consumer.* override");
496+
497+
// Restore consumer - verify override is ignored
498+
final Map<String, Object> restoreConfigs = streamsConfig.getRestoreConsumerConfigs("client");
499+
assertEquals("false", restoreConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
500+
"Restore consumer should not allow auto topic creation with restore.consumer.* override");
501+
502+
// Global consumer - verify override is ignored
503+
final Map<String, Object> globalConfigs = streamsConfig.getGlobalConsumerConfigs("client");
504+
assertEquals("false", globalConfigs.get(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG),
505+
"Global consumer should not allow auto topic creation with global.consumer.* override");
506+
507+
// Verify exactly 3 errors are logged (one for each specific prefix)
508+
final List<String> errorMessages = appender.getMessages();
509+
final long errorCount = errorMessages.stream()
510+
.filter(msg -> msg.contains("Unexpected user-specified consumer config 'allow.auto.create.topics' found"))
511+
.count();
512+
assertEquals(3, errorCount,
513+
"Should log exactly 3 errors: one for main.consumer.*, one for restore.consumer.*, one for global.consumer.*");
514+
}
515+
}
516+
517+
445518
@Test
446519
public void shouldSupportNonPrefixedAdminConfigs() {
447520
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10);

0 commit comments

Comments
 (0)