springboot迁移数据库数据到es+操纵es进行高级检索源码

官萧何      2022-05-11     761

关键词:

实现类源码

package com.txj.bwbd.es.esService.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.txj.bwbd.common.CommonConstraint;
import com.txj.bwbd.es.dto.SearchDto;
import com.txj.bwbd.es.esDao.BwbdTypeDao;
import com.txj.bwbd.es.esDao.FgTypeDao;
import com.txj.bwbd.es.esEntity.BwbdType;
import com.txj.bwbd.es.esService.BwbdTypeService;
import com.txj.bwbd.es.esService.FgTypeService;
import com.txj.bwbd.sqlserver.entity.AlTestR3;
import com.txj.bwbd.sqlserver.entity.FgTestR3;
import com.txj.bwbd.sqlserver.entity.WdTestR3;
import com.txj.bwbd.sqlserver.service.IAlTestR3Service;
import com.txj.bwbd.sqlserver.service.IFgTestR3Service;
import com.txj.bwbd.sqlserver.service.IWdTestR3Service;
import org.apache.commons.lang3.StringUtils;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.MultiMatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


/**
 * @author: 官昌洪
 * @Description:
 * @Date: 2020/2/1 10:31
 * @Param:
 * @return:
 */
@Service
public class BwbdTypeServiceImpl implements BwbdTypeService {

    @Autowired
    BwbdTypeDao bwbdTypeDao;

    @Autowired
    RestHighLevelClient client;

    @Autowired
    IFgTestR3Service iFgTestR3Service;

    @Autowired
    IWdTestR3Service iWdTestR3Service;

