设计一个简单的点赞功能

您所在的位置:网站首页 微信如何关闭点赞功能 设计一个简单的点赞功能

设计一个简单的点赞功能

2023-08-19 11:47| 来源: 网络整理| 查看: 265

新增功能:点赞

现在几乎所有的媒体内容,无论是商品评价、话题讨论还是朋友圈都支持点赞,点赞功能成为了互联网项目的标配,那么我们也尝试在评价系统中加入点赞功能,实现为每一个评价点赞。

豆瓣短评中的点赞:

image-20210407234242890

要实现的点赞需求细节:

image-20210414215247131

从放弃出发

完整得实现点赞系统功能是很困难的。要支持亿级的用户数量,又要做到数据归档入库,要支持高峰期百万的秒并发写入,又要实现多客户端实时同步,要记录并维护用户的点赞关系,又要展示用户的点赞列表,这样全方位的需求会产生设计上的矛盾,就像CAP矛盾一样。

典型的比如并发量和同步性的矛盾。高并发的本质是速度,网络传输速度和程序运行速度决定了系统所能承载的容量,每个请求处理速度快才能在单位时间内处理更多的请求,只是一味得增大连接数而忽略请求响应时间,并发问题得不到根本性的解决。在我看来,应用程序内部运行速度的瓶颈在于三处,优先级由高到低是网络请求、对象创建、冗余计算,网络请求对响应速度具有决定性的影响力。但是,同步性又要求我们进行网络请求,比如同步数据到mysql或redis之中。鱼与熊掌不可兼得,并发量和同步性具有不可调和的矛盾。

还有存储容量与访问速度的矛盾。要记录用户的点赞列表,就意味着要长期维护用户的点赞关系,日积月累,用户的点赞关系在单台存储系统中装不下,需要写入分布式存储系统中,这带来了额外的复杂度与调度时延,并且需要很好地设计区分维度,不同分区之间数据不耦合。而一旦一次查询跨越了多个存储节点,就会产生级联调用,具有较大的网络时延。

要实现,先舍弃。看到一个新的需求时,我习惯于反向思考,观察这个需求不涉及到哪些功能,哪些功能可以放弃,从这个角度出发,很容易找到取巧而又简单,却能满足当前需求的设计方案。

重新列一个需求清单,上面写了不需要实现哪些功能,这样做设计决策时,就豁然开朗了。

image-20210414225218732

产品经理只会给你提供表格1,他们很少会显示说明什么不需要做。在决定放弃时,还是需要商量一下,因为这些需求往往是软性的,需求文档中没有包含不一定是不需要,也有可能是没考虑到。

如何记录用户的点赞关系

点赞关系是典型的K-V类型或是集合类型,用Redis实现是比较合适的,那么用Redis中的哪种数据类型呢?

下表列出了能想到的数据类型与它们各自的优劣。

image-20210414231656907

比较关键的特性是批量查询和内存占用,批量查询特性使得可以在一次请求中查询全部的点赞关系,内存占用使得可以用尽可能少的redis节点,甚至一台redis解决存储问题。

我选择字符串类型,因为哈希类型真的很难实现点赞数据的淘汰,除非记录点赞时间并且定期全局扫描,或者记录双份哈希键,做新旧替换,代价太高,不合适。而淘汰机制本身就是解决内存占用问题,所以字符串类型不会占用异常多的内存。

image-20210415101020806

点赞操作的原子性

点赞操作需要改写两个值,一个是用户对内容的点赞关系,另一个是内容的点赞总数,这两个能不能放在一个key中表示呢?显然是不行的。所以需要先设置用户的点赞关系,再增加点赞总数,如果点赞关系已经存在,就不能增加点赞总数。

设置点赞关系可以用setnx命令实现,仅当不存在key时才设置,并返回一个是否设置的标志,根据这个标志决定是否增加点赞总数。比如:

if setnx(key1) == 1 then incr(key2)

看似每个操作都是原子性的,但是这样的逻辑如果在客户端执行,整体上仍不满足原子性,仍有可能在两个操作之间发生中断,导致点赞成功但是没有增加计数的情况发生。虽然这对于点赞系统来说不是什么大问题,极少出现的概率可以接受,但是我们完全可以做的更好。

redis的事务或脚本特性可以解决上述的问题。脚本的实现更加灵活自由,而且能减少网络请求,我们选择脚本的方式:

--点赞操作,写入并自增,如果写入失败则不自增,【原子性、幂等性】 if redis.call('SETNX',KEYS[1],1) == 1 then redis.call('EXPIRE',KEYS[1],864000) redis.call('INCR',KEYS[2]) end return redis.call('GET',KEYS[2]) --取消点赞操作,删除并递减,如果删除失败则不递减,【原子性、幂等性】 if redis.call('DEL',KEYS[1]) == 1 then redis.call('DECR',KEYS[2]) end return redis.call('GET',KEYS[2])

稳定性的基本要求之一就是数据不能无限膨胀,否则迟早出问题,任何存储方案都必须设计与之对应的销毁方案,才能保证系统的稳定长久运行。所以设置KEY1的有效期非常重要,而KEY2可能需要一直保持,由其他机制来删除它,比如销毁陈旧评价或折叠评价时,需要删除对应的KEY2.

脚本返回了点赞后的总数,这对后续数据归档是有帮助的。

封装脚本操作

既然已经决定了redis存储方式,那么就先来实现它。一步一个脚印,扎扎实实地把点赞功能完成。

首先使用Spring配置Lua脚本,它自动预加载脚本,不用麻烦在redis服务器上用script load预编译。

