热度实时计算(代码片段)

赵四司机 赵四司机     2022-12-03     527

关键词:

 个人简介: 

> 📦个人主页:赵四司机
> 🏆学习方向:JAVA后端开发 
> ⏰往期文章:SpringBoot项目整合微信支付
> 🔔博主推荐网站:牛客网 刷题|面试|找工作神器
> 📣种一棵树最好的时间是十年前,其次是现在!
> 💖喜欢的话麻烦点点关注喔,你们的支持是我的最大动力。

前言:

最近在做一个基于SpringCloud+Springboot+Docker的新闻头条微服务项目,用的是黑马的教程,现在项目开发进入了尾声,我打算通过写文章的形式进行梳理一遍,并且会将梳理过程中发现的Bug进行修复,有需要改进的地方我也会继续做出改进。这一系列的文章我将会放入微服务项目专栏中,这个项目适合刚接触微服务的人作为练手项目,假如你对这个项目感兴趣你可以订阅我的专栏进行查看,需要资料可以私信我,当然要是能给我点个小小的关注就更好了,你们的支持是我最大的动力。

如果你想要一个可以系统学习的网站,那么我推荐的是牛客网,个人感觉用着还是不错的,页面很整洁,而且内容也很全面,语法练习,算法题练习,面试知识汇总等等都有,论坛也很活跃,传送门链接:牛客刷题神器

目录

一:Springboot集成Kafka Stream

1.设置配置类信息

2.修改application.yml文件

3.新增配置类,创建KStream对象,进行聚合

二:热点文章实时计算

1.实现思路

2.环境搭建

2.1:在文章微服务中集成Kafka生产者配置

2.2:记录用户行为

2.3:定义Stream实现消息接收并聚合

2.4:重新计算文章分值并更新Redis缓存数据

2.5:设置监听类

三:功能测试


一:Springboot集成Kafka Stream

1.设置配置类信息

package com.my.kafka.config;

import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;

import java.util.HashMap;
import java.util.Map;

/**
 * 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
 */

@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig 
    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
    private String hosts;
    private String group;
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration defaultKafkaStreamsConfig() 
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
        props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
        props.put(StreamsConfig.RETRIES_CONFIG, 10);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        return new KafkaStreamsConfiguration(props);
    

        可能你会有这样的疑问,前面介绍Kafka时候不是直接在yml文件里面设置参数就行了吗?为什么这里还要自己写配置类呢?是因为Spring对KafkaStream的集成并不是很好,所以我们才需要自己去写配置类信息。需要注意的一点是,配置类中必须添加@EnableKafkaStreams这一注解。

2.修改application.yml文件

kafka:
  hosts: 192.168.200.130:9092
  group: $spring.application.name

3.新增配置类,创建KStream对象,进行聚合

package com.my.kafka.stream;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;
import java.util.Arrays;

@Slf4j
@Configuration
public class KafkaStreamHelloListener 

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder)
        //创建KStream对象,同时指定从那个topic中接收消息
        KStream<String, String> stream = streamsBuilder.stream("itcast-topic-input");
        stream.flatMapValues((ValueMapper<String, Iterable<String>>) value -> Arrays.asList(value.split(" ")))
                //根据value进行聚合分组
                .groupBy((key,value)->value)
                //聚合计算时间间隔
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                //求单词的个数
                .count()
                .toStream()
                //处理后的结果转换为string字符串
                .map((key,value)->
                    System.out.println("key:"+key+",value:"+value);
                    return new KeyValue<>(key.key().toString(),value.toString());
                )
                //发送消息
                .to("itcast-topic-out");
        return stream;
    

        这里实现的功能还是计算单词个数,假如你有其他计算需求你可以更改里面的逻辑代码以符合你的需求。该类可注入StreamBuilder,其返回值必须是KStream且放入Spring容器中(添加了@Bean注解)。

二:热点文章实时计算

