某马头条


Real-Time Computation vs. Scheduled Computation

Hot-article scores can be produced on a schedule (recomputing periodically from the database) or in real time as user behavior events arrive; these notes implement the real-time approach.

Stream Computing

Stream computing processes an unbounded flow of events continuously as they arrive, instead of collecting them into batches for later processing; Kafka Streams is the stream-processing library used below.

Kafka Streams

Quick-Start Example

Import the dependencies

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <exclusions>
        <exclusion>
            <artifactId>connect-json</artifactId>
            <groupId>org.apache.kafka</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>

Create the quick-start example with the native Kafka Streams API

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * Stream processing quick start
 */
public class KafkaStreamQuickStart {

    public static void main(String[] args) {
        // Kafka configuration
        Properties prop = new Properties();
        prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-quickstart");

        // Stream builder
        StreamsBuilder streamsBuilder = new StreamsBuilder();

        // Define the stream-processing topology
        streamProcessor(streamsBuilder);

        // Create the KafkaStreams instance
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), prop);
        // Start stream processing
        kafkaStreams.start();
    }

    /**
     * Stream processing.
     * Sample message content: hello kafka hello itcast
     * @param streamsBuilder
     */
    private static void streamProcessor(StreamsBuilder streamsBuilder) {
        // Create a KStream, specifying which topic to read messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        // Process the message value
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group by value
        .groupBy((key, value) -> value)
        // Time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count occurrences of each word
        .count()
        // Convert back to a KStream
        .toStream()
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the result
        .to("itcast-topic-out");
    }
}
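To exercise the example, send a few space-delimited sentences to itcast-topic-input and watch itcast-topic-out. A minimal producer sketch, assuming the broker address above and the kafka-clients dependency; the class name and message text are only illustrative:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class QuickStartProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(prop)) {
            // Each word in the value is split and counted by the stream above
            producer.send(new ProducerRecord<>("itcast-topic-input", "hello kafka hello itcast"));
        }
    }
}

The per-window word counts should then appear on itcast-topic-out (e.g. via kafka-console-consumer.sh).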

Spring Boot Integration with Kafka Streams

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * Re-register the KafkaStreamsConfiguration bean to supply custom configuration parameters
 */
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix = "kafka")
public class KafkaStreamConfig {
    private static final int MAX_MESSAGE_SIZE = 16 * 1024 * 1024;
    private String hosts;
    private String group;

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup() + "_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup() + "_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    }
}

with the matching configuration:

kafka:
  hosts: 192.168.200.130:9092
  group: ${spring.application.name}

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Configuration
@Slf4j
public class KafkaStreamHelloListener {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Create a KStream, specifying which topic to read messages from
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split(" "));
            }
        })
        // Group by value
        .groupBy((key, value) -> value)
        // Aggregation time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Count each word
        .count()
        .toStream()
        // Convert the result to strings
        .map((key, value) -> {
            System.out.println("key:" + key + ",value:" + value);
            return new KeyValue<>(key.key().toString(), value.toString());
        })
        // Send the result
        .to("itcast-topic-out");
        return stream;
    }
}

Hot Articles: Real-Time Computation

Implementation Idea

Implementation Steps

Collecting User Behavior

① Add the Kafka producer configuration to the heima-leadnews-behavior microservice

Update the Nacos configuration with the following:

spring:
  application:
    name: leadnews-behavior
  kafka:
    bootstrap-servers: 192.168.200.130:9092
    producer:
      retries: 10
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

② Update ApLikesBehaviorServiceImpl to send the message

Define the message wrapper class: UpdateArticleMess

package com.heima.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess {

    /**
     * The article field being modified
     */
    private UpdateArticleType type;

    /**
     * Article ID
     */
    private Long articleId;

    /**
     * The delta to apply; may be positive or negative
     */
    private Integer add;

    public enum UpdateArticleType {
        COLLECTION, COMMENT, LIKES, VIEWS;
    }
}
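Once serialized with fastjson, a like event on the topic looks roughly like {"add":1,"articleId":1302862387124125698,"type":"LIKES"} (the article id is an illustrative value); this JSON string is what the stream aggregator later parses back into an UpdateArticleMess.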

Topic constants class:

package com.heima.common.constants;

public class HotArticleConstants {

    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
}

The complete code is as follows:

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApLikesBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.LikesBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApLikesBehaviorServiceImpl implements ApLikesBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult like(LikesBehaviorDto dto) {
        // 1. Validate parameters
        if (dto == null || dto.getArticleId() == null || checkParam(dto)) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. Check login
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

        // 3. Like: save the data
        if (dto.getOperation() == 0) {
            Object obj = cacheService.hGet(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            if (obj != null) {
                return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID, "已点赞");
            }
            // Save the current key
            log.info("保存当前key:{} ,{}, {}", dto.getArticleId(), user.getId(), dto);
            cacheService.hPut(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));
            mess.setAdd(1);
        } else {
            // Delete the current key
            log.info("删除当前key:{}, {}", dto.getArticleId(), user.getId());
            cacheService.hDelete(BehaviorConstants.LIKE_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
            mess.setAdd(-1);
        }

        // Send the message for aggregation
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }

    /**
     * Validate parameters
     */
    private boolean checkParam(LikesBehaviorDto dto) {
        if (dto.getType() > 2 || dto.getType() < 0 || dto.getOperation() > 1 || dto.getOperation() < 0) {
            return true;
        }
        return false;
    }
}
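This class and the read-behavior service below both rely on BehaviorConstants for the Redis key prefixes, which these notes never show. A plausible sketch, with the prefix values assumed for illustration:

package com.heima.common.constants;

public class BehaviorConstants {

    // Redis hash key prefixes for behavior records (values assumed for illustration)
    public static final String LIKE_BEHAVIOR = "likes_behavior_";
    public static final String READ_BEHAVIOR = "read_behavior_";
}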

③ Update the read-behavior class ApReadBehaviorServiceImpl to send the message

package com.heima.behavior.service.impl;

import com.alibaba.fastjson.JSON;
import com.heima.behavior.service.ApReadBehaviorService;
import com.heima.common.constants.BehaviorConstants;
import com.heima.common.constants.HotArticleConstants;
import com.heima.common.redis.CacheService;
import com.heima.model.behavior.dtos.ReadBehaviorDto;
import com.heima.model.common.dtos.ResponseResult;
import com.heima.model.common.enums.AppHttpCodeEnum;
import com.heima.model.mess.UpdateArticleMess;
import com.heima.model.user.pojos.ApUser;
import com.heima.utils.thread.AppThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class ApReadBehaviorServiceImpl implements ApReadBehaviorService {

    @Autowired
    private CacheService cacheService;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Override
    public ResponseResult readBehavior(ReadBehaviorDto dto) {
        // 1. Validate parameters
        if (dto == null || dto.getArticleId() == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
        }

        // 2. Check login
        ApUser user = AppThreadLocalUtil.getUser();
        if (user == null) {
            return ResponseResult.errorResult(AppHttpCodeEnum.NEED_LOGIN);
        }

        // Update the read count
        String readBehaviorJson = (String) cacheService.hGet(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString());
        if (StringUtils.isNotBlank(readBehaviorJson)) {
            ReadBehaviorDto readBehaviorDto = JSON.parseObject(readBehaviorJson, ReadBehaviorDto.class);
            dto.setCount((short) (readBehaviorDto.getCount() + dto.getCount()));
        }
        // Save the current key
        log.info("保存当前key:{} {} {}", dto.getArticleId(), user.getId(), dto);
        cacheService.hPut(BehaviorConstants.READ_BEHAVIOR + dto.getArticleId().toString(), user.getId().toString(), JSON.toJSONString(dto));

        // Send the message for aggregation
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(dto.getArticleId());
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC, JSON.toJSONString(mess));

        return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);
    }
}

Stream Aggregation Processing

① Integrate Kafka Streams into the leadnews-article microservice (see the kafka-demo setup above)

② Define an entity class to carry the aggregated score deltas

package com.heima.model.mess;

import lombok.Data;

@Data
public class ArticleVisitStreamMess {
    /**
     * Article id
     */
    private Long articleId;
    /**
     * Views
     */
    private int view;
    /**
     * Collections
     */
    private int collect;
    /**
     * Comments
     */
    private int comment;
    /**
     * Likes
     */
    private int like;
}

Update the constants class with an additional topic:

package com.heima.common.constants;

public class HotArticleConstants {

    public static final String HOT_ARTICLE_SCORE_TOPIC = "hot.article.score.topic";
    public static final String HOT_ARTICLE_INCR_HANDLE_TOPIC = "hot.article.incr.handle.topic";
}

③ Define the stream to receive and aggregate the messages

package com.heima.article.stream;

import com.alibaba.fastjson.JSON;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import com.heima.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler {

    @Bean
    public KStream<String, String> kStream(StreamsBuilder streamsBuilder) {
        // Receive messages
        KStream<String, String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        // Aggregate stream processing
        stream.map((key, value) -> {
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            // Reset the key to the article id (e.g. 1234343434) and the value to e.g. LIKES:1
            return new KeyValue<>(mess.getArticleId().toString(), mess.getType().name() + ":" + mess.getAdd());
        })
        // Group by article id
        .groupBy((key, value) -> key)
        // Time window
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
        // Custom aggregation
        .aggregate(new Initializer<String>() {
            /**
             * Initializer: returns the starting value of the aggregate
             */
            @Override
            public String apply() {
                return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
            }
        }, new Aggregator<String, String, String>() {
            /**
             * The actual aggregation; the return value becomes the new aggregate
             */
            @Override
            public String apply(String key, String value, String aggValue) {
                if (StringUtils.isBlank(value)) {
                    return aggValue;
                }
                String[] aggAry = aggValue.split(",");
                int col = 0, com = 0, lik = 0, vie = 0;
                for (String agg : aggAry) {
                    String[] split = agg.split(":");
                    // Read the value accumulated so far within this time window
                    switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                        case COLLECTION:
                            col = Integer.parseInt(split[1]);
                            break;
                        case COMMENT:
                            com = Integer.parseInt(split[1]);
                            break;
                        case LIKES:
                            lik = Integer.parseInt(split[1]);
                            break;
                        case VIEWS:
                            vie = Integer.parseInt(split[1]);
                            break;
                    }
                }
                // Apply the incoming delta
                String[] valAry = value.split(":");
                switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0])) {
                    case COLLECTION:
                        col += Integer.parseInt(valAry[1]);
                        break;
                    case COMMENT:
                        com += Integer.parseInt(valAry[1]);
                        break;
                    case LIKES:
                        lik += Integer.parseInt(valAry[1]);
                        break;
                    case VIEWS:
                        vie += Integer.parseInt(valAry[1]);
                        break;
                }
                String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                System.out.println("文章的id:" + key);
                System.out.println("当前时间窗口内的消息处理结果:" + formatStr);
                return formatStr;
            }
        }, Materialized.as("hot-article-stream-count-001"))
        .toStream()
        .map((key, value) -> {
            return new KeyValue<>(key.key().toString(), formatObj(key.key().toString(), value));
        })
        // Send the result
        .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);
        return stream;
    }

    /**
     * Format the message value
     * @param articleId
     * @param value
     * @return
     */
    public String formatObj(String articleId, String value) {
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        // COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) {
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0])) {
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            }
        }
        log.info("聚合消息处理之后的结果为:{}", JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    }
}
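Two design points worth noting here: every key and value stays a plain String (the four counters are packed into a comma-separated "TYPE:count" string), so the default String serdes from KafkaStreamConfig work end to end without a custom Serde; and because each 10-second window starts again from the initializer, the emitted ArticleVisitStreamMess carries per-window increments rather than running totals, which is why the downstream consumer applies them as deltas.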

Recompute the Article Score and Update the Database and Cache

① Add a method to ApArticleService to update the article score in the database

/**
 * Update the article score and refresh the hot-article data in the cache
 * @param mess
 */
public void updateScore(ArticleVisitStreamMess mess);

Implementation:

/**
 * Update the article score and refresh the hot-article data in the cache
 * @param mess
 */
@Override
public void updateScore(ArticleVisitStreamMess mess) {
    // 1. Update the article's view/like/collection/comment counts
    ApArticle apArticle = updateArticle(mess);
    // 2. Compute the article score
    Integer score = computeScore(apArticle);
    score = score * 3;
    // 3. Replace the hot-article data for the article's channel
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());
    // 4. Replace the hot-article data for the recommendation channel
    replaceDataToRedis(apArticle, score, ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG);
}

/**
 * Replace the data and store it in Redis
 * @param apArticle
 * @param score
 * @param s the Redis key
 */
private void replaceDataToRedis(ApArticle apArticle, Integer score, String s) {
    String articleListStr = cacheService.get(s);
    if (StringUtils.isNotBlank(articleListStr)) {
        List<HotArticleVo> hotArticleVoList = JSON.parseArray(articleListStr, HotArticleVo.class);
        boolean flag = true;
        // If the article is already cached, just update its score
        for (HotArticleVo hotArticleVo : hotArticleVoList) {
            if (hotArticleVo.getId().equals(apArticle.getId())) {
                hotArticleVo.setScore(score);
                flag = false;
                break;
            }
        }
        // If not cached, compare against the lowest-scoring cached entry and replace it when the new score is higher
        if (flag) {
            if (hotArticleVoList.size() >= 30) {
                hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                HotArticleVo lastHot = hotArticleVoList.get(hotArticleVoList.size() - 1);
                if (lastHot.getScore() < score) {
                    hotArticleVoList.remove(lastHot);
                    HotArticleVo hot = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle, hot);
                    hot.setScore(score);
                    hotArticleVoList.add(hot);
                }
            } else {
                HotArticleVo hot = new HotArticleVo();
                BeanUtils.copyProperties(apArticle, hot);
                hot.setScore(score);
                hotArticleVoList.add(hot);
            }
        }
        // Sort and cache back to Redis
        hotArticleVoList = hotArticleVoList.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
        cacheService.set(s, JSON.toJSONString(hotArticleVoList));
    }
}

/**
 * Update the article's behavior counts
 * @param mess
 */
private ApArticle updateArticle(ArticleVisitStreamMess mess) {
    ApArticle apArticle = getById(mess.getArticleId());
    apArticle.setCollection((apArticle.getCollection() == null ? 0 : apArticle.getCollection()) + mess.getCollect());
    apArticle.setComment((apArticle.getComment() == null ? 0 : apArticle.getComment()) + mess.getComment());
    apArticle.setLikes((apArticle.getLikes() == null ? 0 : apArticle.getLikes()) + mess.getLike());
    apArticle.setViews((apArticle.getViews() == null ? 0 : apArticle.getViews()) + mess.getView());
    updateById(apArticle);
    return apArticle;
}

/**
 * Compute the concrete article score
 * @param apArticle
 * @return
 */
private Integer computeScore(ApArticle apArticle) {
    Integer score = 0;
    if (apArticle.getLikes() != null) {
        score += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;
    }
    if (apArticle.getViews() != null) {
        score += apArticle.getViews();
    }
    if (apArticle.getComment() != null) {
        score += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;
    }
    if (apArticle.getCollection() != null) {
        score += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;
    }
    return score;
}
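computeScore relies on weight constants from ArticleConstants, which these notes never show. A plausible sketch, with the key prefix and weight values assumed purely for illustration:

package com.heima.common.constants;

public class ArticleConstants {

    // Redis key prefix for the per-channel hot-article lists (value assumed)
    public static final String HOT_ARTICLE_FIRST_PAGE = "hot_article_first_page_";
    // Pseudo channel id for the "recommended" tab (value assumed)
    public static final String DEFAULT_TAG = "__all__";

    // Behavior weights used by computeScore (values assumed for illustration)
    public static final Integer HOT_ARTICLE_LIKE_WEIGHT = 3;
    public static final Integer HOT_ARTICLE_COMMENT_WEIGHT = 5;
    public static final Integer HOT_ARTICLE_COLLECTION_WEIGHT = 8;
}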

② Define a listener that receives the aggregated data and recomputes the article score

package com.heima.article.listener;

import com.alibaba.fastjson.JSON;
import com.heima.article.service.ApArticleService;
import com.heima.common.constants.HotArticleConstants;
import com.heima.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener {

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess) {
        if (StringUtils.isNotBlank(mess)) {
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        }
    }
}

Day 12 below: Continuous Integration

Software Development Models

 

Jenkins

Ugh, what a hassle. Skipping this one; I'll come back to it when I actually need it. You even have to pull a 10 GB image down from Baidu Netdisk first...


