Spring Cloud Stream 整合Kafka

您所在的位置:网站首页 mq中间件qq模式qa模式 Spring Cloud Stream 整合Kafka

Spring Cloud Stream 整合Kafka

2023-04-28 10:09| 来源: 网络整理| 查看: 265

Spring Cloud Stream是一个构建消息驱动微服务的框架,抽象了MQ的使用方式, 提供统一的API操作。Spring Cloud Stream通过Binder(绑定器)、inputs/outputschannel完成应用程序和MQ的解耦。Spring Cloud Stream的模型如下图:image.png

Binder

负责绑定应用程序和MQ中间件,即指定应用程序是和KafKa交互还是和RabbitMQ交互或者和其他的MQ中间件交互

inputs/outputs channel

inputs/outputs channel抽象发布订阅消息的方式,即无论是什么类型的MQ应用程序都通过统一的方式发布订阅消息

Spring Cloud Stream主要配置binder

绑定MQ中间件及配置

bindings

管理所有的Topic

des1tination指定发布订阅的TopiccontentType指定发布订阅消息的格式group指定消费者组,(一条消息只能被一组消息者中的一个消息者消费)示例

高版本的Spring Cloud Stream提供两种使用方式,一种是使用yml配置的方式绑定生产/消费者,另一种是通过Function的方式绑定生产/消费者。以下代码为使用Function的方式绑定生产/消费者

引入依赖

org.springframework.cloud spring-cloud-starter-stream-kafka 3.1.0

配置yml

spring: cloud: stream: kafka: binder: auto-create-topics: true # 自动创建topics brokers: ***.****.***.***:9092 bindings: logP-out-0: # 对用在ProducersConfig中的生产函数logP destination: log # logP将数据发送的topic contentType: application/json logC-in-0: # 对用在ConsumersConfig中的生产函数logC destination: log group: log_group addAgeC-in-0: destination: addAge group: addAge_group function: definition: logP;logC;addAgeC # 指定对应的函数为Spring Cloud Stream中的生产消费通道

编写生产者

方式1

@Configuration public class ProducersConfig { private BlockingQueue unbounded = new LinkedBlockingQueue(); /** * 对应yml中配置的logP-out-0通道,即topic log * @return java.util.function.Supplier * @Date 2020-12-27 **/ @Bean public Supplier logP(){ return () -> unbounded.poll(); } /** * 调用本方法向log topic发送消息 * * @param person: * @return void * @Date 2020-12-27 **/ public void log(Person person){ unbounded.offer(person); } }

方式2

@RestController public class UserController { @Autowired private StreamBridge streamBridge; @PostMapping("/addAge") public boolean addAge(@RequestBody Person person){ person.setAge(RandomUtil.randomInt(10, 90)); person.setSuccess(RandomUtil.randomBoolean()); person.setBirthday(new Date()); // 通过streamBridge直接对应的topic发送消息 return streamBridge.send("addAge", person); } }

编写消费者

@Configuration public class ConsumersConfig { /** * 对应yml中配置的logC-in-0通道,即topic log。 * 消费topic log中的消息 * * @return java.util.function.Consumer * @Date 2020-12-27 **/ @Bean public Consumer logC() { return person -> { System.out.println("Received: " + person); }; } /** * 对应yml中配置的addAgeC-in-0通道,即topic addAge。 * 消费topic addAge中的消息 * * @return java.util.function.Consumer * @Date 2020-12-27 **/ @Bean public Consumer addAgeC(){ return person -> { person.setAge(person.getAge() + 10); System.out.println("Consumer addAge: " + person.toString()); }; } }

发送消息

@RestController public class UserController { @Autowired private StreamBridge streamBridge; @Autowired private ProducersConfig producersConfig; @PostMapping("/log") public void log(@RequestBody Person person){ producersConfig.log(person); } @PostMapping("/addAge") public boolean addAge(@RequestBody Person person){ person.setAge(RandomUtil.randomInt(10, 90)); person.setSuccess(RandomUtil.randomBoolean()); person.setBirthday(new Date()); System.out.println("Producer addAge: " + person.toString()); return streamBridge.send("addAge", person); } }


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3