1.实现思路

        实现思路很简单,当用户有点赞、收藏、阅读等行为记录时候,就将消息发送给Kafka进行流式处理,随后Kafka再进行聚合并重新计算文章分值,除此之外还需要更新数据库中的数据。需要注意的是,按常理来说当天的文章热度权重是要比非当天的文章热度权重大的,因此当日文章的热度权重需要乘以3,随后查询Redis中的数据,假如该文章分数大于Redis中最低分文章,这时候就需要进行替换操作,更新Redis数据。 

2.环境搭建

2.1:在文章微服务中集成Kafka生产者配置

(1)修改nacos,增加内容:

kafka:
    bootstrap-servers: 49.234.52.192:9092
    producer:
        retries: 10
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    hosts: 49.234.52.192:9092
    group: $spring.application.name

(2)定义相关实体类、常量

package com.my.model.mess;

import lombok.Data;

@Data
public class UpdateArticleMess 

    /**
     * 修改文章的字段类型
      */
    private UpdateArticleType type;
    /**
     * 文章ID
     */
    private Long articleId;
    /**
     * 修改数据的增量,可为正负
     */
    private Integer add;

    public enum UpdateArticleType
        COLLECTION,COMMENT,LIKES,VIEWS;
    

2.2:记录用户行为

@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/**
 * 读文章行为记录(阅读量+1)
 * @param map
 * @return
 */
public ResponseResult readBehavior(Map map) 
    if(map == null || map.get("articleId") == null) 
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    

    Long articleId = Long.parseLong((String) map.get("articleId"));
    ApArticle apArticle = getById(articleId);

    if(apArticle != null) 
        //获取文章阅读数
        Integer views = apArticle.getViews();
        if(views == null) 
            views = 0;
        

        //调用Kafka发送消息
        UpdateArticleMess mess = new UpdateArticleMess();
        mess.setArticleId(articleId);
        mess.setType(UpdateArticleMess.UpdateArticleType.VIEWS);
        mess.setAdd(1);
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        //更新文章阅读数
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        luw.set(ApArticle::getViews,views + 1);
        update(luw);
    

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);


/**
 * 用户点赞
 * @param map
 * @return
 */
@Override
public ResponseResult likesBehavior(Map map) 
    if(map == null || map.get("articleId") == null) 
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    

    Long articleId = Long.parseLong((String) map.get("articleId"));
    Integer operation = (Integer) map.get("operation");
    ApArticle apArticle = getById(articleId);

    UpdateArticleMess mess = new UpdateArticleMess();
    mess.setArticleId(articleId);
    mess.setType(UpdateArticleMess.UpdateArticleType.LIKES);

    if(apArticle != null) 
        //获取文章点赞数
        Integer likes = apArticle.getLikes();
        if(likes == null) 
            likes = 0;
        

        //更新文章点赞数
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        if(operation == 0) 
            //点赞
            log.info("用户点赞文章...");
            luw.set(ApArticle::getLikes,likes + 1);
            //分值增加
            mess.setAdd(1);
         else 
            //取消点赞
            log.info("用户取消点赞文章...");
            luw.set(ApArticle::getLikes,likes - 1);
            //分值减少
            mess.setAdd(-1);
        

        //调用Kafka发送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        update(luw);
    

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);


/**
 * 用户收藏
 * @param map
 * @return
 */
