Skip to content

Commit 5428c70

Browse files
gaurav-narulachia7712
authored andcommitted
KAFKA-19894 Reintroduce SaslPlainSslEndToEndAuthorizationTest (#20915)
PR #17424 removed `SaslPlainSslEndToEndAuthorizationTest` along with `AclAuthorizer`. While there was a test within `SaslPlainSslEndToEndAuthorizationTest` which tested ZK ACLs, it *also* tested all the tests in its inheritance hierarchy. We should therefore re-introduce it as the suite lacks a test for `SASL/PLAIN` mechanism. Reviewers: Chia-Ping Tsai <[email protected]>, PoAn Yang <[email protected]>
1 parent cb2edfd commit 5428c70

File tree

1 file changed

+145
-0
lines changed

1 file changed

+145
-0
lines changed
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package kafka.api
18+
19+
import kafka.security.JaasTestUtils._
20+
import kafka.security.{JaasModule, JaasTestUtils}
21+
import kafka.utils.TestUtils
22+
import org.apache.kafka.common.config.SaslConfigs
23+
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
24+
import org.apache.kafka.common.internals.SecurityManagerCompatibility
25+
import org.apache.kafka.common.network.ConnectionMode
26+
import org.apache.kafka.common.security.auth._
27+
import org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
28+
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
29+
import org.apache.kafka.test.TestSslUtils
30+
import org.junit.jupiter.api.Assertions.assertTrue
31+
32+
import java.util
33+
import java.util.{Collections, Optional, Properties}
34+
import javax.security.auth.callback._
35+
import javax.security.auth.login.AppConfigurationEntry
36+
import scala.collection.Seq
37+
import scala.jdk.javaapi.OptionConverters
38+
39+
object SaslPlainSslEndToEndAuthorizationTest {
40+
41+
val controllerPrincipalName = "admin"
42+
43+
class TestPrincipalBuilder extends DefaultKafkaPrincipalBuilder(null, null) {
44+
override def build(context: AuthenticationContext): KafkaPrincipal = {
45+
val saslContext = context.asInstanceOf[SaslAuthenticationContext]
46+
47+
// Verify that peer principal can be obtained from the SSLSession provided in the context
48+
// since we have enabled TLS mutual authentication for the listener
49+
val sslPrincipal = saslContext.sslSession.get.getPeerPrincipal.getName
50+
assertTrue(sslPrincipal.endsWith(s"CN=${TestUtils.SslCertificateCn}"), s"Unexpected SSL principal $sslPrincipal")
51+
52+
saslContext.server.getAuthorizationID match {
53+
case KAFKA_PLAIN_ADMIN =>
54+
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
55+
case KAFKA_PLAIN_USER =>
56+
new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
57+
case _ =>
58+
KafkaPrincipal.ANONYMOUS
59+
}
60+
}
61+
}
62+
63+
object Credentials {
64+
val allUsers = Map(KAFKA_PLAIN_USER -> "user1-password",
65+
KAFKA_PLAIN_USER_2 -> KAFKA_PLAIN_PASSWORD_2,
66+
KAFKA_PLAIN_ADMIN -> "broker-password")
67+
}
68+
69+
class TestServerCallbackHandler extends AuthenticateCallbackHandler {
70+
def configure(configs: java.util.Map[String, _], saslMechanism: String, jaasConfigEntries: java.util.List[AppConfigurationEntry]): Unit = {}
71+
def handle(callbacks: Array[Callback]): Unit = {
72+
var username: String = null
73+
for (callback <- callbacks) {
74+
callback match {
75+
case nameCallback: NameCallback => username = nameCallback.getDefaultName
76+
case plainCallback: PlainAuthenticateCallback =>
77+
plainCallback.authenticated(Credentials.allUsers(username) == new String(plainCallback.password))
78+
case _ => throw new UnsupportedCallbackException(callback)
79+
}
80+
}
81+
}
82+
def close(): Unit = {}
83+
}
84+
85+
class TestClientCallbackHandler extends AuthenticateCallbackHandler {
86+
def configure(configs: java.util.Map[String, _], saslMechanism: String, jaasConfigEntries: java.util.List[AppConfigurationEntry]): Unit = {}
87+
def handle(callbacks: Array[Callback]): Unit = {
88+
val subject = SecurityManagerCompatibility.get().current()
89+
val username = subject.getPublicCredentials(classOf[String]).iterator().next()
90+
for (callback <- callbacks) {
91+
callback match {
92+
case nameCallback: NameCallback => nameCallback.setName(username)
93+
case passwordCallback: PasswordCallback =>
94+
if (username == KAFKA_PLAIN_USER || username == KAFKA_PLAIN_ADMIN)
95+
passwordCallback.setPassword(Credentials.allUsers(username).toCharArray)
96+
case _ => throw new UnsupportedCallbackException(callback)
97+
}
98+
}
99+
}
100+
def close(): Unit = {}
101+
}
102+
}
103+
104+
105+
// This test uses SASL callback handler overrides for server connections of Kafka broker
106+
// and client connections of Kafka producers and consumers. Client connections from Kafka brokers
107+
// used for inter-broker communication also use custom callback handlers. The second client used in
108+
// the multi-user test SaslEndToEndAuthorizationTest#testTwoConsumersWithDifferentSaslCredentials uses
109+
// static JAAS configuration with default callback handlers to test those code paths as well.
110+
class SaslPlainSslEndToEndAuthorizationTest extends SaslEndToEndAuthorizationTest {
111+
import SaslPlainSslEndToEndAuthorizationTest._
112+
113+
this.serverConfig.setProperty(s"${listenerName.configPrefix}${BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG}", "required")
114+
this.serverConfig.setProperty(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[TestPrincipalBuilder].getName)
115+
this.serverConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
116+
val mechanismPrefix = listenerName.saslMechanismConfigPrefix("PLAIN")
117+
this.serverConfig.put(s"$mechanismPrefix${BrokerSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG}", classOf[TestServerCallbackHandler].getName)
118+
this.producerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
119+
this.consumerConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
120+
this.adminClientConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
121+
this.superuserClientConfig.put(SaslConfigs.SASL_CLIENT_CALLBACK_HANDLER_CLASS, classOf[TestClientCallbackHandler].getName)
122+
123+
override protected def kafkaClientSaslMechanism = "PLAIN"
124+
override protected def kafkaServerSaslMechanisms = List("PLAIN")
125+
126+
override val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "user")
127+
override val kafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, controllerPrincipalName)
128+
129+
override def jaasSections(kafkaServerSaslMechanisms: Seq[String],
130+
kafkaClientSaslMechanism: Option[String],
131+
kafkaServerEntryName: String): Seq[JaasSection] = {
132+
val brokerLogin = JaasModule.plainLoginModule(KAFKA_PLAIN_ADMIN, "", false, util.Map.of()) // Password provided by callback handler
133+
val clientLogin = JaasModule.plainLoginModule(KAFKA_PLAIN_USER_2, KAFKA_PLAIN_PASSWORD_2, false, util.Map.of())
134+
Seq(new JaasSection(kafkaServerEntryName, Collections.singletonList(brokerLogin)),
135+
new JaasSection(KAFKA_CLIENT_CONTEXT_NAME, Collections.singletonList(clientLogin)))
136+
}
137+
138+
// Generate SSL certificates for clients since we are enabling TLS mutual authentication
139+
// in this test for the SASL_SSL listener.
140+
override def clientSecurityProps(certAlias: String): Properties = {
141+
JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, OptionConverters.toJava(trustStoreFile),
142+
certAlias, JaasTestUtils.SSL_CERTIFICATE_CN, OptionConverters.toJava(clientSaslProperties),
143+
TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS, Optional.of(true))
144+
}
145+
}

0 commit comments

Comments
 (0)