第三十五节 ElasticSearch存储订单并检索

亮子 2025-03-16 22:10:24 220 0 0 0

一、需求分析

根据公开的信息,淘宝平台上的商品数量已经超过了数十亿个,每天有成千上万的新商品上架。根据公开的数据,京东平台上的商品种类已经达到数亿个。如何存储和检索商品是一个现实而有严峻的问题。

将商品信息存入 Elasticsearch 的现实需求主要源于现代电商和零售行业对高效搜索、灵活查询和实时数据分析的迫切需求。以下是具体场景和实际问题的描述:


1. 电商平台的高效搜索体验

  • 需求场景
    用户在电商平台搜索商品时,期望输入关键词(如“红色连衣裙”或“防水运动鞋”)后,毫秒级返回相关结果,并支持模糊匹配、拼写纠错、同义词扩展(如搜索“跑步鞋”也能匹配“运动鞋”)。
  • 传统方案的痛点
  • 关系型数据库(如MySQL)的 LIKE 查询性能低下,无法应对海量商品数据(如百万级SKU)。
  • 难以实现分词、权重排序(如销量高的商品优先展示)或多字段联合检索(如同时匹配标题、描述、标签等)。
  • Elasticsearch 的解决能力
  • 倒排索引:支持全文搜索和分词(如中文IK分词),快速定位关键词。
  • 相关性评分:根据关键词匹配度、销量、评分等字段动态排序结果。
  • 模糊查询:支持拼写纠错(fuzzy search)和同义词扩展,提升搜索容错性。

2. 复杂过滤与聚合分析

  • 需求场景
    用户需要根据多个条件筛选商品(如价格区间、品牌、规格、库存状态),并期望平台提供实时数据分析(如“热销商品排行榜”或“用户点击量趋势”)。
  • 传统方案的痛点
  • 关系型数据库的多字段联合查询(如 WHERE price BETWEEN 100 AND 500 AND brand='A')在大数据量下性能急剧下降。
  • 无法高效生成聚合统计结果(如按类别统计商品数量、按地区分析销量)。
  • Elasticsearch 的解决能力
  • 组合查询(Bool Query):支持多条件组合过滤(mustshouldmust_not)。
  • 聚合分析(Aggregation):实时统计商品价格分布、品类占比、用户行为(如点击/购买趋势)。
  • 范围查询与地理搜索:支持按地理位置筛选附近商家的库存商品(如“附近5公里有货”)。

3. 实时数据同步与更新

  • 需求场景
    商品价格、库存状态、上下架信息需要实时更新,并在搜索结果中立即生效(例如秒杀活动中库存扣减或价格变动)。
  • 传统方案的痛点
  • 数据库与缓存(如Redis)之间的数据同步延迟可能导致用户看到过期信息(如已售罄商品仍显示可购买)。
  • Elasticsearch 的解决能力
  • 近实时(Near Real-Time, NRT):数据写入后1秒内可被检索,保障信息的及时性。
  • 与业务系统集成:通过Logstash、Kafka或应用层双写,实现商品数据的实时同步。

4. 应对高并发与海量数据

  • 需求场景
    在大促活动(如双11)期间,电商平台可能面临每秒数万次的搜索请求,需保障系统稳定性和低延迟。
  • 传统方案的痛点
  • 单机数据库或缓存无法承载高并发查询,容易成为性能瓶颈。
  • 分库分表方案复杂,且难以支持跨分片的复杂查询。
  • Elasticsearch 的解决能力
  • 分布式架构:数据自动分片(Sharding)和副本(Replica),支持水平扩展,轻松应对高并发。
  • 负载均衡与容灾:节点故障时自动切换,保障服务可用性。

5. 个性化推荐与用户行为分析

  • 需求场景
    根据用户历史行为(如搜索、点击、购买记录)推荐相关商品,提升转化率。
  • 传统方案的痛点
  • 基于规则或离线批处理的推荐系统延迟高,无法实时响应用户行为。
  • Elasticsearch 的解决能力
  • 结合用户画像:存储用户行为日志,通过聚合分析生成实时兴趣标签。
  • 混合搜索(Hybrid Search):结合关键词搜索和协同过滤(如“购买此商品的人也买了XXX”)。