@Override
public ResponseResult collBehavior(Map map) 
    if(map == null || map.get("entryId") == null) 
        return ResponseResult.errorResult(AppHttpCodeEnum.PARAM_INVALID);
    

    Long articleId = Long.parseLong((String) map.get("entryId"));
    Integer operation = (Integer) map.get("operation");
    ApArticle apArticle = getById(articleId);

    //消息载体
    UpdateArticleMess mess = new UpdateArticleMess();
    mess.setArticleId(articleId);
    mess.setType(UpdateArticleMess.UpdateArticleType.COLLECTION);

    if(apArticle != null) 
        //获取文章收藏数
        Integer collection = apArticle.getCollection();
        if(collection == null) 
            collection = 0;
        

        //更新文章收藏数
        LambdaUpdateWrapper<ApArticle> luw = new LambdaUpdateWrapper<>();
        luw.eq(ApArticle::getId,articleId);
        if(operation == 0) 
            //收藏
            log.info("用户收藏文章...");
            luw.set(ApArticle::getCollection,collection + 1);
            mess.setAdd(1);
         else 
            //取消收藏
            log.info("用户取消收藏文章...");
            luw.set(ApArticle::getCollection,collection - 1);
            mess.setAdd(-1);
        

        //调用Kafka发送消息
        kafkaTemplate.send(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC,JSON.toJSONString(mess));

        update(luw);
    

    return ResponseResult.okResult(AppHttpCodeEnum.SUCCESS);

        这一步主要是当用户对文章进行访问、点赞、评论或者收藏时候就会更新数据库中的记录,同时还要将该行为记录封装并发送至Kafka。

2.3:定义Stream实现消息接收并聚合

package com.my.article.stream;

import com.alibaba.fastjson.JSON;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import com.my.model.mess.UpdateArticleMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Duration;

@Configuration
@Slf4j
public class HotArticleStreamHandler 

    @Bean
    public KStream<String,String> kStream(StreamsBuilder streamsBuilder)
        //接收消息
        KStream<String,String> stream = streamsBuilder.stream(HotArticleConstants.HOT_ARTICLE_SCORE_TOPIC);
        //聚合流式处理
        stream.map((key,value)->
            UpdateArticleMess mess = JSON.parseObject(value, UpdateArticleMess.class);
            //重置消息的key:1234343434   和  value: likes:1
            return new KeyValue<>(mess.getArticleId().toString(),mess.getType().name()+":"+mess.getAdd());
        )
                //按照文章id进行聚合
                .groupBy((key,value)->key)
                //时间窗口  每十秒聚合一次
                .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
                /*
                  自行地完成聚合的计算
                 */
                .aggregate(new Initializer<String>() 
                    /**
                     * 初始方法,返回值是消息的value
                     */
                    @Override
                    public String apply() 
                        return "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0";
                    
                    /*
                      真正的聚合操作,返回值是消息的value
                     */
                , new Aggregator<String, String, String>() 
                    /**
                     * 聚合并返回
                     * @param key  文章id
                     * @param value  重置后的value  ps:likes:1
                     * @param aggValue "COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0"
                     * @return  aggValue格式
                     */
                    @Override
                    public String apply(String key, String value, String aggValue) 
                        //用户没有进行任何操作
                        if(StringUtils.isBlank(value))
                            return aggValue;
                        
                        String[] aggAry = aggValue.split(",");
                        //收藏、评论、点赞、阅读量初始值
                        int col = 0,com=0,lik=0,vie=0;
                        for (String agg : aggAry) 
                            //for --> COLLECTION:0
                            String[] split = agg.split(":");
                            //split[0]:COLLECTION,split[1]:0
                            /*
                              获得初始值,也是时间窗口内计算之后的值
                              第一次获取到的值为0
                             */
                            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0]))
                                case COLLECTION:
                                    col = Integer.parseInt(split[1]);
                                    break;
                                case COMMENT:
                                    com = Integer.parseInt(split[1]);
                                    break;
                                case LIKES:
                                    lik = Integer.parseInt(split[1]);
                                    break;
                                case VIEWS:
                                    vie = Integer.parseInt(split[1]);
                                    break;
                            
                        
                        /*
                          累加操作
                         */
                        String[] valAry = value.split(":");
                        switch (UpdateArticleMess.UpdateArticleType.valueOf(valAry[0]))
                            case COLLECTION:
                                col += Integer.parseInt(valAry[1]);
                                break;
                            case COMMENT:
                                com += Integer.parseInt(valAry[1]);
                                break;
                            case LIKES:
                                lik += Integer.parseInt(valAry[1]);
                                break;
                            case VIEWS:
                                vie += Integer.parseInt(valAry[1]);
                                break;
                        

                        String formatStr = String.format("COLLECTION:%d,COMMENT:%d,LIKES:%d,VIEWS:%d", col, com, lik, vie);
                        log.info("文章的id:",key);
                        log.info("当前时间窗口内的消息处理结果:",formatStr);

                        //必须返回和apply()的返回类型
                        return formatStr;
                    
                , Materialized.as("hot-article-stream-count-001"))
                .toStream()
                .map((key,value)->
                    return new KeyValue<>(key.key().toString(),formatObj(key.key().toString(),value));
                )
                //发送消息
                .to(HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC);

        return stream;


    

    /**
     * 格式化消息的value数据
     * @param articleId  文章id
     * @param value  聚合结果
     * @return  String
     */
    public String formatObj(String articleId,String value)
        ArticleVisitStreamMess mess = new ArticleVisitStreamMess();
        mess.setArticleId(Long.valueOf(articleId));
        //COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0
        String[] valAry = value.split(",");
        for (String val : valAry) 
            String[] split = val.split(":");
            switch (UpdateArticleMess.UpdateArticleType.valueOf(split[0]))
                case COLLECTION:
                    mess.setCollect(Integer.parseInt(split[1]));
                    break;
                case COMMENT:
                    mess.setComment(Integer.parseInt(split[1]));
                    break;
                case LIKES:
                    mess.setLike(Integer.parseInt(split[1]));
                    break;
                case VIEWS:
                    mess.setView(Integer.parseInt(split[1]));
                    break;
            
        
        log.info("聚合消息处理之后的结果为:",JSON.toJSONString(mess));
        return JSON.toJSONString(mess);
    

        这一步是最难但是也是最重要的,首先我们接收到消息之后需要先对其key和value进行重置,因为这时候接收到的数据是一个JSON字符串格式的UpdateArticleMess对象,我们需要将其重置为key value键值对的格式。也即将其格式转化成key为文章id,value为用户行为记录,如key:182738789987,value:LIKES:1,表示用户对该文章点赞一次。随后选择对文章id进行聚合,每10秒钟聚合一次,需要注意的是,apply()函数中返回结构必须是“COLLECTION:0,COMMENT:0,LIKES:0,VIEWS:0”格式。

