redis集群的发布订阅模式

您所在的位置:网站首页 消息队列订阅模式怎么关闭不了呢 redis集群的发布订阅模式

redis集群的发布订阅模式

2024-06-29 08:26| 来源: 网络整理| 查看: 265

项目开发过程中,遇到需要发消息的情况,是不是脑海里不自主的浮现kafka、rabbitmq等常用的消息队列?但如果消息非常简单,并且用量也不大,消息队列就会有点大材小用了吧,忽然想起了redis 也有消息队列的功能,只不过我们经常把redis 用作缓存(这个是redis最大的卖点),忽略了它的辅助技能,今天我就简单讲解一下 redis 的发布订阅模式如何使用。发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。 在这里插入图片描述方法的选择 操作 redis 集群的方法一般有两种,一种是 redis 官方推荐的 JedisCluster,另一种是RedisTemplate,这两种客户端工具均能实现redis 的发布订阅模式,下面我会逐一讲解。maven依赖 org.springframework.boot spring-boot-starter-data-redis Redsi 集群连接配置 1、在application.yml 文件添加属性 spring: redis-config: max-total: 50 max-idle: 10 min-idle: 5 max-wait-millis: 6000 test-on-borrow: true test-on-return: false test-while-idle: false max-redirects: 10 connection-timeout: 1000 so-timeout: 1000 address: - 192.168.xx.xxx:7360 - 192.168.yy.yyy:7360 - 192.168.zz.zzz:7360

2、注入config 文件

package com.jianmin.config; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisClusterConfiguration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisNode; import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster; import redis.clients.jedis.JedisPoolConfig; import java.util.HashSet; import java.util.List; import java.util.Set; /** * @Author: jianmin.li * @Description: redis配置 * @Date: 2019/5/28 14:47 * @Version: 1.0 */ @Configuration @Slf4j @Data @ConfigurationProperties(prefix = "spring.redis-config") public class RedisConfig { private List address; private Integer connectionTimeout; private Integer soTimeout; private Integer maxRedirects; @ConfigurationProperties(prefix = "spring.redis-config") @Bean public JedisPoolConfig jedisPoolConfig() { return new JedisPoolConfig(); } @Bean public JedisCluster jedisCluster(JedisPoolConfig poolConfig) { Set hostAndPorts = new HashSet(address.size()); for (String ipAndPort : address) { String[] arr = ipAndPort.split(":"); hostAndPorts.add(new HostAndPort(arr[0],Integer.parseInt(arr[1]))); log.warn("JedisCluster init Redis Address--->{}:{}",arr[0].trim(),arr[1]); } return new JedisCluster(hostAndPorts,connectionTimeout,soTimeout,maxRedirects,poolConfig); } @Bean public RedisClusterConfiguration redisClusterConfiguration() { RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(); Set nodes = new HashSet(address.size()); for (String ipPort : address) { String[] ipAndPort = ipPort.split(":"); nodes.add(new RedisNode(ipAndPort[0].trim(),Integer.valueOf(ipAndPort[1]))); log.warn("RedisTemplate init Redis Address--->{}:{}",ipAndPort[0].trim(),ipAndPort[1]); } redisClusterConfiguration.setClusterNodes(nodes); redisClusterConfiguration.setMaxRedirects(maxRedirects); return redisClusterConfiguration; } @Bean public JedisConnectionFactory jedisConnectionFactory(JedisPoolConfig jedisPoolConfig,RedisClusterConfiguration redisClusterConfiguration) { JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisClusterConfiguration, jedisPoolConfig); jedisConnectionFactory.afterPropertiesSet(); return jedisConnectionFactory; } @Bean public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate redisTemplate = new RedisTemplate(); redisTemplate.setConnectionFactory(redisConnectionFactory); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer()); redisTemplate.setEnableTransactionSupport(true); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(new Jackson2JsonRedisSerializer(Object.class)); redisTemplate.afterPropertiesSet(); return redisTemplate; } @Bean RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter, JedisConnectionFactory jedisConnectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory); container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic ("channel-lijianmin-redistemplate"))); return container; } } 使用方法 1、JedisCluster 实现redis 的发布订阅 发送消息 @CrossOrigin @RestController @RequestMapping("/demo") public class Demo { @Autowired private JedisCluster jedisCluster; /** * 发送消息非常简单,直接使用 JedisCluster 的 publish 方法即可。发送消息成 * 功,会返回一个长整型数值0L。 * * @param * @return : void * @Author: jianmin.li * @Date: 2019/10/25 13:38 */ @GetMapping("/jedisCluster") public void method() { Long publish = jedisCluster.publish("channel-lijianmin","这是来自JedisCluster发布者的消息"); System.err.println(publish); } }

