Spring Boot 整合 Kafka |
您所在的位置:网站首页 › springboot安装包 › Spring Boot 整合 Kafka |
![]() ![]() ![]() 环境:自行创建 Spring Boot 项目,添加测试依赖,并启动 Zookeeper 和 kafka 服务。 注意:Zookeeper 默认好像占用 8080 端口,自己注意端口占用问题。 1. 添加依赖 org.springframework.kafka spring-kafka 2. 添加配置 # kafka 配置 spring: kafka: bootstrap-servers: localhost:9092 producer: # 发生错误后,消息重发的次数。 retries: 1 #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。 batch-size: 16384 # 设置生产者内存缓冲区的大小。 buffer-memory: 33554432 # 键的序列化方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化方式 value-serializer: org.apache.kafka.common.serialization.StringSerializer # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。 acks: 1 consumer: # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D auto-commit-interval: 1S # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 auto-offset-reset: earliest # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量 enable-auto-commit: false # 键的反序列化方式 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # 值的反序列化方式 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 在侦听器容器中运行的线程数。 concurrency: 5 # listner负责ack,每调用一次,就立即commit ack-mode: manual_immediate missing-topics-fatal: false 3. 创建消息生产者 @Component public class KafkaProducer { private Logger logger = LoggerFactory.getLogger(KafkaProducer.class); @Resource private KafkaTemplate kafkaTemplate; public static final String TOPIC_TEST = "Hello-Kafka"; public static final String TOPIC_GROUP = "test-consumer-group"; public void send(Object obj) { String obj2String = JSON.toJSONString(obj); logger.info("准备发送消息为:{}", obj2String); // 发送消息 ListenableFuture future = kafkaTemplate.send(TOPIC_TEST, obj); future.addCallback(new ListenableFutureCallback() { @Override public void onFailure(Throwable throwable) { //发送失败的处理 logger.info(TOPIC_TEST + " - 生产者 发送消息失败:" + throwable.getMessage()); } @Override public void onSuccess(SendResult stringObjectSendResult) { //成功的处理 logger.info(TOPIC_TEST + " - 生产者 发送消息成功:" + stringObjectSendResult.toString()); } }); } } 4. 创建消息消费者 @Component public class KafkaConsumer { private Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP) public void topicTest(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { // 包含非空值,则执行 Object msg = message.get(); logger.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg); ack.acknowledge(); // 确认成功消费一个消息 } } } 5. 消息发送测试 @RunWith(SpringRunner.class) @SpringBootTest public class KafkaProducerTest { private Logger logger = LoggerFactory.getLogger(KafkaProducerTest.class); @Resource private KafkaProducer kafkaProducer; // 注意使用自己创建的,看清楚! /* 测试之前需要开启 Kafka 服务 启动 Zookeeper:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties 启动 Kafka:bin/kafka-server-start.sh -daemon config/server.properties 测试结果数据: 准备发送消息为:"你好,我是Lottery 001" Hello-Kafka - 生产者 发送消息成功:SendResult [producerRecord=ProducerRecord(topic=Hello-Kafka, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=你好,我是Lottery 001, timestamp=null), recordMetadata=Hello-Kafka-0@47] topic_test 消费了: Topic:Hello-Kafka,Message:你好,我是Lottery 001 */ @Test public void test_send() throws InterruptedException { // 循环发送消息 while (true) { kafkaProducer.send("你好,我是Lottery 001"); Thread.sleep(3500); } } } 内容来源于网络如有侵权请私信删除文章来源: 博客园 原文链接: https://www.cnblogs.com/luisblog/p/17307292.html 标签: java java8 java开发 你还没有登录,请先登录或注册! 还没有人评论,欢迎说说您的想法! 相关课程![]() ![]() ![]() |
CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3 |