2.4:重新计算文章分值并更新Redis缓存数据

@Service
@Transactional
@Slf4j
public class ApArticleServiceImpl extends ServiceImpl<ApArticleMapper, ApArticle> implements ApArticleService 
    /**
     * 更新文章分值,同时更新redis中热点文章数据
     * @param mess
     */
    @Override
    public void updateScore(ArticleVisitStreamMess mess) 
        //1.获取文章数据
        ApArticle apArticle = getById(mess.getArticleId());
        //2.计算文章分值
        Integer score = computeScore(apArticle);
        score = score * 3;

        //3.替换当前文章对应频道热点数据
        replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId());

        //4.替换推荐频道文章热点数据
        replaceDataToRedis(apArticle,score,ArticleConstas.HOT_ARTICLE_FIRST_PAGE + ArticleConstas.DEFAULT_TAG);
    

    /**
     * 根据权重计算文章分值
     * @param apArticle
     * @return
     */
    private Integer computeScore(ApArticle apArticle) 
        Integer score = 0;
        if(apArticle.getLikes() != null)
            score += apArticle.getLikes() * ArticleConstas.HOT_ARTICLE_LIKE_WEIGHT;
        
        if(apArticle.getViews() != null)
            score += apArticle.getViews();
        
        if(apArticle.getComment() != null)
            score += apArticle.getComment() * ArticleConstas.HOT_ARTICLE_COMMENT_WEIGHT;
        
        if(apArticle.getCollection() != null)
            score += apArticle.getCollection() * ArticleConstas.HOT_ARTICLE_COLLECTION_WEIGHT;
        

        return score;
    

    /**
     * 替换数据并存入到redis
     * @param apArticle 文章信息
     * @param score 文章新的得分
     * @param key redis数据的key值
     */
    private void replaceDataToRedis(ApArticle apArticle,Integer score, String key) 
        String articleListStr = cacheService.get(key);
        if(StringUtils.isNotBlank(articleListStr)) 
            List<HotArticleVo> hotArticleVos = JSON.parseArray(articleListStr, HotArticleVo.class);

            boolean flag = true;

            //如果缓存中存在该文章,直接更新文章分值
            for (HotArticleVo hotArticleVo : hotArticleVos) 
                if(hotArticleVo.getId().equals(apArticle.getId())) 
                    if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) 
                        log.info("频道缓存中存在该文章,文章分值更新-->",apArticle.getChannelName(),apArticle.getId(),hotArticleVo.getScore(),score);
                     else 
                        log.info("推荐频道缓存中存在该文章,文章分值更新-->",apArticle.getId(),hotArticleVo.getScore(),score);
                    
                    hotArticleVo.setScore(score);
                    flag = false;
                    break;
                
            

            //如果缓存中不存在该文章
            if(flag) 
                //缓存中热点文章数少于30,直接增加
                if(hotArticleVos.size() < 30) 
                    log.info("该文章不在缓存,但是文章数少于30,直接添加",apArticle.getId());
                    HotArticleVo hotArticleVo = new HotArticleVo();
                    BeanUtils.copyProperties(apArticle,hotArticleVo);
                    hotArticleVo.setScore(score);
                    hotArticleVos.add(hotArticleVo);
                 else 
                    //缓存中热点文章数大于或等于30
                    //1.排序
                    hotArticleVos = hotArticleVos.stream().sorted(Comparator.
                            comparing(HotArticleVo::getScore).reversed()).collect(Collectors.toList());
                    //2.获取最小得分值
                    HotArticleVo minScoreHotArticleVo = hotArticleVos.get(hotArticleVos.size() - 1);
                    if(minScoreHotArticleVo.getScore() <= score) 
                        //3.移除分值最小文章
                        log.info("替换分值最小的文章...");
                        hotArticleVos.remove(minScoreHotArticleVo);
                        HotArticleVo hotArticleVo = new HotArticleVo();
                        BeanUtils.copyProperties(apArticle,hotArticleVo);
                        hotArticleVo.setScore(score);
                        hotArticleVos.add(hotArticleVo);
                    
                
            

            //重新排序并缓存到redis
            hotArticleVos = hotArticleVos.stream().sorted(Comparator.comparing(HotArticleVo::getScore).reversed())
                    .collect(Collectors.toList());
            cacheService.set(key,JSON.toJSONString(hotArticleVos));
            if(key.equals(ArticleConstas.HOT_ARTICLE_FIRST_PAGE + apArticle.getChannelId())) 
                log.info("成功刷新频道中热点文章缓存数据",apArticle.getChannelName());
             else 
                log.info("成功刷新推荐频道中热点文章缓存数据");
            
        
    

        这一步主要是逻辑处理部分,在这里我们需要完成对文章的得分进行重新计算并根据计算结果更新Redis中的缓存数据。计算到得分之后,我们需要分别对不同频道和推荐频道进行处理,但是处理流程相同。首先我们会先判断缓存中的数据有没有满30条,如果没满则直接该文章添加到缓存中作为热榜文章;如果缓存中已满30条数据,这时候就要分两种情况处理,如果缓存中存在该文章数据,则直接对其得分进行更新,如若不然则需要将该文章分值与缓存中的最低分进行比较,如果改文章得分比最低分高则直接进行替换,否则不做处理。最后还需要对缓存中的数据重新排序并再次发送到Reids中。

