SpringBoot快速集成Kafka |
您所在的位置:网站首页 › kafka相关命令 › SpringBoot快速集成Kafka |
SpringBoot快速集成Kafka
由于需要对接kafka进行数据同步,在对接Kafka时由于内外网的原因没办法直接消费数据,故参考内网环境搭建一个Kafka来测试是否网络已通 一、概述Kafka部署依赖于zookeeper,所以部署方式采用docker compose部署 环境对安全要求较高,需要添加安全验证,使用SASL/PLAIN验证方式 为了方便于大家测试,代码已提交到Gitee中,欢迎Star⭐️一下~ 二、集成SpringBoot 自定义topic配置文件中添加topic、groupId的配置,便于修改 project: kafka: topicIds: somliy-test groupId: group-id-1 复制代码通过读取配置文件的方式,把配置信息加载到Spring中 package top.somliy.kafka.config; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import top.somliy.kafka.properties.KafkaProperties; /** * 类名: @ClassName KafkaTopicConfig * 创建人:@author zhao dong * 类描述:@Description: * 创建时间: 2023/3/20 16:45 */ @Slf4j @Configuration public class KafkaConfig implements InitializingBean { @Autowired private KafkaProperties kafkaProperties; @Override public void afterPropertiesSet() { // 获取配置 String topicIds = kafkaProperties.getTopicIds(); String groupId = kafkaProperties.getGroupId(); log.info("KafkaConfig 读取配置,topicIds:" + topicIds); log.info("KafkaConfig 读取配置,groupId:" + groupId); // 系统写入 System.setProperty("topicIds", topicIds); System.setProperty("groupId", groupId); } } 复制代码在读入到系统中后,监听注解就可以写成如下格式 /** * kafka监听消息 * * @param kafkaMessage 消息 */ @KafkaListener(topics = "#{'${topicIds}'.split(',')}", groupId = "${groupId}") public void onMessage(KafkaMessage kafkaMessage) { log.info("[线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), kafkaMessage); } 复制代码 三、使用Offset Explorer连接Kafka property填写 Security填写 Advanced填写 JAAS Config填写分别为用户名,密码(与docker compose中保持一致) org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret"; 复制代码 四、集成时遇到的问题 序列化方式在配置中,需要正确配置生产者和消费者的序列化方式,否则启动会报错。 spring: kafka: *** consumer: key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer *** producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer *** 复制代码 因为需要添加安全验证,生产消费双防都需要添加身份校验数据 spring: kafka: *** consumer: properties: spring: json: trusted: # 配置 信任, 不认会抛异常 is not in the trusted packages packages: top.somliy.kafka.message session.timeout.ms: 15000 security.protocol: SASL_PLAINTEXT sasl.mechanism: PLAIN sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="admin-secret"; 复制代码 添加信任类 spring: kafka: consumer: *** properties: spring: json: trusted: # 配置 信任, 不认会抛异常 is not in the trusted packages packages: top.somliy.kafka.message 复制代码 五、使用方法启动SpringBoot 运行test中的发送消息方法 |
今日新闻 |
推荐新闻 |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |