实时--交互(代码片段)

shengyang17 shengyang17     2022-12-09     200

关键词:

 

一般现在的实时框架两种:

①数据(日志log、DB)--->SparkStreaming(计算)---->Mysql / Redis (得到计算结果,一般数据量比较小,直接给前台即可);

  DB-->Canal--->ES(es中没有join操作)

  如果前台想根据数据进行分析,再进行统计,就不能拿结果进行分析,要拿明细宽表;这个宽表时要多个表进行join操作,而上边不管从mysql还是log都是单表操作;

②数据(hive宽表)---->SparkStreaming-----> ES(存储数据量大,也可以实时进行交互);有些可以容忍T+1(可以容忍一天),就可以使用hive进行join组成宽表;

T+0即使有canal得到更新变化的进行反查得到更多数据,在canal中做一个jdbc的查询mysql,实效有点延迟,对mysql的业务数据库也会增加一定的压力;

     技术图片

最终交互效果图:

技术图片

 

根据条件分析将用户的购买行为

数仓中存储了大量的明细数据,但是hadoop存储的数仓计算必须经过mr ,所以即时交互性非常糟糕。为了方便数据分析人员查看信息,数据平台需要提供一个能够根据文字及选项等条件,进行灵活分析判断的数据功能。

 建立gmall-hiveToES的maven模块

 从hive中查询到宽表信息,导入到ES中;resources/ hive-site.xml  ===>找hive中的源数据,要有mysql-connect-java的maven包

宽表dws_sale_detail_daycount的每个字段要和 样例类SaleDetailDaycount的类型要一致,对应不上就用cast进行转换;
只导入当天的数据,加上日期;最后程序会打成jar包,linux中传参数日期;
object SaleApp 
  def main(args: Array[String]): Unit = 
    var date: String = ""
    if (args.length > 0)
      val date = args(0)
    else
      date = "2019-05-09"
    
    val sparkConf: SparkConf = new SparkConf().setAppName("gmall").setMaster("local[*]")
    val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()
    // 读取hive 的宽表
    sparkSession.sql("use gmall")
    import sparkSession.implicits._
    //sparkSession.sql("select * from  dws_sale_detail_daycount where dt=‘2019-05-09‘ and order_price is not null").show()
    val sqlSale = "select user_id,sku_id,user_gender," +
      "cast(user_age as int) user_age," +
      "user_level," +
      "cast(order_price as double) sku_price," +
      "sku_name,sku_tm_id, sku_category3_id,sku_category2_id, sku_category1_id,sku_category3_name,sku_category2_name,sku_category1_name,spu_id," +
      "cast(sku_num as long) sku_num, " +
      "cast(order_count as long) order_count," +
      "cast(order_amount as double) order_amount," +
      "dt from dws_sale_detail_daycount where dt=‘"+date+"‘ and order_price is not null"
    //如果hive中有大量null数据是不行的
    val saleRdd: RDD[SaleDetailDaycount] = sparkSession.sql(sqlSale).as[SaleDetailDaycount].rdd  
    //saleRdd.foreach(println)
    /*val filterRDD: RDD[SaleDetailDaycount] = saleRdd.filter(row => row != null)//过滤掉空null的,使用sql语句进行过滤
    filterRDD.foreach(println)*/
    // 往es中写入
    saleRdd.foreachPartition  saleItr =>
      var i = 0
      val listBuffer: ListBuffer[SaleDetailDaycount] = new ListBuffer
      for (saleDetail <- saleItr) 
        listBuffer += saleDetail
        i += 1
        //达到100进行批量保存
        if (i%100 == 0)
          MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_SALE, listBuffer.toList)
          listBuffer.clear()
        
      
      //零头 批量保存
      if (listBuffer.size > 0)
        MyEsUtil.insertEsBatch(GmallConstant.ES_INDEX_SALE, listBuffer.toList)
      
    

  

根据宽表搭建es中的索引结构 

分析宽表字段:

字段一共分3类 : 需要分词匹配的,需要索引(过滤、聚合、排序)的不需要索引的
 string  comment 用户 id,
    string comment 商品 Id,
    string comment 用户性别,
    string  comment 用户年龄,
    string comment 用户等级,
    decimal(10,2) comment 订单价格,
    string   comment 商品名称,
    string   comment 品牌id,
    string comment 商品三级品类id,
    string comment 商品二级品类id,
    string comment 商品一级品类id,
     string comment 商品三级品类名称,
    string comment 商品二级品类名称,
      string comment 商品一级品类名称,
    string comment 商品 spu,
    int comment 购买个数,
    string comment 当日下单单数,
    string comment 当日下单金额

 
user_id        需要过滤匹配的 
sku_id         需要过滤匹配的 
user_gender    需要过滤匹配的 
user_age       需要过滤匹配的 
user_level     需要过滤匹配的 
sku_price      需要过滤匹配的 
sku_name        需要分词匹配的
sku_tm_id        需要过滤匹配的
sku_category1_id     需要过滤匹配的
sku_category2_id     需要过滤匹配的
sku_category3_id     需要过滤匹配的
sku_category1_name     需要分词匹配的
sku_category2_name     需要分词匹配的
sku_category3_name     需要分词匹配的
spu_id               需要过滤匹配的
sku_num              需要过滤匹配的
order_count           需要过滤匹配的
order_amount          需要过滤匹配的

 

建立mapping时,要考虑要不要分词,要不要索引

mapping表结构定义
ES字段定义要考虑:
  1.某个字段要不要分词;(分词时用来查询的;是否要全文索引,是否需要查询) 商品名称、文章、文章标题 取决于字段类型;

   分词时要选择text, keyword不分词;
   关键词查询, ; 中文的索引需要选分词器:ik有两种:ik_smart(尽可能精简的分)、 ik_max_word(尽可能多的分),商品名称一般用这个分词器;
  2.某个字段要不要索引; index=true就是索引,index=false就不用索引 过滤 排序 聚合

text既分词又索引,但不能聚合;

首先要安装分词器  https://www.cnblogs.com/shengyang17/p/10583596.html  中文分词

PUT gmall_sale_detail

  "mappings": 
    "_doc":
      "properties":
         "user_id":
           "type":"keyword"
         ,
         "sku_id":
           "type":"keyword"
         ,
         "user_gender":
           "type":"keyword"
         ,
         "user_age":
           "type":"short"
         ,
         "user_level":
           "type":"keyword"
         ,
         "sku_price":
           "type":"double" 
         ,
         "sku_name":
           "type":"text",
           "analyzer": "ik_max_word"
         ,
         "sku_tm_id ":
           "type":"keyword"
         ,
         "sku_category3_id":
           "type":"keyword"
         ,
         "sku_category2_id":
           "type":"keyword"
         ,
         "sku_category1_id":
           "type":"keyword"
         ,
         "sku_category3_name":
           "type":"text",
           "analyzer": "ik_max_word"
         ,
         "sku_category2_name":
           "type":"text",
           "analyzer": "ik_max_word"
         ,
         "sku_category1_name":
           "type":"text",
           "analyzer": "ik_max_word"
         ,
         "spu_id":
           "type":"keyword"
         ,
         "sku_num":
           "type":"long"
         ,
         "order_count":
           "type":"long"
         ,
         "order_amount":
           "type":"long"
         ,
         "dt":
           "type":"keyword"
          
      
    
  
 

需要利用关键词查询

传入路径及参数:

http://localhost:8070/sale_detail?date=2019-05-09&&startpage=1&&size=5&&keyword=手机

返回格式JSON串:

技术图片

 

DSL查询语句:

match匹配;  小米且 手机,使用operator: and;#######过滤: 日期、关键词、匹配

######日期+关键字匹配
GET gmall_sale_detail/_search "query": "bool": "filter": "term": "dt": "2019-05-09" , "must": [ "match": "sku_category1_name": "query": "手机", "operator": "and" ] , "size": 100

  聚合性别和 年龄

 

##聚合性别
GET gmall_sale_detail/_search

  "query": 
    "bool": 
      "filter": 
        "term": 
          "dt": "2019-05-09"
        
      , 
      "must": [
        "match":
          "sku_category1_name": 
            "query": "手机",
            "operator": "and"
          
          
          
        
     ] 
    
  
  , "aggs":  
    "groupby_gender": 
      "terms": 
        "field": "user_gender",
        "size": 2
      
    
  
  ,
  "size": 100

  同理聚合年龄;这两个聚合是并列的,不能写在一块:

技术图片
##聚合年龄
GET gmall_sale_detail/_search

  "query": 
    "bool": 
      "filter": 
        "term": 
          "dt": "2019-05-09"
        
      , 
      "must": [
        "match":
          "sku_category1_name": 
            "query": "手机",
            "operator": "and"
          
          
          
        
     ] 
    
  
  , "aggs":  
    "groupby_age": 
      "terms": 
        "field": "user_age",
        "size": 100
      
    
  
  ,
  "size": 100
View Code

 把DSL语句转变成代码实现:

SpringBoot---gmall-publisher

publisherServerImpl.java

技术图片
    /** 宽表导入ES中,es中进行过滤、匹配、聚合 **/
    @Override
    public SaleInfo getSaleInfo(String date, String keyword, int startPage, int pagesize, String aggsFieldName, int aggsize) 
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
        //过滤日期
        boolQueryBuilder.filter(new TermQueryBuilder("dt", date));
        //匹配: 商品关键词
        boolQueryBuilder.must(new MatchQueryBuilder("sku_category1_name", keyword).operator(MatchQueryBuilder.Operator.AND));
        searchSourceBuilder.query(boolQueryBuilder);
        //聚合
        TermsBuilder termsAggs = AggregationBuilders.terms("groupby_" + aggsFieldName).field(aggsFieldName).size(aggsize);
        searchSourceBuilder.aggregation(termsAggs);
        //分页
        searchSourceBuilder.from((startPage-1) * pagesize);
        searchSourceBuilder.size(pagesize);

        Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex(GmallConstant.ES_INDEX_SALE).addType(GmallConstant.ES_TYPE_DEFAULT).build();

        SaleInfo saleInfo = new SaleInfo();
        List<Map> detailList = new ArrayList<>();

        try 
            SearchResult searchResult = jestClient.execute(search);
            //总数
            saleInfo.setTotal(searchResult.getTotal()); //要set; 不然后边查询时会报java.lang.NullPointerException: null
            //明细
            List<SearchResult.Hit<Map, Void>> hits = searchResult.getHits(Map.class);
            for (SearchResult.Hit<Map, Void> hit : hits) 
                Map source = hit.source;
                detailList.add(source);
            
            saleInfo.setDetail(detailList);
            //饼图(聚合结果)
            Map aggsTempMap = new HashMap<>();
            List<TermsAggregation.Entry> buckets = searchResult.getAggregations().getTermsAggregation("groupby_" + aggsFieldName).getBuckets();
            for (TermsAggregation.Entry bucket : buckets) 
                aggsTempMap.put(bucket.getKey(), bucket.getCount());
            
            saleInfo.setTempAggsMap(aggsTempMap);

         catch (IOException e) 
            e.printStackTrace();
        
        return saleInfo;
    
View Code

 

启动SpringBoot的类:com.atguigu.gmall.publisher.GmallPublisherApplication

技术图片

启动SpirngBoot---db-chart的主类:com.demo.DemoApplication

  localhost:8089/table技术图片

 

websocket+netty实时视频弹幕交互功能(java版)(代码片段)

...;不整个弹幕都说不过去了,今天笔者就抽空做了一个实时视频弹幕交互功能的实现,不得不说这样的形式为看视频看直播,讲义PPT,抽奖等形式增加了许多乐趣。1技术选型1.1netty官方对 查看详情

websocket+netty实时视频弹幕交互功能(java版)(代码片段)

...;不整个弹幕都说不过去了,今天笔者就抽空做了一个实时视频弹幕交互功能的实现,不得不说这样的形式为看视频看直播,讲义PPT,抽奖等形式增加了许多乐趣。1技术选型1.1netty官方对 查看详情

利用python和jupyternotebook交互式小部件生成方波,可以实时调节谐波个数和基波频率(代码片段)

最近在试着使用jupyterwaget来实现一些滑块的功能。这里是用python创造一个方波。方波本质就是一个不同次谐波的正弦函数的叠加,因此代码很简单:importipywidgetsaswidgetsfromIPython.displayimportdisplayimportmatplotlib.pyplotaspltimportnumpyasnp%matp... 查看详情

实时音频编程:实践与技巧(代码片段)

...;AQuestion1:你是选择传递还是共享对象?Question2:是否与实时线程交互?Question3:共享数据是否足够小?Question4:获取共享资源是否允许失败?Question5:实时线程会修改线程间共享的数据吗?Question6:非实时... 查看详情

一小时搭建实时数据分析平台(代码片段)

实时数据分析门槛较高,我们如何用极少的开发工作就完成实时数据平台的搭建,做出炫酷的图表呢?如何快速的搭建实时数据分析平台,首先我们需要实时数据的接入端,我们选择高扩展性、容错性、速度极快的消息系统Kafka... 查看详情

如何构建高性能可视化架构?一个交互式实时数据引擎的架构设计(代码片段)