总结:现实需求的核心驱动

  • 业务层面
  • 提升用户搜索体验,减少跳出率,促进转化。
  • 支持复杂的运营分析(如实时监控热销商品、优化库存策略)。
  • 技术层面
  • 解决传统数据库在海量数据、高并发、复杂查询下的性能瓶颈。
  • 简化开发流程(Elasticsearch提供RESTful API,无需复杂SQL或JOIN操作)。

通过将商品信息存入Elasticsearch,企业能够在竞争激烈的市场中快速响应用户需求,同时降低技术维护成本,实现数据驱动的业务增长。

二、SpringBoot集成ElasticSearch


1、添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

2、配置 Elasticsearch 连接

application.propertiesapplication.yml 中配置 Elasticsearch 的连接信息:

# application.properties
spring.elasticsearch.rest.uris=http://localhost:9200
spring.data.elasticsearch.repositories.enabled=true

或使用 YAML 格式:

# application.yml
spring:
  elasticsearch:
    rest:
      uris: http://localhost:9200
  data:
    elasticsearch:
      repositories:
        enabled: true

3、创建ES的实体类

package com.bwie.es;

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;

import java.io.Serializable;
import java.util.Date;

/**
 * @author 军哥
 * @version 1.0
 * @description: TODO
 * @date 2025/3/1 9:27
 */

@Data
@Document(indexName = "es_prod", shards = 1, replicas = 1)
public class EsProd implements Serializable {

    @Id
    @Field(type = FieldType.Keyword)
    private Integer prodId;

    @Field(type = FieldType.Text, analyzer = "ik_max_word")
    private String prodName;

    @Field(type = FieldType.Double)
    private Double prodPrice;

    @Field(type = FieldType.Integer)
    private Integer prodStock;

    @Field(type = FieldType.Integer)
    private Integer prodType;

    @Field(type = FieldType.Integer)
    private Integer userId;

    @Field(type = FieldType.Integer)
    private Integer prodStatus;


    @Field(type = FieldType.Keyword, index = false)
    private String nickName;
    @Field(type = FieldType.Keyword, index = false)
    private String typeName;
    @Field(type = FieldType.Keyword, index = false)
    private String statusName;

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8", shape = JsonFormat.Shape.STRING)
    @Field(type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd HH:mm:ss")
    private Date createTime;

    @Field(type = FieldType.Keyword)
    private String prodCode;
}

4、创建索引

创建索引:

@Override
public R createIndex() {
    //-- 检查索引是否存在,如果存在,删除索引
    IndexOperations indexOps = elasticsearchRestTemplate.indexOps(EsProd.class);
    boolean exists = indexOps.exists();
    if(exists) {
        System.out.println("索引存在,删除索引");
        indexOps.delete();
    }

    //-- 创建索引
    indexOps.create();

    //-- 创建映射
    Document mapping = indexOps.createMapping();
    indexOps.putMapping(mapping);

    return R.OK();
}

从数据库同步到ElasticSearch中:

package com.bwie.task;

import cn.hutool.core.bean.BeanUtil;
import com.bwie.es.EsProd;
import com.bwie.mapper.TbProdMapper;
import com.bwie.pojo.TbProd;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @author 军哥
 * @version 1.0
 * @description: TODO
 * @date 2025/3/1 10:19
 */

@Component
@Slf4j
public class SyncProdTask {

    @Autowired
    TbProdMapper tbProdMapper;

    @Autowired
    ElasticsearchRestTemplate elasticsearchRestTemplate;

