Kafka + ZooKeeper Security Authentication and Access Control (ACL)
Versions: kafka_2.12-2.4.1, apache-zookeeper-3.6.3-bin

1. Configuring SASL for ZooKeeper

If you only care about Kafka authentication, configuring SASL for ZooKeeper is not strictly required. However, Kafka stores essential metadata in ZooKeeper, so ZooKeeper's authentication setup does affect Kafka.

1.1 Create the zoo_jaas.conf file

The name and location of zoo_jaas.conf do not matter; it is usually placed in the ${ZOOKEEPER_HOME}/conf directory.

```
Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_kafka="kafka@123";
};
```

- Server.username and Server.password are the username and password for internal ZooKeeper communication; they simply have to be identical on every ZooKeeper node.
- In Server.user_xxx, the xxx part is a custom username used by zkClient connections; user_kafka="kafka@123" is the account created for Kafka.

1.2 Configure the zoo.cfg file

```properties
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
zookeeper.sasl.client=true
```

zookeeper.sasl.client=true turns on client authentication. Without it, the accounts configured in zoo_jaas.conf have no effect: clients can still connect without any JAAS file, and only a WARNING is logged.

1.3 Add the dependency jars

Because the login module used is org.apache.kafka.common.security.plain.PlainLoginModule, the Kafka-related jars are needed. Create a folder zk_sasl_lib containing:

- kafka-clients-2.4.1.jar
- lz4-java-1.6.0.jar
- slf4j-api-1.7.28.jar
- slf4j-log4j12-1.7.28.jar
- snappy-java-1.1.7.3.jar

1.4 Modify the zkEnv.sh file

Before:

```shell
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS"
```

After:

```shell
for jar in /Users/wjun/env/zookeeper/zk_sasl_lib/*.jar; do
    CLASSPATH="$jar:$CLASSPATH"
done

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/Users/wjun/env/zookeeper/conf/zoo_jaas.conf"
```

Then restart the ZooKeeper service.

2. Configuring SASL for Kafka

2.1 Create the kafka_server_jaas.conf file

The name and location of kafka_server_jaas.conf do not matter; it is usually placed in the ${KAFKA_HOME}/config directory.

```
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin@123"
    user_admin="admin@123"
    user_producer="producer@123"
    user_consumer="consumer@123";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="kafka"
    password="kafka@123";
};
```

- KafkaServer.username and KafkaServer.password are the credentials for inter-broker communication, analogous to the ZooKeeper case above.
- One KafkaServer.user_xxx entry must have xxx equal to KafkaServer.username, with the same password (here user_admin="admin@123").
- KafkaServer.user_producer and KafkaServer.user_consumer prepare for the ACL setup later: the producer and the consumer get separate accounts, where the consumer account may only consume data and the producer account may only produce it.
- Client.username and Client.password are the account registered in ZooKeeper, used for communication between the broker and ZooKeeper. If ZooKeeper has no SASL configured, or zookeeper.sasl.client is false, this section can be omitted; the broker merely logs a warning:

```
[2021-06-29 17:14:30,204] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/Users/wjun/env/kafka/config/kafka_server_jaas.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)
```

2.2 Modify the server.properties file

```properties
listeners=SASL_PLAINTEXT://localhost:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin
```

- Replace localhost with the machine's actual IP address.
- super.users declares superusers, which are not affected by the ACL configuration that follows.

2.3 Modify the startup script

Modify kafka-server-start.sh so that it loads kafka_server_jaas.conf.

Before:

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
fi
```

After:

```shell
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/Users/wjun/env/kafka/config/kafka_server_jaas.conf"
fi
```

Then restart the Kafka service.

2.4 Verifying with the Java API

Before verifying, create a topic in Kafka. The command differs between Kafka versions. On newer versions:

```shell
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic kafkaTopic
```
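All of the JAAS files in this setup (zoo_jaas.conf, kafka_server_jaas.conf) share one layout: a context name, a login module class, the `required` flag, and `key="value"` options, with a semicolon closing the entry. As a stdlib-only sketch of that layout (the `jaasEntry` helper below is hypothetical, written for illustration; it is not part of Kafka or ZooKeeper):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class JaasEntryDemo {
    // Hypothetical helper: renders one JAAS context in the layout used by
    // zoo_jaas.conf and kafka_server_jaas.conf above.
    static String jaasEntry(String context, String module, Map<String, String> options) {
        StringBuilder sb = new StringBuilder(context).append(" {\n");
        sb.append("    ").append(module).append(" required\n");
        int i = 0;
        for (Map.Entry<String, String> e : options.entrySet()) {
            sb.append("    ").append(e.getKey()).append("=\"").append(e.getValue()).append("\"");
            // The final option carries the semicolon that terminates the entry.
            sb.append(++i == options.size() ? ";\n" : "\n");
        }
        return sb.append("};").toString();
    }

    public static void main(String[] args) {
        Map<String, String> opts = new LinkedHashMap<>();
        opts.put("username", "admin");
        opts.put("password", "admin");
        opts.put("user_kafka", "kafka@123");
        System.out.println(jaasEntry("Server",
                "org.apache.kafka.common.security.plain.PlainLoginModule", opts));
    }
}
```

Rendering the Server context with the three options from section 1.1 reproduces the zoo_jaas.conf entry shown there, which makes it easier to keep the per-node files in sync from a single source of truth.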
On older versions, the topic is created through ZooKeeper:

```shell
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafkaTopic
```

In the pom, add the kafka-clients dependency:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
```

The producer code is as follows:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.Properties;

public class TProducer {
    public static void main(String[] args) throws IOException {
        // Create the configuration
        Properties properties = new Properties();
        // Load the producer configuration file
        properties.load(TProducer.class.getClassLoader().getResourceAsStream("producer.properties"));
        // Create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo1", "key_1", "value_1");
        producer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                System.out.println("Message sent to --> " + metadata.topic() + ", offset: " + metadata.offset());
            } else {
                System.out.println("Message sending failed: " + exception.getMessage());
            }
        });
        producer.close();
    }
}
```

where producer.properties is as follows:

```properties
############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
# format: host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092

# specify the compression codec for all data generated: none, gzip, snappy, lz4
compression.type=none

# name of the partitioner class for partitioning events; default partition spreads data randomly
#partitioner.class=

# the maximum amount of time the client will wait for the response of a request
#request.timeout.ms=

# how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for
#max.block.ms=

# the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together
#linger.ms=

# the maximum size of a request in bytes
#max.request.size=

# the default batch size in bytes when batching multiple records sent to a partition
#batch.size=

# the total bytes of memory the producer can use to buffer records waiting to be sent to the server
#buffer.memory=

key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
```

security.protocol=SASL_PLAINTEXT and sasl.mechanism=PLAIN must be configured.

Running the program at this point fails with:

```
Caused by: java.lang.IllegalArgumentException: Could not find a 'KafkaClient' entry in the JAAS configuration. System property 'java.security.auth.login.config' is not set
	at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:133)
	at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:98)
	at org.apache.kafka.common.security.JaasContext.loadClientContext(JaasContext.java:84)
	at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:124)
	at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:67)
	at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:99)
	at org.apache.kafka.clients.producer.KafkaProducer.newSender(KafkaProducer.java:450)
	at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:421)
	... 2 more
```

The reason is that a username and password are required to connect to Kafka, namely one of the accounts configured in kafka_server_jaas.conf. There are two ways to supply them.

Option 1: create a kafka_client_jaas.conf file

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="producer"
    password="producer@123";
};
```

and add a JVM argument when starting the program:

```
-Djava.security.auth.login.config=/Users/wjun/Documents/Program/Java/kafka_demo/src/main/resources/kafka_client_jaas.conf
```

Option 2: add the following line to producer.properties:

```properties
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";
```

The program now starts and produces data successfully:

```
[main] INFO org.apache.kafka.common.security.authenticator.AbstractLogin - Successfully logged in.
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.4.1
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: c57222ae8cd7866b
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1624965871345
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: l3Agv3weRiG27uo5EDj4KA
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
Message sent to --> demo1, offset: 18
```

The consumer works the same way:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class TConsumer {
    public static void main(String[] args) throws IOException {
        // Create the configuration
        Properties properties = new Properties();
        // Load the consumer configuration file
        properties.load(TConsumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
        // Build the consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // Subscribe to the topic
        consumer.subscribe(Collections.singletonList("demo1"));
        ConsumerRecords<String, String> records;
        while (true) {
            records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + "--" + record.value());
            }
        }
    }
}
```

where consumer.properties is as follows:

```properties
# comma separated host:port pairs, each corresponding to a Kafka broker
bootstrap.servers=localhost:9092

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

# consumer group id
group.id=test-consumer-group

# consumer timeout
#consumer.timeout.ms=5000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# offset commit mode
enable.auto.commit=true
# earliest or latest
auto.offset.reset=latest

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="consumer@123";
```

This time consumption fails with:

```
Exception in thread "main" org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [demo1]
```

The reason is that this consumer account has no permission to read the topic; granting it is an ACL operation. (With the admin superuser, of course, the problem does not occur.)

3. Kafka ACLs

Grant a user write permission on a topic, i.e. the right to produce data:

```shell
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:producer --producer --topic demo1
```

Grant a user read permission on a topic, i.e. the right to consume data:

```shell
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:consumer --consumer --topic demo1 --group test-consumer-group
```

List the ACLs:

```shell
kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic demo1
```

Permissions can also be revoked again, restricted by IP, and so on; for the full set of ACL operations, see the official Kafka documentation.
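The --producer and --consumer switches are convenience flags that expand into several individual ACLs. Per my reading of the Kafka documentation (worth confirming against the output of kafka-acls.sh --list), the expansion is roughly as sketched by this stdlib-only model (the two methods are illustrative, not a Kafka API):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AclExpansionDemo {
    // Approximate expansion of the kafka-acls.sh convenience flags used above.
    // Keys are resources, values the operations granted to the principal.
    static Map<String, List<String>> producerAcls(String topic) {
        Map<String, List<String>> acls = new LinkedHashMap<>();
        // --producer: the principal may write to (and describe/create) the topic.
        acls.put("Topic:" + topic, List.of("WRITE", "DESCRIBE", "CREATE"));
        return acls;
    }

    static Map<String, List<String>> consumerAcls(String topic, String group) {
        Map<String, List<String>> acls = new LinkedHashMap<>();
        // --consumer: the principal may read and describe the topic...
        acls.put("Topic:" + topic, List.of("READ", "DESCRIBE"));
        // ...and read from (i.e. join) the consumer group.
        acls.put("Group:" + group, List.of("READ"));
        return acls;
    }

    public static void main(String[] args) {
        System.out.println("User:producer -> " + producerAcls("demo1"));
        System.out.println("User:consumer -> " + consumerAcls("demo1", "test-consumer-group"));
    }
}
```

This is also why the consumer command above must name --group test-consumer-group: without the READ ACL on the group, subscribing fails even when the topic READ ACL exists.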
4. Using spring-kafka

Add the spring-kafka dependency:

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```

The application.yml file is as follows (note that because the brokers are configured with the PLAIN mechanism, the JAAS entry must use PlainLoginModule; a SCRAM login module would not match this cluster):

```yaml
spring:
  kafka:
    bootstrap-servers: 172.20.238.103:9092
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: PLAIN
          jaas:
            config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="producer" password="producer@123";'
    consumer:
      group-id: test-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: PLAIN
          jaas:
            config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="consumer" password="consumer@123";'
```

Producer:

```java
import com.example.collection.kafka.service.IKafkaService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService implements IKafkaService {

    @Autowired
    KafkaTemplate<Integer, String> kafkaTemplate;

    @Override
    public void product(String msg) {
        try {
            System.out.println("Sending message");
            kafkaTemplate.send("kafkaTopic", msg);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

Consumer:

```java
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
public class KafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    @KafkaListener(topics = {"kafkaTopic"})
    public void listener(ConsumerRecord<Integer, String> record) {
        logger.info("kafka listener ---------");
        Optional<String> msg = Optional.ofNullable(record.value());
        if (msg.isPresent()) {
            String s = JSON.toJSONString(msg.get());
            System.out.println("Received kafka message: " + s);
        }
    }
}
```
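Whether supplied through producer.properties, consumer.properties, or spring.kafka.*.properties in application.yml, the same three SASL client settings appear every time. A stdlib-only sketch that assembles them (saslClientProps is a hypothetical helper; the property keys themselves are the standard Kafka client ones used throughout this article):

```java
import java.util.Properties;

public class SaslClientPropsDemo {
    // Hypothetical helper: the SASL settings every client in this article
    // ends up with, however they are supplied.
    static Properties saslClientProps(String bootstrap, String user, String password) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // Inline JAAS entry, equivalent to the KafkaClient section of a file
        // passed via -Djava.security.auth.login.config (option 2 in section 2.4).
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"" + user + "\" password=\"" + password + "\";");
        return props;
    }

    public static void main(String[] args) {
        Properties producer = saslClientProps("localhost:9092", "producer", "producer@123");
        System.out.println(producer.getProperty("sasl.jaas.config"));
    }
}
```

Centralizing these three keys in one place makes it harder for the raw-client and spring-kafka configurations to drift apart, e.g. a PLAIN mechanism paired with a SCRAM login module.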