在分析SecDB、Athena、Quartz几个实时金融与风险分析平台的时候,发现了Perspective——一个FinTech开源基金会FinOS旗下开源的交互式分析和可视化组件库,由摩根大通(J.P.MorganChase)公司开源出去的流式数据可视化组件... 查看详情

如何构建高性能可视化架构?一个交互式实时数据引擎的架构设计(代码片段)

在分析SecDB、Athena、Quartz几个实时金融与风险分析平台的时候,发现了Perspective——一个FinTech开源基金会FinOS旗下开源的交互式分析和可视化组件库,由摩根大通(J.P.MorganChase)公司开源出去的流式数据可视化组件... 查看详情

10亿+/秒!看阿里如何搞定实时数仓高吞吐实时写入与更新(代码片段)

...1a;Hologres(原交互式分析)是阿里云自研的一站式实时数仓,这个云原生系统融合了实时服务和分析大数据的场景,全面兼容PostgreSQL协议并与大数据生态无缝打通,能用同一套数据架构同时支持实时写入实时... 查看详情

sip(gb28181)信令交互-视频点播与回播(代码片段)

   客户端发起的实时点播消息示范:(请求视频信令与断开视频信息和回播基本无差别)1、请求视频流INVITEsip:00000000001310018021@192.168.40.66:7100SIP/2.0Via:SIP/2.0/UDP192.168.40.55:7100;rport;branch=z9hG4bK2480933505From:<sip:1201 查看详情

华为云mrs基于hudi和hetuengine构建实时数据湖最佳实践(代码片段)

数据湖与实时数据湖是什么?各个行业企业都在构建企业级数据湖,将企业内多种格式数据源汇聚的大数据平台,通过严格的数据权限和资源管控,将数据和算力开放给各种使用者。一份数据支持多种分析,是数据湖最大的特点... 查看详情

rtmp协议(代码片段)

RTMP是RealTimeMessagingProtocol(实时消息传输协议)的首字母缩写。该协议基于TCP,是一个协议族,包括RTMP基本协议及RTMPT/RTMPS/RTMPE等多种变种。RTMP是一种设计用来进行实时数据通信的网络协议,主要用来在Flash/AI... 查看详情

搭建内置内部库的vue3组件在线解释交互器(代码片段)

...释交互器可参考https://sfc.vuejs.org/输入代码后可在浏览器实时看到执行效果,对使用一些UI组件库,能快速体验,试不同的参数很方便REPLread、eval、print、loop,如他的名字一样,拿到用户输入,执行,输... 查看详情

搭建内置内部库的vue3组件在线解释交互器(代码片段)

...释交互器可参考https://sfc.vuejs.org/输入代码后可在浏览器实时看到执行效果,对使用一些UI组件库,能快速体验,试不同的参数很方便REPLread、eval、print、loop,如他的名字一样,拿到用户输入,执行,输... 查看详情

bms产品控制策略和整车交互策略(代码片段)

...述温度监测功能是指在BMS得电正常工作时,采集器会实时采集电池模组各采样点的温度,最高、最低温度及平均温度并实时传输给BMS辅助控制器和BMS主控。5.1.2.2功能制定原因实时监测动力电池的温度信息,供BMS主控... 查看详情

matlab-实时编辑器介绍(代码片段)

在实时编辑器中,可以创建随代码一起显示代码输出的实时脚本。添加格式化文本、方程、图像和超链接用于增强记叙脚本,以及将实时脚本作为交互式文档与其他人共享。在实时编辑器中创建实时脚本。要创建实时脚... 查看详情

链接xcos和scilabgui以实时控制模型参数(代码片段)

...是一个scilab/xcosnewbee。我有一个具有多个参数的模型我想实时更改块参数(在实时图表上看到)。所以我想在gui格式中为多个参数设置TK比例块的功能。目前我有一个可以调整参数的gui,但是当我重新开始模拟时,对参数的更改... 查看详情

zynq7020开发记录(持续更新)--ps和pl间的数据交互(代码片段)

...下两个方面的考量:数据量,即容量。数据交互速度,即实时性。这两个方式直接决定着我们使用哪种手段来实现这个流程。下面本博客将用3种方式,来实现这个交互流程。方式1采用寄存器方式驱动部分test1_dma_dri.c#include<linu... 查看详情

阿里云产品-智能语音交互快速测评(代码片段)

...智能人机交互体验,适用于智能客服、质检、会议纪要、实时字幕等多个企业应用场景。其主要有以下优势:定制识别及发音:可用于客服、阅读、虚拟人等场景易接入:并提供API和多种SDK识别准确率高:持续迭代提升响应速度... 查看详情