    /**
     * @description 同步商品到ES中,每分钟执行一次
     * @params []
     * @return void
     * @author 军哥
     * @date 2025/3/1 10:20
     */
//    @Scheduled(cron = "0 0/1 * * * ?")
    public void syncProdToEs() {
        log.info("同步商品到ES中...");

        //-- 判断索引是否存在,如果不存在,创建索引
        IndexOperations indexOps = elasticsearchRestTemplate.indexOps(EsProd.class);
        boolean exists = indexOps.exists();
        if(!exists) {
            System.out.println("索引不存在,创建索引");
            indexOps.create();

            //-- 创建映射
            Document mapping = indexOps.createMapping();
            indexOps.putMapping(mapping);
        }


        //-- 获取商品列表
        List<TbProd> tbProds = tbProdMapper.getProdList();

        //-- 循环商品列表,并保存到ES中
        for (TbProd tbProd : tbProds) {
            EsProd esProd = BeanUtil.toBean(tbProd, EsProd.class);
            elasticsearchRestTemplate.save(esProd);
        }

        log.info("同步商品到ES中完成...");
    }

}

三、ElasticSearch数据操作


1、保存文档

@Override
public R addProdToEs(Integer prodId) {

    TbProd tbProd = tbProdMapper.selectById(prodId);
    if(tbProd == null) {
        return R.ERROR(404, "商品不存在");
    }

    //-- 将商品信息添加到ES中
    EsProd esProd = BeanUtil.toBean(tbProd, EsProd.class);
    elasticsearchRestTemplate.save(esProd);

    return R.OK();
}

2、根据ID读取文档

@Override
public R getProdFromEs(Integer prodId) {
    EsProd esProd = elasticsearchRestTemplate.get(String.valueOf(prodId), EsProd.class);
    return R.OK(esProd);
}

3、修改文档

@Override
public R updateProdToEs(Integer prodId) {
    TbProd tbProd = tbProdMapper.selectById(prodId);
    if(tbProd == null) {
        return R.ERROR(404, "商品不存在");
    }

    //-- 将商品信息添加到ES中
    // save 根据ID来判断,如果ID存在,修改,如果不存在,添加
    EsProd esProd = BeanUtil.toBean(tbProd, EsProd.class);
    elasticsearchRestTemplate.save(esProd);

    return R.OK();
}

4、删除文档

@Override
public R deleteProdFromEs(Integer prodId) {
    elasticsearchRestTemplate.delete(""+prodId, EsProd.class);
    return R.OK();
}

四、ElasticSearch 难点

1、多条件+排序+高亮+分词

@Override
public R getProdListFromEs(MyPageVo myPageVo) {
    //-- 构造分页参数(ES页面是从0开始的)
    Integer pageNum = 0;
    if(myPageVo.getPageNum() > 0) {
        pageNum = myPageVo.getPageNum() - 1;
    }
    PageRequest pageRequest = PageRequest.of(pageNum, myPageVo.getPageSize());

    //-- 构造查询条件
    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

    // 根据商品名称搜索:must=and,should=or
    if(!StringUtils.isEmpty(myPageVo.getProdName())) {
        boolQueryBuilder.must(QueryBuilders.matchQuery("prodName", myPageVo.getProdName()));
    }

    // 根据商品类型搜索:matchQuery 分词查找,termQuery 精确查找
    if(myPageVo.getProdType() != null) {
        boolQueryBuilder.must(QueryBuilders.termQuery("prodType", myPageVo.getProdType()));
    }

    // 根据商品状态搜索
    if(myPageVo.getProdStatus() != null) {
        boolQueryBuilder.must(QueryBuilders.termQuery("prodStatus", myPageVo.getProdStatus()));
    }

    // 根据价格区间搜索
    if(myPageVo.getMinPrice() != null && myPageVo.getMaxPrice() != null) {
        boolQueryBuilder.must(
                QueryBuilders.rangeQuery("prodPrice")
                        .gte(myPageVo.getMinPrice())
                        .lte(myPageVo.getMaxPrice())
        );
    }

    // 根据时间区间搜索
    if(!StringUtils.isEmpty(myPageVo.getBeginTime()) && !StringUtils.isEmpty(myPageVo.getEndTime())) {
        boolQueryBuilder.must(
                QueryBuilders.rangeQuery("createTime")
                        .format("yyyy-MM-dd HH:mm:ss")
                        .timeZone("GMT+8")
                        .gte(myPageVo.getBeginTime())
                        .lte(myPageVo.getEndTime())
        );
    }

    // 高亮查询
    HighlightBuilder highlightBuilder = new HighlightBuilder();
    highlightBuilder.field("prodName").preTags("<font color='red'>").postTags("</font>");

    // 价格排序
    SortBuilder sortBuilder = SortBuilders.fieldSort("prodPrice").order(SortOrder.DESC);


    //-- 执行查询
    NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
    nativeSearchQueryBuilder.withQuery(boolQueryBuilder)
            .withPageable(pageRequest)
            .withHighlightBuilder(highlightBuilder)
            .withSort(sortBuilder);
    SearchHits<EsProd> searchHits = elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), EsProd.class);

    //-- 解析查询结果
    List<EsProd> records = new ArrayList<>();
    Long total = searchHits.getTotalHits();

    for (SearchHit<EsProd> searchHit : searchHits) {
        EsProd esProd = searchHit.getContent();

        //-- 获取高亮字段
        List<String> list = searchHit.getHighlightField("prodName");
        if(list != null && list.size() > 0) {
            esProd.setProdName(list.get(0));
        }

        records.add(esProd);
    }

    HashMap<String, Object> map = new HashMap<>();
    map.put("records", records);
    map.put("total", total);
    return R.OK(map);
}

