SpringBoot快速集成Kafka

您所在的位置:网站首页 kafka相关命令 SpringBoot快速集成Kafka

SpringBoot快速集成Kafka

2023-03-22 22:49| 来源: 网络整理| 查看: 265

SpringBoot快速集成Kafka

由于需要对接kafka进行数据同步,在对接Kafka时由于内外网的原因没办法直接消费数据,故参考内网环境搭建一个Kafka来测试是否网络已通

一、概述

Kafka部署依赖于zookeeper,所以部署方式采用docker compose部署

环境对安全要求较高,需要添加安全验证,使用SASL/PLAIN验证方式

为了方便于大家测试,代码已提交到Gitee中,欢迎Star⭐️一下~

二、集成SpringBoot 自定义topic

配置文件中添加topic、groupId的配置,便于修改

project: kafka:   topicIds: somliy-test   groupId: group-id-1 复制代码

通过读取配置文件的方式,把配置信息加载到Spring中

package top.somliy.kafka.config; ​ ​ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import top.somliy.kafka.properties.KafkaProperties; ​ /** * 类名: @ClassName KafkaTopicConfig * 创建人:@author zhao dong * 类描述:@Description: * 创建时间: 2023/3/20 16:45 */ @Slf4j @Configuration public class KafkaConfig implements InitializingBean { ​    @Autowired    private KafkaProperties kafkaProperties; ​    @Override    public void afterPropertiesSet() {        // 获取配置        String topicIds = kafkaProperties.getTopicIds();        String groupId = kafkaProperties.getGroupId();        log.info("KafkaConfig 读取配置,topicIds:" + topicIds);        log.info("KafkaConfig 读取配置,groupId:" + groupId);        // 系统写入        System.setProperty("topicIds", topicIds);        System.setProperty("groupId", groupId);   } } 复制代码

在读入到系统中后,监听注解就可以写成如下格式

/**     * kafka监听消息     *     * @param kafkaMessage 消息     */    @KafkaListener(topics = "#{'${topicIds}'.split(',')}", groupId = "${groupId}")    public void onMessage(KafkaMessage kafkaMessage) {        log.info("[线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), kafkaMessage);   } 复制代码 三、使用Offset Explorer连接Kafka property填写

image-20230321232627652.png

Security填写

image-20230321232653436.png

Advanced填写

image-20230321232732935.png

JAAS Config填写

分别为用户名,密码(与docker compose中保持一致)

org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret"; 复制代码

image-20230321232747314.png

四、集成时遇到的问题 序列化方式

在配置中,需要正确配置生产者和消费者的序列化方式,否则启动会报错。

spring: kafka: ***   consumer:     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer     value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer     ***   producer:   key-serializer: org.apache.kafka.common.serialization.StringSerializer     value-serializer: org.springframework.kafka.support.serializer.JsonSerializer     ***     复制代码 因为需要添加安全验证,生产消费双防都需要添加身份校验数据 spring: kafka: ***   consumer:     properties:       spring:         json:           trusted:              # 配置 信任, 不认会抛异常 is not in the trusted packages             packages: top.somliy.kafka.message       session.timeout.ms: 15000       security.protocol: SASL_PLAINTEXT       sasl.mechanism: PLAIN       sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule         required username="admin" password="admin-secret"; 复制代码 添加信任类 spring: kafka:   consumer:     ***     properties:       spring:         json:           trusted:              # 配置 信任, 不认会抛异常 is not in the trusted packages             packages: top.somliy.kafka.message 复制代码 五、使用方法

启动SpringBoot

image-20230321233039383.png 运行test中的发送消息方法

image-20230321233304702.png



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3