    @Autowired
    IAlTestR3Service iAlTestR3Service;

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void syncFg() {
        List<FgTestR3> r3s = iFgTestR3Service.list((new QueryWrapper<FgTestR3>())
                .select("webGuid").groupBy("webGuid"));
        List<String> webGuids = r3s.stream().filter(fgTestR3 -> StringUtils.isNotEmpty(fgTestR3
                .getWebGuid())).map(fgTestR3 -> fgTestR3.getWebGuid()).collect(Collectors.toList());

        for (String webGuid : webGuids) {
            List<FgTestR3> fgTestR3s = iFgTestR3Service.list((new QueryWrapper<FgTestR3>())
                    .eq("webGuid", webGuid));
            List<BwbdType> bwbdTypes = new ArrayList<>();
            for (FgTestR3 fgTestR3 : fgTestR3s) {
                BwbdType bwbdType = new BwbdType();
                BeanUtil.copyProperties(fgTestR3, bwbdType, CopyOptions.create().ignoreNullValue());
                bwbdType.setId(BwbdType.DATA_TYPE_FG + BwbdType.ID_SPLIT + fgTestR3.getAutoid());
                bwbdType.setDataType(BwbdType.DATA_TYPE_FG);
                bwbdTypes.add(bwbdType);
            }
            bwbdTypeDao.saveAll(bwbdTypes);
        }

    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void syncWd() {
        List<WdTestR3> r3s = iWdTestR3Service.list((new QueryWrapper<WdTestR3>())
                .select("webGuid").groupBy("webGuid"));
        List<String> webGuids = r3s.stream().filter(wdTestR3 -> StringUtils.isNotEmpty(wdTestR3
                .getWebGuid())).map(wdTestR3 -> wdTestR3.getWebGuid()).collect(Collectors.toList());

        for (String webGuid : webGuids) {
            List<WdTestR3> wdTestR3s = iWdTestR3Service.list((new QueryWrapper<WdTestR3>())
                    .eq("webGuid", webGuid));
            List<BwbdType> bwbdTypes = new ArrayList<>();
            for (WdTestR3 wdTestR3 : wdTestR3s) {
                BwbdType bwbdType = new BwbdType();
                BeanUtil.copyProperties(wdTestR3, bwbdType, CopyOptions.create().ignoreNullValue());
                bwbdType.setContents(wdTestR3.getRequestContent() + wdTestR3.getContents()
                        + wdTestR3.getResponseContent());
                bwbdType.setId(BwbdType.DATA_TYPE_WD + BwbdType.ID_SPLIT + wdTestR3.getAutoid());
                bwbdType.setDataType(BwbdType.DATA_TYPE_WD);
                bwbdTypes.add(bwbdType);
            }
            bwbdTypeDao.saveAll(bwbdTypes);
        }

    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    public void syncAl() {
        List<AlTestR3> r3s = iAlTestR3Service.list((new QueryWrapper<AlTestR3>())
                .select("webGuid").groupBy("webGuid"));
        List<String> webGuids = r3s.stream().filter(fgTestR3 -> StringUtils.isNotEmpty(fgTestR3
                .getWebGuid())).map(fgTestR3 -> fgTestR3.getWebGuid()).collect(Collectors.toList());

        for (String webGuid : webGuids) {
            List<AlTestR3> alTestR3s = iAlTestR3Service.list((new QueryWrapper<AlTestR3>())
                    .eq("webGuid", webGuid));
            List<BwbdType> bwbdTypes = new ArrayList<>();
            for (AlTestR3 alTestR3 : alTestR3s) {
                BwbdType bwbdType = new BwbdType();
                BeanUtil.copyProperties(alTestR3, bwbdType, CopyOptions.create().ignoreNullValue());
                bwbdType.setId(BwbdType.DATA_TYPE_AL + BwbdType.ID_SPLIT + alTestR3.getAutoid());
                bwbdType.setDataType(BwbdType.DATA_TYPE_AL);
                bwbdType.setNumbers(alTestR3.getCaseNumber());
                bwbdTypes.add(bwbdType);
            }

            if (bwbdTypes.size() > 20000) {
                splitSave(bwbdTypes);
            } else {
                bwbdTypeDao.saveAll(bwbdTypes);
            } 
        }

    }

    /**
     * @author: 官昌洪
     * @Description: 直接保存会内存溢出,切片保存数据
     * @Date: 2020/2/5 11:43
     * @Param: 
     * @return: 
     */
    private void splitSave(List<BwbdType> bwbdTypes) {
        int size = bwbdTypes.size();
        int step = 2000;
        int fromIndex = 0;
        int toIndex = step;
        while (toIndex < size) {
            List<BwbdType> optList = bwbdTypes.subList(fromIndex, toIndex);
            bwbdTypeDao.saveAll(optList);
            fromIndex = toIndex;
            toIndex = toIndex + step;
        }

        if (fromIndex < size) {
            toIndex = size;
            List<BwbdType> optList = bwbdTypes.subList(fromIndex, toIndex);
            bwbdTypeDao.saveAll(optList);
        }
    }

    @Override
    public List<BwbdType> improveSearch(SearchDto searchDto) {

        String text = searchDto.getTerm();
        text = QueryParser.escape(text);  // 主要就是这一句把特殊字符都转义,那么lucene就可以识别
        // 搜索请求对象
        SearchRequest searchRequest = new SearchRequest(BwbdType.ES_INDEX);
        // 指定类型
        searchRequest.types(BwbdType.ES_TYPE);
        // 搜索源构建对象
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        // 搜索方式
        // 首先构造多关键字查询条件
        MultiMatchQueryBuilder matchQueryBuilder = QueryBuilders
                .multiMatchQuery(text, BwbdType.PROPERTY_NUMBERS
                        , BwbdType.PROPERTY_TITLES, BwbdType.PROPERTY_CONTENTS)
                .field(BwbdType.PROPERTY_NUMBERS, 100)
                .field(BwbdType.PROPERTY_TITLES, 10)
                .field(BwbdType.PROPERTY_CONTENTS, 1).minimumShouldMatch(BwbdType.MATCH_LEVEL_THREE);
        // 添加条件到布尔查询
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        boolQueryBuilder.must(matchQueryBuilder);
//        // 通过布尔查询来构造过滤查询
//        boolQueryBuilder.filter(QueryBuilders.matchQuery("economics","L"));
        // 将查询条件封装给查询对象
        searchSourceBuilder.query(boolQueryBuilder);
        searchSourceBuilder.size(searchDto.getSize());
        searchSourceBuilder.from(searchDto.getPage() - 1);
        // ***********************

        // 高亮查询
        HighlightBuilder highlightBuilder = new HighlightBuilder();
        highlightBuilder.preTags(CommonConstraint.LIGHT_TAG_START); // 高亮前缀
        highlightBuilder.postTags(CommonConstraint.LIGHT_TAG_END); // 高亮后缀
        List<HighlightBuilder.Field> fields = highlightBuilder.fields();
        fields.add(new HighlightBuilder
                .Field(BwbdType.PROPERTY_NUMBERS)); // 高亮字段
        fields.add(new HighlightBuilder
                .Field(BwbdType.PROPERTY_TITLES)); // 高亮字段
        fields.add(new HighlightBuilder
                .Field(BwbdType.PROPERTY_CONTENTS)); // 高亮字段
        // 添加高亮查询条件到搜索源
        searchSourceBuilder.highlighter(highlightBuilder);

        // ***********************

//        // 设置源字段过虑,第一个参数结果集包括哪些字段,第二个参数表示结果集不包括哪些字段
//        searchSourceBuilder.fetchSource(new String[]{"name","studymodel","price","timestamp"},new String[]{});
        // 向搜索请求对象中设置搜索源
        searchRequest.source(searchSourceBuilder);
        // 执行搜索,向ES发起http请求
        SearchResponse searchResponse = null;
        try {
            searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            e.printStackTrace();
        }
        List<BwbdType> bwbdTypes = obtainFgType(searchResponse);


        return bwbdTypes;
    }

    private List<BwbdType> obtainFgType(SearchResponse searchResponse) {
        // 搜索结果
        SearchHits hits = searchResponse.getHits();
        // 匹配到的总记录数
        long totalHits = hits.getTotalHits();
        // 得到匹配度高的文档
        SearchHit[] searchHits = hits.getHits();

        List<BwbdType> bwbdTypes = new ArrayList<>();

        for (SearchHit hit : searchHits) {
            String content = hit.getSourceAsString();//使用ES的java接口将实体类对应的内容转换为json字符串
            BwbdType bwbdType = JSONObject.parseObject(content,BwbdType.class); //生成pojo对象
            // 获取高亮查询的内容。如果存在,则替换原来的name
            Map<String, HighlightField> highlightFields = hit.getHighlightFields();
            if( highlightFields != null ){
                HighlightField nameField = highlightFields.get(bwbdType.PROPERTY_NUMBERS);
                if(nameField!=null){
                    Text[] fragments = nameField.getFragments();
                    StringBuffer stringBuffer = new StringBuffer();
                    for (Text str : fragments) {
                        stringBuffer.append(str.string());
                    }
                    String numbers = stringBuffer.toString();
                    bwbdType.setNumbers(numbers);
                }

                HighlightField titlesField = highlightFields.get(bwbdType.PROPERTY_TITLES);
                if(titlesField!=null){
                    Text[] fragments = titlesField.getFragments();
                    StringBuffer stringBuffer = new StringBuffer();
                    for (Text str : fragments) {
                        stringBuffer.append(str.string());
                    }
                    String titles = stringBuffer.toString();
                    bwbdType.setTitles(titles);
                }

                HighlightField contentsField = highlightFields.get(bwbdType.PROPERTY_CONTENTS);
                if(contentsField!=null){
                    Text[] fragments = contentsField.getFragments();
                    StringBuffer stringBuffer = new StringBuffer();
                    for (Text str : fragments) {
                        stringBuffer.append(str.string());
                    }
                    String contents = stringBuffer.toString();
                    bwbdType.setContents(contents);
                }
            }
            bwbdTypes.add(bwbdType);
        }
        return bwbdTypes;
    }
}

 

es7.6.2集群迁移(从一套es集群迁移数据到另一套集群)(代码片段)

有时有需要从ES集群中去除多个节点的需求,比如迁移一套ES集群到另外一套ES集群,这时可以先将新的ES节点加入到现有集群里,再将老ES节点下线。一实验环境 ​​​​​二实验步骤2.1集群扩容-添加新节点可参考ES... 查看详情

es7.6.2集群迁移(从一套es集群迁移数据到另一套集群)(代码片段)

有时有需要从ES集群中去除多个节点的需求,比如迁移一套ES集群到另外一套ES集群,这时可以先将新的ES节点加入到现有集群里,再将老ES节点下线。一实验环境 ​​​​​二实验步骤2.1集群扩容-添加新节点可参考ES... 查看详情

es版本升级并迁移数据(代码片段)

ES6.2.3(3节点)460G数据迁移到ES7.4.1(5节点)目标现在有一个ES集群(3节点,3个节点既是master也是data),存储的数据约460G。现在需要升级ES版本为7.4.1,新集群采用12个节点(4个master,8个data节点)演练... 查看详情

es版本升级并迁移数据(代码片段)

ES6.2.3(3节点)460G数据迁移到ES7.4.1(5节点)目标现在有一个ES集群(3节点,3个节点既是master也是data),存储的数据约460G。现在需要升级ES版本为7.4.1,新集群采用12个节点(4个master,8个data节点)演练... 查看详情

es数据备份与恢复

参考技术A场景:ES线上的数据和服务迁移到另外的机器上去老ES机器ip:172.16.0.1新ES机器ip:172.16.0.2一.首先,2.重启ES二.在老机器上3.创建备份仓库在/data/es/snapshot下新建名为bro_backup的仓库4.备份数据三.在新机器上(将备份数据打... 查看详情

elasticsearch数据迁移

参考技术A将es数据从一个集群迁移到另外一个集群,使用elasticdump进行迁移,elasticdump地址:https://github.com/elasticsearch-dump/elasticsearch-dump迁移单个索引:参数说明:--input:源地址,可为ES集群URL、文件或stdin,可指定索引,格式为:pr... 查看详情

es实战es集群节点迁移与缩容(代码片段)

ES集群节点迁移与缩容文章目录ES集群节点迁移与缩容master节点迁移场景一场景二场景三data节点迁移数据迁移操作1、查询集群原来的配置2、清空节点数据3、检查是否排空数据迁移原则缩容前置检查项master节点迁移场景一集群上... 查看详情

使用elasticdump迁移数据到新es集群(代码片段)

...e/details/79209236https://blog.csdn.net/laoyang360/article/details/65449407迁移方法通过logstash的input和output配置迁移(配置灵活适用于长期数据同步等)通过迁移工具如elasticdump等(适用于备份一次性小量数据操作)通过elasticsarch自带快照功能(... 查看详情

es版本升级并迁移数据(代码片段)

ES6.2.3(3节点)460G数据迁移到ES7.4.1(5节点)目标现在有一个ES集群(3节点,3个节点既是master也是data),存储的数据约460G。现在需要升级ES版本为7.4.1,新集群采用12个节点(4个master,8个data节点)演练... 查看详情

elasticsearch数据迁移与集群容灾

参考技术A本文讨论如何跨集群迁移ES数据以及如何实现ES的同城跨机房容灾和异地容灾。在ES的生产实践中,往往会遇到以下问题:根据业务需求,存在以下场景:如果是第一种场景,数据迁移过程中可以停止写入,可以采用诸... 查看详情

es实战es集群节点迁移与缩容(代码片段)

ES集群节点迁移与缩容文章目录ES集群节点迁移与缩容master节点迁移场景一场景二场景三data节点迁移数据迁移操作1、查询集群原来的配置2、清空节点数据3、检查是否排空数据迁移原则缩容前置检查项master节点迁移场景一集群上... 查看详情

es数据迁移_snapshot(不需要安装其他软件)(代码片段)

参考文章:三种常用的Elasticsearch数据迁移方案ES基于Snapshot(快照)的数据备份和还原CDH修改ElasticSearch配置文件不生效问题目录1、更改老ES和新ES的config/elasticsearch.yml2、重启老ES,在老ES执行Postman中创建备份目录... 查看详情

elasticsearch海量数据使用简述

...查询或多条件匹配查询,数据量较小的情况下通过简单的数据库模糊查询是可以解决的,但是对于数据量庞大的情况,数据库模糊查询就会出现性能问题。这种情况下的一种解决方案就是根据查询内容构建反向索引,借助搜索引... 查看详情

使用elasticsearch-dump迁移es数据

...样的地方就是--type=mapping,意思是把原始索引的mapping结构迁移给目标索引。然后在执行--type=data的6)就可以把数据迁移过去啦 如果索引很多,你还是懒得一个个去迁移,那么你可以改用这个命令:./elasticdump--input=http://192.168.1.... 查看详情

es数据库重建索引——reindex(数据迁移)(代码片段)

一、应用背景  ES在创建好索引后,mapping的properties属性类型是不能更改的,只能添加。如果说需要修改字段就需要重新建立索引然后把旧数据导到新索引。1、当你的数据量过大,而你的索引最初创建的分片数量不... 查看详情

es索引迁移snapshot-迁移部分索引

Snapshot-迁移部分索引源集群192.168.40.180192.168.40.181192.168.40.182目标集群192.168.40.61192.168.40.62192.168.40.63生产需要额外,挂载硬盘SnapshotAPI是Elasticsearch用于对数据进行备份和恢复的一组API接口,可以通过SnapshotAPI进行跨集群的数据迁移... 查看详情

elasticsearch数据迁移(代码片段)

现需要将某集群下一个索引下的所有数据迁移到另一个集群上,elasticsearch-dump,Elasticsearch-Exporter试了一下都不好使,只能老实的写代码来实现importosimportsysimportpyesimportdatetimeindex_list=[["alias-offer","offer"]]ES_URL="http://ip1:9200/"NEW_ES_UR 查看详情

es集群分片unassigned

...ds)之前:之后:可以看到删除了之前的分片数据,并重新迁移分片数据,数据文件慢慢变大此时状态为:RELOCATINGES将逐个分片进行迁移可以看到节点1已经迁移完成等待迁移完成此时在ES的head插件上可以看到全部变成绿色了,之... 查看详情