2.5:设置监听类

package com.my.article.listener;

import com.alibaba.fastjson.JSON;
import com.my.article.service.ApArticleService;
import com.my.common.constans.HotArticleConstants;
import com.my.model.mess.ArticleVisitStreamMess;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ArticleIncrHandleListener 

    @Autowired
    private ApArticleService apArticleService;

    @KafkaListener(topics = HotArticleConstants.HOT_ARTICLE_INCR_HANDLE_TOPIC)
    public void onMessage(String mess)
        if(StringUtils.isNotBlank(mess))
            ArticleVisitStreamMess articleVisitStreamMess = JSON.parseObject(mess, ArticleVisitStreamMess.class);
            apArticleService.updateScore(articleVisitStreamMess);
        
    

三:功能测试

打开App端对一篇文章进行浏览并点赞收藏

到控制台查看日志信息 

        可以看到 成功记录用户行为并且将文章得分进行了更改,其处理流程是这样的,首先接收到的是用户的点赞数据,随后接收到用户的浏览记录,最后接收到的是用户的收藏记录,由于前面提到的消息处理是增加而不是更新,所以最后我们可以看到时间窗口处理结果为COLLECTION:1,COMMENT:0,LIKES:1,VIEWS:1,10秒钟之后就会对消息进行聚合,假如这10秒之内还有其他用户也进行了点赞阅读操作,这时候就会继续将消息增加在原来处理结果上面,过了10秒之后就会进行一次聚合处理,也即拿着这批数据进行数据更新操作。