/** * Lua脚本 */ @Configuration public class LuaConfiguration { /** * [点赞]脚本 lua_set_and_incr */ @Bean public DefaultRedisScript voteScript() { DefaultRedisScript redisScript = new DefaultRedisScript(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_set_and_incr.lua"))); redisScript.setResultType(Integer.class); return redisScript; } /** * [取消点赞]脚本 lua_del_and_decr */ @Bean public DefaultRedisScript noVoteScript() { DefaultRedisScript redisScript = new DefaultRedisScript(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("script/lua_del_and_decr.lua"))); redisScript.setResultType(Integer.class); return redisScript; } } /** * 点赞箱 */ @Repository public class VoteBox { private final RedisTemplate redisTemplate; private final DefaultRedisScript voteScript; private final DefaultRedisScript noVoteScript; public VoteBox(RedisTemplate redisTemplate, DefaultRedisScript voteScript, DefaultRedisScript noVoteScript) { this.redisTemplate = redisTemplate; this.voteScript = voteScript; this.noVoteScript = noVoteScript; } /** * 给评价投票(点赞),用户增加评价点赞记录,评价点赞次数+1.该操作是原子性、幂等性的。 * @param voterId 投票人 * @param contentId 投票目标内容id * @return 返回当前最新点赞数 */ public Integer vote(long voterId, long contentId){ //使用lua脚本 List list = new ArrayList(); list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId)); list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId)); return redisTemplate.execute(voteScript, list); } /** * 取消给评价投票(点赞),用户删除评价点赞记录,评价点赞次数-1.该操作是原子性、幂等性的。 * @param voterId 投票人 * @param contentId 投票目标内容id * @return 返回当前最新点赞数 */ public Integer noVote(long voterId, long contentId){ //使用lua脚本 List list = new ArrayList(); list.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, voterId, contentId)); list.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, contentId)); return redisTemplate.execute(noVoteScript, list); } } 点赞的流程

点赞的流程可以用如下时序图表示:

image-20210415151828448

服务端接收用户的点赞请求 执行redis脚本,并返回点赞总数信息,redis保存点赞功能的暂时数据 发送普通消息到消息队列 以上两步执行成功后响应点赞完成,否则加入重试队列 重试队列异步重试请求redis或消息队列,直到成功或重试次数用尽 消息队列消费者接收消息,并将消息写入mysql

为什么加入消息队列这个角色?因为消息队列使得同步和异步可以优雅的分离。redis命令需要在当前请求中完成,用户想看到请求的执行结果,希望在其他客户端上立刻看到自己的点赞状态,这个举例可能不太恰当,点赞也可能是单向请求,用户没有那么在乎同步性,这里只是为了演示案例。而数据入库或者是其他操作不需要在当前请求生命周期内完成。

如果同步可以称之为“在线服务”,那么异步可以称之为“半在线半离线服务”,虽然不在请求的生命周期内,但是运行于在线服务器之上,占用cpu和内存,占用网络带宽,势必给线上业务造成影响。当异步模式调整时,需要连同在线业务一起发布,造成逻辑上的耦合。而消息队列让“离线服务”成为可能,消费者可以与在线服务器独立开来,独立开发独立部署,无论是物理上还是逻辑上都完全解耦。当然前提是消息对象的序列化格式一致,所以我喜欢使用字符串作为消息对象的内容,而不是对象序列化。

实现mysql的点赞入库

设计好redis的存储方案后,接下来设计mysql的存储方案。

首先是表结构:

#点赞/投票归档表 CREATE TABLE IF NOT EXISTS vote_document ( id INT primary key auto_increment COMMENT 'ID', gmt_create datetime not null default CURRENT_TIMESTAMP COMMENT '创建时间', voter_id INT not null COMMENT '投票人id', contentr_id INT not null COMMENT '投票内容id', voting TINYINT not null COMMENT '投票状态(0:取消投票 1:投票)', votes INT not null COMMENT '投下/放弃这一票后,内容在此刻的投票总数', create_date INT not null COMMENT '创建日期 如:20210414 用于分区分表' ); insert into vote_document(voter_id,content_id,voting,votes,create_date) values(1,1,1,1,'20210414');

显然,这是一个以Insert代替Update的日志表,无论是点赞、取消点赞还是重新点赞,都是追加新的记录,而不是修改原有记录。这样做有两个原因,一是Insert不用锁表,执行效率远高于Update,二是蕴含的信息更丰富,可以看到用户的完整行为,对于大数据分析是有帮助的。

Insert代替Update之后,一大难点就是数据聚合,解决方案就是每一次插入,都冗余地记录聚合状态,就像votes字段一样,分析时只需要拿相关评价的最后一条记录即可知道点赞总数,而不需全表扫描。

入库代码:

@Repository public class VoteRepository { @Autowired private JdbcTemplate db; /** * 添加点赞 * @param vote 点赞对象 * @return 如果插入成功,返回true,否则返回false */ public boolean addVote(/*valid*/ Vote vote) { String sql = "insert into vote_document(voter_id,content_id,voting,votes,create_date) values(?,?,?,?,?)"; return db.update(sql, vote.getVoterId(), vote.getContentId(), vote.getVoting(), vote.getVotes(), Sunday.getDate()) > 0; } } RocketMQ

Apache RocketMQ是一种低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

消息队列核心概念:

Topic:消息主题,一级消息类型,生产者向其发送消息。 Broker:中间人/经纪人,消息队列集群的节点,负责保存和收发消息。 生产者:也称为消息发布者,负责生产并发送消息至Topic。 消费者:也称为消息订阅者,负责从Topic接收并消费消息。 Tag:消息标签,二级消息类型,表示Topic主题下的具体消息分类。 消息:生产者向Topic发送并最终传送给消费者的数据和(可选)属性的组合。 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。 Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。

生产者发送消息到消息队列,最终发送到消费者的示意图如下:

image-20210112223820896

消息类型可以划分为:

普通消息。也称并发消息,没有顺序,生产消费都是并行的,拥有极高的吞吐性能 事务消息。提供了保证消息一定送达到broker的机制。 分区顺序消息。Topic分为多个分区,在一个分区内遵循先入先出原则。 全局顺序消息。把Topic分区数设置为1,所有消息都遵循先入先出原则。 定时消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定时间点进行投递 延迟消息。将消息发送到MQ服务端,在消息发送时间(当前时间)之后的指定延迟时间点进行投递

消费方式可以划分为:

集群消费。任意一条消息只需要被集群内的任意一个消费者处理即可。 广播消费。将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

消费者获取消息模式可以划分为:

Push。开启单独的线程轮询broker获取消息,回调消费者的接收方法,仿佛是broker在推消息给消费者。 Pull。消费者主动从消息队列拉取消息。 使用RocketMQ

我们使用某云产品的RocketMq消息队列,按照官方文档,先在云控制中心创建Group和Topic,然后引入maven依赖,创建好MqConfig连接配置对象。最后:

配置生产者(在项目A):

@Configuration public class ProducerConfig { @Autowired private MqConfig mqConfig; @Bean(initMethod = "start", destroyMethod = "shutdown") public Producer buildProducer() { return ONSFactory.createProducer(mqConfig.getMqPropertie()); } }

配置消费者(在项目B):

@Configuration public class ConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private VoteMessageReceiver receiver; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.GROUP_CONSUMER_ID); properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "10"); consumerBean.setProperties(properties); Map subscriptionTable = new HashMap(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.TOPIC_ISSUE); subscription.setExpression(mqConfig.TAG_ISSUE); subscriptionTable.put(subscription, receiver); consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }

创建消息接收、监听器:

/** * 投票消息接收器 */ @Component public class VoteMessageReceiver implements MessageListener { private final VoteRepository voteRepository; public VoteMessageReceiver(VoteRepository voteRepository) { this.voteRepository = voteRepository; } @Override public Action consume(Message message, ConsumeContext context) { try { JSONObject object = JSONObject.parseObject(new String(message.getBody())); Vote vote = new Vote(); vote.setVoterId(object.getLongValue("voterId")); vote.setContentId(object.getLongValue("contentId")); vote.setVoting(object.getIntValue("voting")); vote.setVotes(object.getLongValue("votes")); try { vote.validate(); voteRepository.addVote(vote); } catch (IllegalArgumentException ignored) { } return Action.CommitMessage; }catch (Exception e) { e.printStackTrace(); return Action.ReconsumeLater; } } }

发送消息的生产者,再稍稍封装一下:

/** * 消息生产者,消息投递仓库 */ @Repository public class MessagePoster { private final Producer producer; public MessagePoster(Producer producer) { this.producer = producer; } public void sendMessage(String topic, String tag, String content){ Message message = new Message(); message.setTopic(topic); message.setTag(tag); message.setBody(content.getBytes(StandardCharsets.UTF_8)); producer.send(message); } public void sendMessage(String topic, String content){ sendMessage(topic, "", content); } }

发布消费者,在云控制中心测试(确保流程走通,步步为营):

image-20210415171026192

能达成一致吗

执行redis命令与发送消息这两步,能做到一致性吗,也就是常说的同时完成与同时失败?如果是同构的系统,可以利用系统本身的特性实现事务,比如同是redis操作可以使用redis事务或脚本,前面已经这么做了,如果同是数据库操作,可以使用数据库事务,其他存储系统应该也有类似的支持。

但它们是异构的系统,只能通过在客户端实现事务逻辑或者由第三方协调。常见的客户端实现方法是回滚:

try{ redis.call(); mq.call(); }catch(MqException e){ //只有mq出错时才需要回滚 //使用反向操作回滚 redis.rollback(); }

但是如果回滚失败呢?如果消息发到MQ但却接收失败呢?如果依赖的服务不支持回滚呢?在苛刻的条件下实现苛刻的一致性是不可能的。

还是应该反向思考,有选择性地舍弃某些不重要的部分,才能实现我们的需求。在目前这个需求中,没有必要为了redis和MQ的同步引入第三方的事务协调,但也不能对明显的事务问题视而不见。

我总结的分布式事务解决思路导图:

image-20210415202027996

我们选择使用重试队列来解决这个问题。

设计重试队列

不局限于当前的分布式事务问题,我们设计一个较为通用的重试队列。

先设计重试队列中的基本概念:任务。一个任务由多个单元组成,可计算单元表示有返回值的方法对象,执行单元表示没有返回值的方法对象,但是会接收上一步可计算单元的返回值作为入参。任务中保持了单元的单向链表,只有当一个单元执行成功后,才会指向下一个单元继续执行,但当执行失败时,会在当前单元不断重试直到成功,已执行通过的单元不会重试。这样就保证了各个单元的稳定、有序运行,每个环节的执行具有容错性。

image-20210415210047077

基础接口,让使用者可以自己实现任务执行失败的日志记录,比如持久化磁盘或是发送到远程服务器,避免任务丢失,是保持事务一致性的兜底方案之一,设置成缺省方法使得使用者有选择地实现,不强制一定要有失败处理方案。

/** * 失败记录器 */ interface IFailRecorder { /** * 记录每次重试的失败情况 * @param attemptTimes 重试次数,第一次重试=0 * @param e 导致失败的异常 */ default void recordFail(int attemptTimes, Exception e){} /** * 记录每次重试的失败情况 * @param attemptTimes 重试次数,第一次重试=0 */ default void recordFail(int attemptTimes){} /** * 记录重试之后的最终失败 * @param e 导致失败的异常,如果没有异常,返回null */ default void recordEnd(Exception e){} }

定义执行的基本单元,代表需要执行一个redis操作或是发送MQ操作,接口方法可能会由调度器重复地执行,所以要求接口实现者自身保证幂等性。

/** * 可重复执行的任务 */ public interface Repeatable extends IFailRecorder{ /** * Computes a result, or throws an exception if unable to do so. * * @param repeatTimes repeat times, first repeatTimes is 0 * @return computed result * @throws Exception if unable to compute a result */ V compute(int repeatTimes) throws Exception; /** * Execute with no result, and throws an exception if unable to do so. * * @param repeatTimes repeat times, first repeatTimes is 0 * @param receiveValue last step computed result * @throws Exception if unable to compute a result */ default void execute(int repeatTimes, V receiveValue) throws Exception{} /** * Execute with no result, and throws an exception if unable to do so. * * @param repeatTimes repeat times, first repeatTimes is 0 * @throws Exception if unable to compute a result */ default void execute(int repeatTimes) throws Exception{} }

对应的派生抽象类,主要是为了引导用户实现接口。

/** * 可计算任务 * @param 计算结果类型 */ public abstract class Computable implements Repeatable{ @Override public void execute(int repeatTimes) throws Exception { throw new IllegalAccessException("不支持的方法"); } @Override public void execute(int repeatTimes, V receiveValue) throws Exception { throw new IllegalAccessException("不支持的方法"); } } /** * 可执行任务 */ public abstract class Executable implements Repeatable{ @Override public V compute(int repeatTimes) throws Exception { throw new IllegalAccessException("不支持的方法"); } } 重试的意义

好的重试机制可以起到削峰填谷的作用,而不好的重试机制可能火上浇油。

这不是危言耸听,仔细思考一下,程序什么情况下会失败,大致可以总结为三种情况:

参数错误导致的逻辑异常 负载过大导致的超时或熔断 不稳定的网络与人工意外事故

其中对于情况1进行重试是完全没有意义的,参数错误的问题应该通过改变参数来解决,逻辑异常应该修复逻辑bug,无脑重试只能让错误重复发生,只会浪费cpu。对于情况2的重试得小心,因为遇到流量波峰而失败,短时间内重试很可能再次遭遇失败,并且这次重试还会带来更大的流量压力,像滚雪球一样把自己搞垮,也就是火上浇油。

对于情况3的重试就非常有价值,尤其是对于具有SLA协议的第三方服务。第三方服务可能因为种种意外(比如停服更新),导致服务短暂不可用,但是却不违反SLA协议。将这种失败情况加入重试队列,确保只要第三方服务在较长的一段时间内有响应,任务就可以成功,如果第三方服务一直没有响应而导致任务最终失败,那么他往往也就破坏了SLA协议,可以申请赔偿了。

所以,设计重试策略时首先需要判断什么情况下需要重试,可以设定当出现特定的比如参数错误的异常时,就没必要重试了,直接失败即可。可以设定只要当返回参数不为空时才算成功。可以设置固定的重试间隔,让两个重试之间拉开比较长的时间。

更聪明的做法是,使用断路器模式,借助当前连接对目标服务器的请求结果,如果不符预期(异常比率大),就暂时阻塞重试队列中等待的任务,隔一段时间再试探一下。

重试队列与普通限流降级或熔断的区别:

image-20210415234437188

重试的策略

重试策略决定任务何时发起重试,重试策略接口:

/** * 重试策略,决定任务何时可以重试 */ public interface IRetryStrategy { /** * 现在是否应该执行重试 * @param attemptTimes 第几次重试 * @param lastTimestamp 上一次重试的时间戳 * @param itemId 当前的执行项目id * @return 允许重试,返回true,否则,返回false */ boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId); /** * 通知一次失败 * @param itemId 当前的执行项目id */ void noticeFail(int itemId); /** * 通知一次成功 * @param itemId 当前的执行项目id */ void noticeSuccess(int itemId); }

基本实现类:

/** * 指定间隔时间的重试策略 */ public class DefinedRetryStrategy implements IRetryStrategy { private final int[] intervals; public DefinedRetryStrategy(int... intervals) { if (intervals.length == 0) { this.intervals = new int[]{0}; } else { this.intervals = intervals; } } private DefinedRetryStrategy() { this.intervals = new int[]{0}; } /** * 现在是否应该执行重试 * * @param attemptTimes 第几次重试 * @param lastTimestamp 上一次重试的时间戳 * @param itemId 当前的执行项目id * @return 允许重试,返回true,否则,返回false */ @Override public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) { return System.currentTimeMillis() > lastTimestamp + getWaitSecond(attemptTimes) * 1000L; } @Override public void noticeFail(int itemId) { } @Override public void noticeSuccess(int itemId) { } /** * 根据当前重试次数,获取下一次重试等待间隔(单位:秒) */ private int getWaitSecond(int attemptTimes) { if (attemptTimes < 0) { attemptTimes = 0; } if (attemptTimes >= intervals.length) { attemptTimes = intervals.length - 1; } return intervals[attemptTimes]; } }

使用断路器实现重试策略,断路器内部实现省略:

/** * 断路器模式实现的智能的重试策略 */ public class SmartRetryStrategy extends DefinedRetryStrategy { //断路器集合 private final Map circuitBreakers = new ConcurrentHashMap(); private final Object LOCK = new Object(); private static CircuitBreaker newCircuitBreaker() { return new ExceptionCircuitBreaker(); } public SmartRetryStrategy(int[] intervals) { super(intervals); } private CircuitBreaker getCircuitBreaker(Integer itemId) { if (!circuitBreakers.containsKey(itemId)) { synchronized (LOCK) { if (!circuitBreakers.containsKey(itemId)) { circuitBreakers.put(itemId, newCircuitBreaker()); } } } return circuitBreakers.get(itemId); } /** * 现在是否应该执行重试 * * @param attemptTimes 第几次重试 * @param lastTimestamp 上一次重试的时间戳 * @param itemId 当前的执行项目id * @return 允许重试,返回true,否则,返回false */ @Override public boolean shouldTryAtNow(int attemptTimes, long lastTimestamp, int itemId) { //如果基本条件不满足,则不能重试 if (!super.shouldTryAtNow(attemptTimes, lastTimestamp, itemId)) { return false; } //断路器是否允许请求通过 return canPass(itemId); } /** * 通知一次失败 * * @param itemId 当前的执行项目id */ @Override public void noticeFail(int itemId) { getCircuitBreaker(itemId).onFail(); } /** * 通知一次成功 * * @param itemId 当前的执行项目id */ @Override public void noticeSuccess(int itemId) { getCircuitBreaker(itemId).onSuccess(); } /** * 是否允许通过 */ public boolean canPass(int itemId){ return getCircuitBreaker(itemId).canPass(); } } 可重试任务

根据上面的结构图,定义可重试任务接口:

/** * 重试任务 */ public interface IRetryTask { /** * 执行一次重试 * @return 如果执行成功,返回true,否则返回false */ boolean tryOnce(); /** * 是否应该关闭任务 * @return 如果达到最大重试次数,返回true,表示可以关闭 */ boolean shouldClose(); /** * 现在是否应该执行重试 * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false */ boolean shouldTryAtNow(); /** * 获取执行结果 */ V getResult(); }

然后设计抽象类:

/** * 重试任务. * 非线程安全 */ public abstract class AbstractRetryTask implements IRetryTask { //重试等待间隔 protected final IRetryStrategy retryStrategy; //当前重试次数 protected int curAttemptTimes = -1; //最大重试次数 private final int maxAttemptTimes; //上一次重试的时间戳 protected long lastTimestamp = 0; public AbstractRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes) { this.retryStrategy = retryStrategy; this.maxAttemptTimes = maxAttemptTimes; } /** * 执行一次重试 * * @return 如果执行成功,返回true,否则返回false */ @Override public boolean tryOnce() { if (isFinished()) { return true; } setNextCycle(); //执行重试 doTry(); //重试任务执行异常或者返回null,将视为执行失败 return isFinished(); } /** * 是否结束 */ protected abstract boolean isFinished(); /** * 执行回调 */ protected abstract void doTry(); /** * 是否应该关闭任务 * * @return 如果达到最大重试次数,返回true,表示可以关闭 */ @Override public boolean shouldClose() { return curAttemptTimes >= maxAttemptTimes; } //设置下一执行周期 private void setNextCycle() { curAttemptTimes++; lastTimestamp = System.currentTimeMillis(); } }

以及实现类:

/** * 多段重试任务. 任务链路执行失败时,下一次重试从失败的点继续执行。 */ @Slf4j public class SegmentRetryTask extends AbstractRetryTask { //分段执行方法 private final List segments; //当前执行片段,上一次执行中断的片段 private int currentSegment = 0; //上一次的执行结果值 private V result; public SegmentRetryTask(IRetryStrategy retryStrategy, int maxAttemptTimes, List segments) { super(retryStrategy == null ? new DefinedRetryStrategy(0) : retryStrategy, maxAttemptTimes); this.segments = segments; } /** * 执行回调 */ @Override protected void doTry() { try { for (; currentSegment < segments.size(); currentSegment++) { //如果当前断路器打开,不尝试执行 if (retryStrategy instanceof SmartRetryStrategy){ if (!((SmartRetryStrategy)retryStrategy).canPass(currentSegment)) { segments.get(currentSegment).recordFail(curAttemptTimes, new CircuitBreakingException()); return; } } //如果抛异常,分段计数器不增加,下次从这个地方执行 Repeatable repeatable = segments.get(currentSegment); if (!execute(repeatable)) return; } } catch (Exception e) { retryStrategy.noticeFail(currentSegment); if (currentSegment < segments.size()) { if (shouldClose()) { segments.get(currentSegment).recordEnd(e); } else { segments.get(currentSegment).recordFail(curAttemptTimes, e); } } } } private boolean execute(Repeatable repeatable) throws Exception { if (repeatable instanceof Computable) { result = repeatable.compute(curAttemptTimes); if (result == null) { repeatable.recordFail(curAttemptTimes); retryStrategy.noticeFail(currentSegment); return false; } retryStrategy.noticeSuccess(currentSegment); } if (repeatable instanceof Executable) { if (result == null) { repeatable.execute(curAttemptTimes); } else { repeatable.execute(curAttemptTimes, result); } retryStrategy.noticeSuccess(currentSegment); } return true; } @Override protected boolean isFinished() { return currentSegment >= segments.size(); } /** * 现在是否应该执行重试 * * @return 当等待时间超过重试间隔时间后,允许重试,返回true,否则,返回false */ @Override public boolean shouldTryAtNow() { return retryStrategy.shouldTryAtNow(curAttemptTimes, lastTimestamp, currentSegment); } /** * 获取执行结果 */ @Override public V getResult() { return result; } }

一个单元测试,当然单元测试有很多,不能全贴出来,这里只展示有代表性的:

class SegmentRetryTaskTest { private final List messages = new ArrayList(); @Test void doTry() { List list = new ArrayList(); list.add(new Computable(){ @Override public String compute(int repeatTimes) throws Exception { if (repeatTimes < 2) throw new Exception(); if (repeatTimes < 4) return null; messages.add("result:good"); return "good"; } @Override public void recordFail(int attemptTimes, Exception e) { messages.add("fail:" + attemptTimes); } @Override public void recordFail(int attemptTimes) { messages.add("fail:" + attemptTimes); } @Override public void recordEnd(Exception e) { messages.add("end"); } }); list.add(new Executable() { @Override public void execute(int repeatTimes, String receiveValue) throws Exception { messages.add("receive:" + receiveValue); throw new Exception("exc"); } @Override public void recordEnd(Exception e) { messages.add("end:" + e.getMessage()); } }); IRetryTask retryTask = new SegmentRetryTask(new DefinedRetryWaitStrategy(0), 5, list); //重试未开始 assertFalse(retryTask.shouldClose()); //重试直到成功 assertFalse(retryTask.tryOnce()); assertFalse(retryTask.shouldClose()); assertFalse(retryTask.tryOnce()); assertFalse(retryTask.tryOnce()); assertFalse(retryTask.tryOnce()); assertFalse(retryTask.tryOnce()); assertFalse(retryTask.tryOnce()); assertTrue(retryTask.shouldClose()); assertTrue(messages.contains("result:good")); assertTrue(messages.contains("fail:1")); assertTrue(messages.contains("fail:2")); assertTrue(messages.contains("fail:3")); assertFalse(messages.contains("end")); assertTrue(messages.contains("receive:good")); assertTrue(messages.contains("end:exc")); } } 重试队列的运作

image-20210416101646494

线程安全的重试队列。 * (Spring-retry 和 guava-retrying都不完全适合这个场景,决定自己开发一个简单的重试机制) * 重试队列会尽最大努力让任务多次执行并成功,使用时需要考虑以下几点。 * 1.重试队列存储在内存之中,暂未同步到磁盘,要求使用者可以承受丢失的风险。 * 2.重试不保证一定会成功,它将在重试一定的次数后结束,如果最终失败,将记录失败结果。 * 3.为了不让频繁的重试让系统的负载过大,建议设置恰当的重试间隔,以起到削峰填谷的作用。 * 4.当超过重试队列允许容纳的数量时,将抛出异常。 * 5.重试任务将在独立的线程中执行,不会阻塞当前线程 * 6.重试任务执行异常或者返回null,将视为执行失败。暂不支持拦截自定义异常。 * 7.由于网络问题,远程过程执行成功未必代表会返回成功,重试任务需要实现幂等性。 * 8."队列"仅指按先进先出的顺序扫描任务,任务移除队列操作取决于其何时完成或结束 * 实现重试队列 /** * 线程安全的重试队列。 * @author sunday * @version 0.0.1 */ public final class RetryQueue { //重试任务队列(全局唯一) private final static Deque retryTaskList = new ConcurrentLinkedDeque(); //重试任务工厂 private final IRetryTaskFactory retryTaskFactory; public RetryQueue(IRetryTaskFactory retryTaskFactory) { this.retryTaskFactory = retryTaskFactory; } static { Thread daemon = new Thread(RetryQueue::scan); daemon.setDaemon(true); daemon.setName(RetryConstants.RETRY_THREAD_NAME); daemon.start(); } //扫描重试队列,执行重试并移除任务(如果成功),周期性执行 private static void scan() { while (true) { //先执行,再删除 retryTaskList.removeIf(task -> retry(task) || task.shouldClose()); // wait some times try { TimeUnit.MILLISECONDS.sleep(RetryConstants.SCAN_INTERVAL); } catch (Throwable ignored) { } } } //执行重试 private static boolean retry(/*not null*/IRetryTask task) { if (task.shouldTryAtNow()) { return task.tryOnce(); } return false; } /** * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。 * * @param segments 分段执行任务 * @param 结果返回类型 * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。 * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常 */ public final V submit(List segments) throws RetryRefuseException { if (segments == null || segments.size() == 0) { return null; } IRetryTask task = retryTaskFactory.createRetryTask(segments); //在当前线程执行 if(!task.tryOnce()){ //失败后加入队列 ensureCapacity(); retryTaskList.push(task); } //只要当前已经有执行结果,就返回,即便是加入了重试队列 return task.getResult(); } /** * 提交任务。在当前线程立刻执行,如果失败,则使用设置的重试任务工厂创建包装对象,把这个对象写入重试队列等待异步重试。 * * @param repeatable 执行任务 * @param 结果返回类型 * @return 如果当前线程一次就执行成功,同步返回结果值,否则加入重试队列,异步通知结果值。 * @throws RetryRefuseException 当超过重试队列允许容纳的数量时,将抛出异常 */ public final V submit(Repeatable repeatable) throws RetryRefuseException { return submit(List.of(repeatable)); } //确保容量 private void ensureCapacity() throws RetryRefuseException { //非线程安全,高并发下可能短暂冲破最大容量,不过问题不大 if (retryTaskList.size() >= RetryConstants.MAX_QUEUE_SIZE) { throw RetryRefuseException.getInstance(); } } /** * 队列是否为空 * * @return 如果当前无正在执行的任务,返回true */ public boolean isEmpty() { return retryTaskList.isEmpty(); } }

单元测试:

class RetryQueueTest { private final static int NUM = 100000; private List messages1 = Collections.synchronizedList(new ArrayList()); private List messages2 = Collections.synchronizedList(new ArrayList()); IRetryTaskFactory taskFactory = new IRetryTaskFactory() { @Override public IRetryTask createRetryTask(List segments) { return new SegmentRetryTask(new DefinedRetryWaitStrategy(0), 10, segments); } }; RetryQueue retryQueue = new RetryQueue(taskFactory); @Test void submit() { List list = new ArrayList(); list.add(new Executable() { @Override public void execute(int repeatTimes) throws Exception { if (repeatTimes < 4) throw new Exception(); messages1.add("good"); } }); //模拟高并发提交 ExecutorService executorService = Executors.newFixedThreadPool(100); Semaphore semaphore = new Semaphore(0); for (int i = 0; i < NUM; i++) { executorService.submit(() -> { try { retryQueue.submit(list); } catch (RetryRefuseException e) { fail(); } semaphore.release(); }); } executorService.shutdown(); //等待执行完成 try { semaphore.acquire(NUM); } catch (InterruptedException e) { e.printStackTrace(); } //等待执行完成 while (!retryQueue.isEmpty()) Thread.yield(); assertEquals(NUM, messages1.size()); for (String s : messages1) { assertEquals(s, "good"); } } } 久等的点赞实现代码

好了,轮子已经造完了,可以开始写点赞服务的代码了:

/** * 投票服务 */ @Service @Slf4j public class VoteService { private final VoteBox voteBox; private final MessagePoster mq; private final RetryQueue retryQueue = new RetryQueue(new SegmentRetryTaskFactory()); public VoteService(VoteBox voteBox, MessagePoster mq) { this.voteBox = voteBox; this.mq = mq; } /** * 给评价投票(点赞) * * @param voterId 投票人 * @param contentId 投票目标内容id * @param voting 是否进行点赞(true:点赞 false:取消点赞) * @return 当前内容点赞后的总数,如果点赞失败,抛出异常 * @throws VoteException 投票异常 */ public int vote(long voterId, long contentId, boolean voting) throws VoteException { /* * 第零种情况:用户请求没有发送到服务器,用户可以适时重试。 * 第一种情况:执行1失败,最终点赞失败,记录日志,加入重试队列池,用户也可以适时重试。 * 第二种情况:执行1成功,但返回时网络异常,最终点赞失败,记录日志,加入重试队列池,用户也可能适时重试,该方法是幂等的。 * 第三种情况:执行1成功,但并未增加点赞总数,因为这次是重复提交。仍然执行之后的逻辑,该方法是幂等的。 * 第四种情况:执行1成功,但执行2失败,记录日志,把发送mq加入重试队列池,返回成功。 * 第五种情况:执行方法成功,但返回过程网络异常,用户未收到响应,用户稍后可以查询出点赞结果,用户也可以适时重试 */ List list = new ArrayList(); //1.先在redis中投票 list.add(new Computable() { @Override public Integer compute(int repeatTimes) { return voting ? voteBox.vote(voterId, contentId) : voteBox.noVote(voterId, contentId); } @Override public void recordFail(int attemptTimes, Exception e) { //只记录第一次错误 if (attemptTimes == 0) log.warn("function VoteService.vote.redis make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting); } @Override public void recordEnd(Exception e) { //放弃重试.当然,日志会记录下来,或者通过其他机制将失败记录到中央存储库中,最终还是可以恢复。 log.warn("function VoteService.vote.redis quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting); } }); //2.再通知mq list.add(new Executable() { @Override public void execute(int repeatTimes, Integer receiveValue) { JSONObject object = new JSONObject(); object.put("voterId", voterId); object.put("contentId", contentId); object.put("voting", voting ? 1 : 0); object.put("votes", receiveValue); mq.sendMessage(SystemConstants.VOTE_TOPIC, object.toString()); } @Override public void recordFail(int attemptTimes, Exception e) { if (attemptTimes == 0) log.warn("function VoteService.vote.mq make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting); } @Override public void recordEnd(Exception e) { log.trace("function VoteService.vote.mq quit:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting); } }); Integer value = null; try { //系统可能因为mq或者redis自身的过载等问题导致点赞失败,我们想珍惜用户的一次点赞,所以选择为他重试。 value = retryQueue.submit(list); } catch (RetryRefuseException e) { log.error("function VoteService.vote.refuse make exception:{} by:{},{},{}", e.getMessage(), voterId, contentId, voting); } if (value == null){ //当前无法获得投票总数,意味着点赞操作失败,虽然我们会稍后重试,但仍将这个信息告知用户,他们可以进行更理智的选择。 throw new VoteException("投票失败,请稍后再试"); } return value; } private static class SegmentRetryTaskFactory implements IRetryTaskFactory { private final static IRetryStrategy waitStrategy = new SmartRetryStrategy(new int[]{10,100,100,1000,10000}); @Override public IRetryTask createRetryTask(List segments) { return new SegmentRetryTask(waitStrategy, 5, segments); } } }

补充说明:

封装工厂对象的目的是为了简化构造方法参数,并且复用不变对象,如重试策略。 只要重试队列执行有返回结果,哪怕只是部分成功,仍可以算作接口响应成功,剩余部分加入重试队列。 如果重试队列执行全部失败,没有返回结果,则抛出异常,毕竟此刻确实失败了,用户有权知道。 只有熔断器闭合时,才会执行任务,否则将会一直等待,可以设置恰当的中止策略来完善这个机制。 重试队列这个轮子在其他很多场景也都有用武之地,依照我的理解,它大致算是“仓库层”。

但就点赞实现来说,没有必要使用重试,实际上,mq是多节点高可用的,一般不会出现问题,并且,mq自带了重试功能。mq的重试机制是,在一次请求中,如果失败了,立刻向另外的broker发起请求,是一种负载均衡融合高可用的设计。在不要求刚性事务的情景下,可以认为mq是可靠的。

给评价添加点赞

评价列表的数据是相对静态的,不含用户个性化信息,可以很容易地缓存供所有人访问,但是一旦加上用户对每个评价的点赞关系,或是实时变化的点赞数量信息,就变得难以缓存了。我们选择动静分离,静态的数据按照原先的缓存策略不变,动态的数据专门从redis服务中获取,然后再追加到静态数据上。

服务层、控制层,就是数据的聚合层、任务的委派层。

而至于数据聚合,有三种模式:

image-20210416110925640

我们选择第三种方式,这次设计点赞功能,只是作为评价系统的一部分。

在RemarkService中添加如下代码:

/** * 给评价列表添加点赞信息,在现有列表数据上修改 * @param remarks 评价列表 * @param consumerId 用户id * @return 修改后的评价列表 */ public JSONArray appendVoteInfo(JSONArray remarks, Integer consumerId){ if (remarks == null || remarks.size() == 0) { return remarks; } //获取评价id列表 List idList = new ArrayList(); for (int i = 0; i < remarks.size(); i++) { idList.add(remarks.getJSONObject(i).getString("id")); } //获取并添加点赞总数 List voteKeys = new ArrayList(); for (Object s : idList) { voteKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_SUM_PATTERN, s)); } List voteValues = redisRepository.readAll(voteKeys); for (int i = 0; i < remarks.size(); i++) { remarks.getJSONObject(i).put("votes", voteValues.get(i) == null ? 0 : voteValues.get(i)); } //未传用户id,查询时不附带个人点赞数据 if (consumerId == null) { return remarks; } //获取并添加个人点赞状态 List votesKeys = new ArrayList(); for (Object s : idList) { votesKeys.add(MessageFormat.format(RedisKeyConstants.VOTE_USER_PATTERN, consumerId, s)); } List votingValues = redisRepository.readAll(votesKeys); for (int i = 0; i < remarks.size(); i++) { remarks.getJSONObject(i).put("voting", votingValues.get(i) == null ? 0 : 1); } return remarks; } //更新商品的评价缓存 private void updateRemarkCache(String itemId){ //吞掉异常,让更新评价方法不影响原操作的执行结果 try { redisRepository.refreshKeys(RedisKeyConstants.REMARK_PREFIX + itemId); } catch (Exception e) { log.warn("function RemarkService.updateRemarkCache make exception:{} by:{}", e.getMessage(), itemId); } }

修改查询评价列表接口,聚合内容:

/** * 查询商品关联的评价,一次查询固定的条目 * @param itemId 商品id * @param curIndex 当前查询坐标 */ @GetMapping("/remark") public APIBody listRemarks(String itemId, int curIndex, Integer consumerId){ Assert.isTrue(!StringUtils.isEmpty(itemId), "商品id不能为空"); Assert.isTrue(curIndex > 0, "查询坐标异常"); JSONArray list = remarkService.listRemarks(itemId, curIndex, SystemConstants.REMARK_MAX_LIST_LENGTH); //原列表是从redis或db中读取的静态数据,而点赞数据每时每刻都在变化,分开获取这两个部分。 return APIBody.buildSuccess(remarkService.appendVoteInfo(list, consumerId)); }

优化点:评价的点赞总数信息是固定的,是用户无关的,可以与评价内容结合在一起缓存在内存中,而用户的点赞信息只能每次请求都去redis查询。

推荐优质评价

完整的评价系统应该能够输出一个优质评价内容的推荐列表,作为用户查看商品评价时的默认展示。

何为”优质内容“呢?我的理解是具有话题性、高热度、内容丰富的评价内容,其中”点赞总数“是衡量高热度的重要指标之一。当前,我们就以点赞数量为唯一指标,算出优质内容并提供查询接口。未来引入其他指标时,也可能会继续沿用这种设计思路。

评价表中有votes字段,可以据此排序生成前n条数据:

select id,consumer_id,order_id,score,header,content,images,user_name,user_face,gmt_create from remark where item_id = ? and status = '1' order by votes desc limit ?

需要注意的是,votes字段并不随着用户点赞而更新它,因为频繁的更新是低效的。可以通过定期汇总的方式来更新votes字段,点赞表保存着评价的最新点赞总数,所以可以每隔1天或1小时,筛选这期间内对应内容的最近一条点赞,就可以更新votes了。

不管基础数据是在何种数据库何种表中,不管是通过什么方式,我都将这一步骤称为”回源“,回源是缓存未命中时的一种行为概念。

在加载推荐评价时,回源算法为

public List listRecommendRemarks(/*not null*/ String itemId, int expectCount){ if (expectCount reloadRecommendRemarks(itemId)); } return appendVoteInfo(remarkRedis.readRecommendRange(itemId, start, stop)); } catch (Exception e) { log.error("function RemarkService.listRecommendRemarks make exception:{} by:{},{},{}", e.getMessage(), itemId, start, stop); return SystemConstants.EMPTY_ARRAY; } }

其中,仍使用代理键的模式,使Redis存储主要业务数据的列表永不过期,避免缓存击穿以及频繁的分布式阻塞加锁。

一些重要的redis操作代码:

//保存推荐内容并重置过期时间 public void saveRecommendData(String itemId, /*not null*/ List list) { String[] argv = new String[list.size()]; for (int i = 0; i < list.size(); i++) { argv[i] = JSONObject.fromObject(list.get(i)).toString(); } redisTemplate.execute(resetListScript, List.of(RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId, RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId), argv); } //读取推荐内容 public JSONArray readRecommendRange(String itemId, int start, int stop) { String key = RedisKeyConstants.REMARK_RECOMMEND_PREFIX + itemId; return range(start, stop, key); } //是否应该更新推荐 public boolean shouldUpdateRecommend(String itemId) { Boolean flag = redisTemplate.opsForValue().setIfAbsent(RedisKeyConstants.REMARK_RECOMMEND_PROXY_PREFIX + itemId); return flag == null || !flag; } 冷启动与空数据

冷启动是指服务的第一次上线或者redis在零缓存下重新启动时,这时,任何的缓存都未加载,或者之前加载过,现在因为意外已经不存在了。这时,代理锁会过期,SETNX命令成功,加锁成功的线程会同步数据库数据到redis,这样业务数据KEY就不再为空了。如果同步过程出现失败,锁会在2秒后自动过期,新的线程会继续接任这项未完成的使命。如果业务数据加载完成,那么就随即延迟代理锁的寿命为1小时,这样1小时之后才会触发同步。整个流程是异步的,用户请求的线程只会读取业务数据KEY,有则返回,无则为空。也就是说,接口只在冷启动的几秒内是返回为空的,这是可以接受的,因为冷启动只在新业务上线或者redis内存无法恢复这些极为特殊的时间点才会出现。

空数据是指数据库的内容是原本就是空的。根据上面的设计思路,可以得出结论,如果数据库内容为空,那么业务数据KEY是空的,也就是nil,不存储占位符,因为代理KEY已经起到占位符的作用了。这一点来看,一个简简单单的代理KEY,可以起到防止缓存击穿、防止同步阻塞、占位符等作用。

后续

可能会更新一些抽奖、秒杀活动的实现方法。



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3