监听消息 要想让redis 在web 容器开启时就一直处于订阅状态,考虑通过listener 实现: 自定义MyListener,实现ServletContextListener,开启异步线程去监听redis 的channel,因为JedisCluster 的发布和订阅是阻塞的,如果用同步监听,那么你的项目就起不来了,一直阻塞在JedisCluster 初始化的地方,所以这里必须用异步线程去监听redsi 的channel。

package com.jianmin.util; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import redis.clients.jedis.JedisCluster; import javax.servlet.ServletContextEvent; import javax.servlet.ServletContextListener; import javax.servlet.annotation.WebListener; /** * @Author: jianmin.li * @Description: 消息监听 * @Date: 2019/5/30 12:55 * @Version: 1.0 */ @Configuration @WebListener public class MyListener implements ServletContextListener { @Autowired private MySubscribe jedisPubSub; @Autowired private JedisCluster jedisCluster; @Override public void contextInitialized(ServletContextEvent sce) { new Thread(() -> jedisCluster.subscribe(jedisPubSub,"channel-lijianmin")).start(); } @Override public void contextDestroyed(ServletContextEvent sce) { System.out.println("ServletContext容器销毁了。。。"); } }

自定义 MySubscribe 并继承 JedisPubSub,实现对 redis 通道的监听,重写 onMessage 方法,该方法可以获取通道的名称以及监听到的消息,具体处理消 息的逻辑就写在这里。

package com.jianmin.util; import org.springframework.stereotype.Component; import redis.clients.jedis.JedisPubSub; /** * @Author: jianmin.li * @Description: 监听redis的通道 * @Date: 2019/5/30 12:51 * @Version: 1.0 */ @Component public class MySubscribe extends JedisPubSub { /** * 监听到的消息在这里进行业务处理 * * @param channel * @param message * @return : void * @Author: jianmin.li * @Date: 2019/10/25 13:46 */ @Override public void onMessage(String channel,String message) { System.err.println("redis通道" + channel + "监听到的消息:" + message); } }

启动项目,触发接口

redis发布订阅模式--jedisCluster客户端

2、RedisTemplate 实现redis 的发布订阅 发送消息

@CrossOrigin @RestController @RequestMapping("/demo") public class Demo { @Autowired private RedisTemplate redisTemplate; /** * 使用 RedisTemplate 发送消息也非常简单, * 直接调用 convertAndSend 方法即可。 * * @param * @return : void * @Author: jianmin.li * @Date: 2019/10/25 14:10 */ @GetMapping("/redisTemplate") public void send() { redisTemplate.convertAndSend("channel-lijianmin-redistemplate","这是来自RedisTemplate发布者的消息"); } }

监听消息 自定义RedisSubscriber 并继承MessageListenerAdapter,重写onMessage 方 法,该方法可以获取通道的名称以及监听到的消息,具体处理消息的逻辑就写 在这里。

package com.jianmin.util; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.stereotype.Component; /** * @Author: jianmin.li * @Description: 监听redis通道 * @Date: 2019/5/30 15:37 * @Version: 1.0 */ @Component public class RedisSubscriber extends MessageListenerAdapter { @Autowired private RedisTemplate redisTemplate; @Override public void onMessage(Message message,byte[] pattern) { System.err.println("通道----->" + redisTemplate.getKeySerializer().deserialize(message.getChannel())); System.err.println("消息----->" + redisTemplate.getValueSerializer().deserialize(message.getBody())); } }

在上面的RedisConfig 里面注入redis 消息监听容器 (上面代码已经注入)

@Bean RedisMessageListenerContainer redisMessageListenerContainer(MessageListenerAdapter listenerAdapter, JedisConnectionFactory jedisConnectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory); container.addMessageListener(listenerAdapter,Arrays.asList(new PatternTopic ("channel-lijianmin-redistemplate"))); return container; }

启动项目,触发接口

redis发布订阅模式--redisTemplate客户端

总结: 1、 redis 的发布订阅模式,监听了同一个通道的监听者都能收到相同的消息,在集群模式下,多个相同的监听者进行相同的信息消费,容易产生表单重复提交的问题,在实际开发中需要注意,建议采取分布式锁; 2、 JedisCluster 的发布订阅,在容器启动时要异步去监听redis 通道; 3、 RedisTemplate 的发布订阅,在监听到消息时,建议用redisTemplate 的KeySerializer反序列化通道,用ValueSerializer 反序列化消息,不建议用new String()反序列化,因为redisTemplate 在初始化时如果指定了相关的序列化转换器,在序列化和反序列化时就必须使用相同的序列化转换器,否则容易出现反序列化异常或乱码。


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3