至此该项目的开发就告一段落了,后续有什么优化我会再发文介绍。

友情链接: 牛客网  刷题|面试|找工作神器

通过selenium实时获取斗鱼主播热度数据!斗鱼一哥居然是他?(代码片段)

...据进行抓取。随意打开一个主播的页面,我们想要抓取的热度信息如下: 34949+7177,热度由两部分相加得到,后一部分貌似是近期的活动加成。在Chrome中,点击右键选择“检查”,将会打开开发者工具 点击图中最左边的... 查看详情

apachestrom实时计算系统(代码片段)

ApacheStrom实时计算系统Storm简介ApacheStorm是一个分布式大数据实时计算系统,Storm设计用于在容错和水平可拓展方法中实时处理大数据,是一个数据流框架,可以使用Storm并行的对实时数据执行各种操作。相比于Hadoop的... 查看详情

实时流计算(代码片段)

总结自——吃透实时流计算文章目录1.流计算通用架构数据采集模块数据传输模块数据处理模块数据存储模块2.流计算本质:NIO+异步NIO如何优化IO和CPU都密集的任务异步编程3.反压机制4.死锁:为什么流计算应用突然卡... 查看详情

《大话实时计算》(代码片段)

...f;不知道大数据技术生态圈长啥样?香菇?不知道实时计算到底是什么鬼?淘宝双11实时交易金额统计背后的故事?。。。。。。噢特啦!金融中心大数据特种部队低调推出《开讲啦》系列培训,喊你一起... 查看详情

「实时视频流分析的边缘计算技术」最新2022研究综述(代码片段)

清华大学最新《面向实时视频流分析的边缘计算技术》综述实时视频流分析在智能监控、智慧城市、自动驾驶等场景中具有重要价值.然而计算负载高、带宽需求大、延迟要求严等特点使得实时视频流分析难以通过传统的云计算... 查看详情

heatmap热度图(代码片段)

#!/usr/bin/envpython#-*-coding:utf-8-*-importnumpyasnpimportpandasaspdimportseabornassnsfromscipyimportstatsimportmatplotlibasmplimportmatplotlib.pyplotasplt#热度图heatmapnp.random.seed(0)sns.set()unifor 查看详情

flink实时计算pvuv的几种方法(代码片段)

