根据公开的信息,淘宝平台上的商品数量已经超过了数十亿个,每天有成千上万的新商品上架。根据公开的数据,京东平台上的商品种类已经达到数亿个。如何存储和检索商品是一个现实而有严峻的问题。
将商品信息存入 Elasticsearch 的现实需求主要源于现代电商和零售行业对高效搜索、灵活查询和实时数据分析的迫切需求。以下是具体场景和实际问题的描述:
LIKE
查询性能低下,无法应对海量商品数据(如百万级SKU)。fuzzy search
)和同义词扩展,提升搜索容错性。WHERE price BETWEEN 100 AND 500 AND brand='A'
)在大数据量下性能急剧下降。must
、should
、must_not
)。通过将商品信息存入Elasticsearch,企业能够在竞争激烈的市场中快速响应用户需求,同时降低技术维护成本,实现数据驱动的业务增长。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
在 application.properties
或 application.yml
中配置 Elasticsearch 的连接信息:
# application.properties
spring.elasticsearch.rest.uris=http://localhost:9200
spring.data.elasticsearch.repositories.enabled=true
或使用 YAML 格式:
# application.yml
spring:
elasticsearch:
rest:
uris: http://localhost:9200
data:
elasticsearch:
repositories:
enabled: true
package com.bwie.es;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
import java.util.Date;
/**
* @author 军哥
* @version 1.0
* @description: TODO
* @date 2025/3/1 9:27
*/
@Data
@Document(indexName = "es_prod", shards = 1, replicas = 1)
public class EsProd implements Serializable {
@Id
@Field(type = FieldType.Keyword)
private Integer prodId;
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String prodName;
@Field(type = FieldType.Double)
private Double prodPrice;
@Field(type = FieldType.Integer)
private Integer prodStock;
@Field(type = FieldType.Integer)
private Integer prodType;
@Field(type = FieldType.Integer)
private Integer userId;
@Field(type = FieldType.Integer)
private Integer prodStatus;
@Field(type = FieldType.Keyword, index = false)
private String nickName;
@Field(type = FieldType.Keyword, index = false)
private String typeName;
@Field(type = FieldType.Keyword, index = false)
private String statusName;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss", timezone = "GMT+8", shape = JsonFormat.Shape.STRING)
@Field(type = FieldType.Date, format = DateFormat.custom, pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime;
@Field(type = FieldType.Keyword)
private String prodCode;
}
创建索引:
@Override
public R createIndex() {
//-- 检查索引是否存在,如果存在,删除索引
IndexOperations indexOps = elasticsearchRestTemplate.indexOps(EsProd.class);
boolean exists = indexOps.exists();
if(exists) {
System.out.println("索引存在,删除索引");
indexOps.delete();
}
//-- 创建索引
indexOps.create();
//-- 创建映射
Document mapping = indexOps.createMapping();
indexOps.putMapping(mapping);
return R.OK();
}
从数据库同步到ElasticSearch中:
package com.bwie.task;
import cn.hutool.core.bean.BeanUtil;
import com.bwie.es.EsProd;
import com.bwie.mapper.TbProdMapper;
import com.bwie.pojo.TbProd;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;
import org.springframework.data.elasticsearch.core.IndexOperations;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
/**
* @author 军哥
* @version 1.0
* @description: TODO
* @date 2025/3/1 10:19
*/
@Component
@Slf4j
public class SyncProdTask {
@Autowired
TbProdMapper tbProdMapper;
@Autowired
ElasticsearchRestTemplate elasticsearchRestTemplate;
/**
* @description 同步商品到ES中,每分钟执行一次
* @params []
* @return void
* @author 军哥
* @date 2025/3/1 10:20
*/
// @Scheduled(cron = "0 0/1 * * * ?")
public void syncProdToEs() {
log.info("同步商品到ES中...");
//-- 判断索引是否存在,如果不存在,创建索引
IndexOperations indexOps = elasticsearchRestTemplate.indexOps(EsProd.class);
boolean exists = indexOps.exists();
if(!exists) {
System.out.println("索引不存在,创建索引");
indexOps.create();
//-- 创建映射
Document mapping = indexOps.createMapping();
indexOps.putMapping(mapping);
}
//-- 获取商品列表
List<TbProd> tbProds = tbProdMapper.getProdList();
//-- 循环商品列表,并保存到ES中
for (TbProd tbProd : tbProds) {
EsProd esProd = BeanUtil.toBean(tbProd, EsProd.class);
elasticsearchRestTemplate.save(esProd);
}
log.info("同步商品到ES中完成...");
}
}
@Override
public R addProdToEs(Integer prodId) {
TbProd tbProd = tbProdMapper.selectById(prodId);
if(tbProd == null) {
return R.ERROR(404, "商品不存在");
}
//-- 将商品信息添加到ES中
EsProd esProd = BeanUtil.toBean(tbProd, EsProd.class);
elasticsearchRestTemplate.save(esProd);
return R.OK();
}
@Override
public R getProdFromEs(Integer prodId) {
EsProd esProd = elasticsearchRestTemplate.get(String.valueOf(prodId), EsProd.class);
return R.OK(esProd);
}
@Override
public R updateProdToEs(Integer prodId) {
TbProd tbProd = tbProdMapper.selectById(prodId);
if(tbProd == null) {
return R.ERROR(404, "商品不存在");
}
//-- 将商品信息添加到ES中
// save 根据ID来判断,如果ID存在,修改,如果不存在,添加
EsProd esProd = BeanUtil.toBean(tbProd, EsProd.class);
elasticsearchRestTemplate.save(esProd);
return R.OK();
}
@Override
public R deleteProdFromEs(Integer prodId) {
elasticsearchRestTemplate.delete(""+prodId, EsProd.class);
return R.OK();
}
@Override
public R getProdListFromEs(MyPageVo myPageVo) {
//-- 构造分页参数(ES页面是从0开始的)
Integer pageNum = 0;
if(myPageVo.getPageNum() > 0) {
pageNum = myPageVo.getPageNum() - 1;
}
PageRequest pageRequest = PageRequest.of(pageNum, myPageVo.getPageSize());
//-- 构造查询条件
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
// 根据商品名称搜索:must=and,should=or
if(!StringUtils.isEmpty(myPageVo.getProdName())) {
boolQueryBuilder.must(QueryBuilders.matchQuery("prodName", myPageVo.getProdName()));
}
// 根据商品类型搜索:matchQuery 分词查找,termQuery 精确查找
if(myPageVo.getProdType() != null) {
boolQueryBuilder.must(QueryBuilders.termQuery("prodType", myPageVo.getProdType()));
}
// 根据商品状态搜索
if(myPageVo.getProdStatus() != null) {
boolQueryBuilder.must(QueryBuilders.termQuery("prodStatus", myPageVo.getProdStatus()));
}
// 根据价格区间搜索
if(myPageVo.getMinPrice() != null && myPageVo.getMaxPrice() != null) {
boolQueryBuilder.must(
QueryBuilders.rangeQuery("prodPrice")
.gte(myPageVo.getMinPrice())
.lte(myPageVo.getMaxPrice())
);
}
// 根据时间区间搜索
if(!StringUtils.isEmpty(myPageVo.getBeginTime()) && !StringUtils.isEmpty(myPageVo.getEndTime())) {
boolQueryBuilder.must(
QueryBuilders.rangeQuery("createTime")
.format("yyyy-MM-dd HH:mm:ss")
.timeZone("GMT+8")
.gte(myPageVo.getBeginTime())
.lte(myPageVo.getEndTime())
);
}
// 高亮查询
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("prodName").preTags("<font color='red'>").postTags("</font>");
// 价格排序
SortBuilder sortBuilder = SortBuilders.fieldSort("prodPrice").order(SortOrder.DESC);
//-- 执行查询
NativeSearchQueryBuilder nativeSearchQueryBuilder = new NativeSearchQueryBuilder();
nativeSearchQueryBuilder.withQuery(boolQueryBuilder)
.withPageable(pageRequest)
.withHighlightBuilder(highlightBuilder)
.withSort(sortBuilder);
SearchHits<EsProd> searchHits = elasticsearchRestTemplate.search(nativeSearchQueryBuilder.build(), EsProd.class);
//-- 解析查询结果
List<EsProd> records = new ArrayList<>();
Long total = searchHits.getTotalHits();
for (SearchHit<EsProd> searchHit : searchHits) {
EsProd esProd = searchHit.getContent();
//-- 获取高亮字段
List<String> list = searchHit.getHighlightField("prodName");
if(list != null && list.size() > 0) {
esProd.setProdName(list.get(0));
}
records.add(esProd);
}
HashMap<String, Object> map = new HashMap<>();
map.put("records", records);
map.put("total", total);
return R.OK(map);
}
ES分词器通过扫描文章中的每一个词,对每一个词建立一个索引,指明该词在文章中出现的次数和位置,当用户查询时,检索程序就根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这种建立索引的方式叫倒排索引。
当数据写入 ES 时,数据将会通过 分词 被切分为不同的term,ES 将term 与其对应的文档列表建立一种映射关系,这种结构就是 倒排索引。如下图所示:
脑裂问题,就是同一个集群中的不同节点,对于集群的状态,有了不一样的理解。
由于并发访问量的提高,导致了我们两个节点的集群(分片数默认为5,副本为1,没有固定的master,都是集群中的节点又做data又做master)状态变成了red,出现了大量的坏片,并且坏掉的都是主分片及其副本。分析发现,是ES集群出现了脑裂问题(俗称精神分裂),即**集群中不同的节点对于master的选择出现了分歧,出现了多个master竞争,导致主分片和副本的识别也发生了分歧,对一些分歧中的分片标识为了坏片。**
1.网络问题:集群间的网络延迟导致一些节点访问不到master,认为master挂掉了从而选举出新的master,并对master上的分片和副本标红,分配新的主分片
2.节点负载:主节点的角色既为master又为data,访问量较大时可能会导致ES停止响应造成大面积延迟,此时其他节点得不到主节点的响应认为主节点挂掉了,会重新选取主节点。
3.内存回收:data节点上的ES进程占用的内存较大,引发JVM的大规模内存回收,造成ES进程失去响应。
1、减少误判。discovery.zen.ping_timeout节点状态的响应时间,默认为3s,可以适当调大,减少误判。
2、选举触发。discovery.zen.minimum_master_nodes:1,该参数是用于控制选举行为发生的最小集群主节点数量。增大该参数,这样参与选举的节点增多,减少选举。
3、角色分离:即master节点与data节点分离,限制角色。
主节点配置为:
node.master: true node.data: false
从节点配置为:
node.master: false node.data: true
我们项目中使用中文分词器IK分词器。
1、在ik插件对应的配置文件目录下创建一个自定义词库文件 my.dic。直接在文件中添加词语即可,每一个词语一行。
2、修改ik的IKAnalyzer.cfg.xml配置文件。
3、将修改好的IK配置文件复制到集群中的所有节点中。
4、重启ES验证一下自定义词库的分词效果。
默认情况下,不加from,size的话,ES会返回前10条记录。加上from,size就会查询指定的条数。其中from代表起始行号,size代表查询行数。
1、深度分页(from+size)
2、快照查询(scroll)
3、Search After