五、拓展


1、什么是倒排索引?

ES分词器通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这种建立索引的方式叫倒排索引。
当数据写入 ES 时,数据将会通过 分词 被切分为不同的term,ES 将term 与其对应的文档列表建立一种映射关系,这种结构就是 倒排索引。如下图所示:

图片alt

3、ES脑裂是怎么回事?(扩展)

脑裂问题,就是同一个集群中的不同节点,对于集群的状态,有了不一样的理解。

由于并发访问量的提高,导致了我们两个节点的集群(分片数默认为5,副本为1,没有固定的master,都是集群中的节点又做data又做master)状态变成了red,出现了大量的坏片,并且坏掉的都是主分片及其副本。分析发现,是ES集群出现了脑裂问题(俗称精神分裂),即**集群中不同的节点对于master的选择出现了分歧,出现了多个master竞争,导致主分片和副本的识别也发生了分歧,对一些分歧中的分片标识为了坏片。**

理解思路

  • 原因1:网络问题,导致由多个主
  • 原因2:主节点负载过大

4、“脑裂”问题是怎么形成的?(扩展)

1.网络问题:集群间的网络延迟导致一些节点访问不到master,认为master挂掉了从而选举出新的master,并对master上的分片和副本标红,分配新的主分片

2.节点负载:主节点的角色既为master又为data,访问量较大时可能会导致ES停止响应造成大面积延迟,此时其他节点得不到主节点的响应认为主节点挂掉了,会重新选取主节点。

3.内存回收:data节点上的ES进程占用的内存较大,引发JVM的大规模内存回收,造成ES进程失去响应。

5、脑裂问题如何解决?(扩展)

1、减少误判。discovery.zen.ping_timeout节点状态的响应时间,默认为3s,可以适当调大,减少误判。
2、选举触发。discovery.zen.minimum_master_nodes:1,该参数是用于控制选举行为发生的最小集群主节点数量。增大该参数,这样参与选举的节点增多,减少选举。
3、角色分离:即master节点与data节点分离,限制角色。
主节点配置为:
node.master: true node.data: false
从节点配置为:
node.master: false node.data: true

6、ES如何添加自定义词库、热更新词库?(扩展)

我们项目中使用中文分词器IK分词器。
1、在ik插件对应的配置文件目录下创建一个自定义词库文件 my.dic。直接在文件中添加词语即可,每一个词语一行。
2、修改ik的IKAnalyzer.cfg.xml配置文件。
3、将修改好的IK配置文件复制到集群中的所有节点中。
4、重启ES验证一下自定义词库的分词效果。

7、ES如何分页?

默认情况下,不加from,size的话,ES会返回前10条记录。加上from,size就会查询指定的条数。其中from代表起始行号,size代表查询行数。

1、深度分页(from+size)
2、快照查询(scroll)
3、Search After