本文首发于:Java大数据与数据仓库,Flink实时计算pv、uv的几种方法实时统计pv、uv是再常见不过的大数据统计需求了,前面出过一篇SparkStreaming实时统计pv,uv的案例,这里用Flink实时计算pv,uv。我们需要统计不同数据类型每天的pv,... 查看详情

sparkstreaming实时计算框架学习01(代码片段)

文章目录初探SparkStreaming掌握DStream编程模型DStream转换操作DStream窗口操作DStream输出操作使用foreachPartition,将处理结果写到MySQL数据库中初探SparkStreaming从hadoop102的8888端口接受一行或者多行文本内容,并对接收到的内容以... 查看详情

浅谈分布式计算的开发与实现(代码片段)

实时计算接上篇,离线计算是对已经入库的数据进行计算,在查询时对批量数据进行检索、磁盘读取展示。而实时计算是在数据产生时就对其进行计算,然后实时展示结果,一般是秒级。举个例子来说,如果有个大型网站,要实... 查看详情

concurrentihawk—实时并行计算机仿真系统(代码片段)

Concurrent公司的iHawk并行计算机仿真系统是具有高实时特性的实时仿真系统,该仿真系统包含对称多处理器计算机平台、实时操作系统、实时开发工具以及应用软件。系统以MATLAB?/Simulink?软件作为前端建模工具,并可兼容C/C++、Ada... 查看详情

structredstreaming+kafka+mysql(spark实时计算|天猫双十一实时报表分析)(代码片段)

...总结前言每年天猫双十一购物节,都会有一块巨大的实时作战大屏,展现当前的销售情况。这种炫酷的页面背后,其实有着非常强大的技术支撑,而这种场景其实就是实时报表分析。1、业务需求概述​模拟交易订... 查看详情

实时--交互(代码片段)

 一般现在的实时框架两种:①数据(日志log、DB)--->SparkStreaming(计算)---->Mysql/Redis(得到计算结果,一般数据量比较小,直接给前台即可);  DB-->Canal--->ES(es中没有join操作)  如果前台想根据数据进行分... 查看详情

sparkstreaming实时计算(代码片段)

 spark批处理模式:  receiver模式:接收数据流,负责数据的存储维护,缺点:数据维护复杂(可靠性,数据积压等),占用计算资源(core,memory被挤占)  direct模式:数据源由三方组件完成,spark只负责数据拉取计算,... 查看详情

实时监控:基于流计算oceanus(flink)实现系统和应用级实时监控(代码片段)

...高级工程师本文描述了如何使用腾讯云大数据组件来完成实时监控系统的设计和实现,通过实时采集并分析云服务器(CVM)及其App应用的CPU和内存等资源消耗数据,以短信、电话、微信消息等方式实时反馈监控告... 查看详情

实时数仓方案如何选型和构建(代码片段)

目录一、为何需要实时数仓架构二、数仓如何分层&各层用途三、数仓分层的必要性四、从Lambda架构说起五、Kappa架构解决哪些问题六、深入实时数仓架构方案1:Kappa架构方案2:基于标准分层+流计算方案3:... 查看详情

flink+kafka实现wordcount实时计算(代码片段)

1.FlinkFlink介绍:Flink是一个针对流数据和批数据的分布式处理引擎。它主要是由Java代码实现。目前主要还是依靠开源社区的贡献而发展。对Flink而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。... 查看详情

flink基于flinkcep实时计算商品订单流失量(代码片段)

...g.csdn.net/tzs_1041218129/article/details/108786597假设有个需求需要实时计算商品的订单流失量,规则如下:用户点击商品A,但购买了同类商品B,则商品A记为一次订单流失量;点击商品A到购买同类商品B的有效时间窗口... 查看详情

基于apachehudi和apachesparksql的近实时数仓架构分享(代码片段)

...据数据的延迟情况,数据的时效性一般分为离线、准实时、实时。离线计算一般是以天(T)为界限,比如离线场景最多的就是T-1计算,也就是今天计算昨天产生的数据。准实时计算一般以小时(H)为... 查看详情