Spring Boot + Elasticsearch 快速整合指南
引言
Elasticsearch 作为高性能的分布式搜索引擎，在现代应用开发中被广泛使用。它具有如下特点：
- 一个分布式的实时文档存储引擎，每个字段都可以被索引与搜索。
- 一个分布式的实时分析搜索引擎，支持各种查询和聚合操作。
- 能胜任上百个服务节点的扩展，并可以支持 PB 级别的结构化或者非结构化数据。
倒排索引
倒排索引是整个 Elasticsearch 的核心。以一本书为例，正常的查找顺序应该是：目录 -> 章节 -> 页码 -> 内容，这就是正排索引的思想。
但是设想一下，如果要在一本书中快速查找 elasticsearch 这个关键字所在的页面，该怎么办？
倒排索引的思路是建立从单词到文档 ID 的对应关系：例如记录 “elasticsearch -> [文档1, 文档3]”，查询时直接由单词找到包含它的文档，而无需逐篇扫描。
（图片）
本文将详细介绍通过 ElasticsearchRepository 和 ElasticsearchRestTemplate 两种方式实现整合的方法。
案例
使用 ElasticsearchRepository
ElasticsearchRepository 是 Spring Data 提供的接口，通过继承该接口，可以快速实现基本的 CRUD 操作，极大地简化了开发流程。
1. 创建 Repository 接口：继承 ElasticsearchRepository，并指定实体类和主键类型，还可以自定义查询方法。
public interface DemoRepository extends ElasticsearchRepository<Demo, String> {
// 自定義查詢方法
List<Demo> findByImsi(String imsi);
// 使用@Query注解定義DSL查詢
@Query("{\"bool\": {\"must\": [{\"match\": {\"imsi\": \"?0\"}}], \"filter\": {\"range\": {\"costTime\": {\"gte\": ?1, \"lte\": ?2}}}}}")
List<Demo> findByImsiAndPriceRange(String imsi, double min, double max);
}2. 服務(wù)層實(shí)現(xiàn):在服務(wù)類中注入Repository,調(diào)用其方法完成數(shù)據(jù)操作。
/**
 * Service layer whose every method forwards directly to {@link DemoRepository}.
 */
@Service
public class DemoService {

    @Autowired
    private DemoRepository repository;

    /**
     * Saves a document through the repository.
     *
     * @param demo document to save
     * @return the entity returned by the repository
     */
    public Demo save(Demo demo) {
        final Demo saved = repository.save(demo);
        return saved;
    }

    /**
     * Looks a document up by its id.
     *
     * @param id document id
     * @return the document, or an empty {@code Optional} when the repository finds none
     */
    public Optional<Demo> findById(String id) {
        final Optional<Demo> found = repository.findById(id);
        return found;
    }

    /**
     * Finds documents by IMSI.
     *
     * <p>NOTE(review): despite the method name, the lookup is on {@code imsi}, not on a name field.
     *
     * @param imsi IMSI value to look up
     * @return matching documents
     */
    public List<Demo> findByName(String imsi) {
        final List<Demo> matches = repository.findByImsi(imsi);
        return matches;
    }

    /**
     * Returns every document the repository yields.
     *
     * @return all documents
     */
    public Iterable<Demo> findAll() {
        final Iterable<Demo> everything = repository.findAll();
        return everything;
    }

    /**
     * Deletes the given document through the repository.
     *
     * @param demo document to delete
     */
    public void delete(Demo demo) {
        repository.delete(demo);
    }

    /**
     * Runs the repository's DSL query: match on {@code imsi} and {@code costTime} within [min, max].
     *
     * @param imsi value matched against {@code imsi}
     * @param min  inclusive lower bound for {@code costTime}
     * @param max  inclusive upper bound for {@code costTime}
     * @return matching documents
     */
    public List<Demo> findByImsiAndPriceRange(String imsi, double min, double max) {
        final List<Demo> inRange = repository.findByImsiAndPriceRange(imsi, min, max);
        return inRange;
    }
}
使用 ElasticsearchRestTemplate
1. 配置ElasticsearchRestTemplate
@Configuration
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
@Value("${spring.elasticsearch.uris: localhost:9200}")
private String[] uris;
@Bean(name = { "elasticsearchOperations", "elasticsearchRestTemplate" })
public ElasticsearchRestTemplate elasticsearchTemplate() {
return new ElasticsearchRestTemplate(elasticsearchClient());
}
@Override
public RestHighLevelClient elasticsearchClient() {
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "password"));
HttpHost[] httpHosts = Arrays.stream(uris).map(HttpHost::create).toArray(HttpHost[]::new);
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
return new RestHighLevelClient(restClientBuilder);
}
}2. 服務(wù)層實(shí)現(xiàn):在服務(wù)類中注入ElasticsearchRestTemplate,通過(guò)構(gòu)建查詢條件實(shí)現(xiàn)各種數(shù)據(jù)操作。
@Service
public class DslQueryService {
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
// 1. 基本Match查詢
public List<Demo> searchByKeyword(String keyword) {
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchQuery("imsi", keyword))
.build();
SearchHits<Demo> searchHits = elasticsearchRestTemplate.search(query, Demo.class);
return searchHits.getSearchHits().stream()
.map(SearchHit::getContent)
.collect(Collectors.toList());
}
// 2. 組合Bool查詢
public List<Demo> complexSearch(String imsi, Double min, Double max, String desc) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
if (imsi != null &&!imsi.isEmpty()) {
boolQuery.must(QueryBuilders.matchQuery("imsi", imsi));
}
if (min != null && max != null) {
boolQuery.filter(QueryBuilders.rangeQuery("costTime").gte(min).lte(max));
}
if (desc != null &&!desc.isEmpty()) {
boolQuery.filter(QueryBuilders.termQuery("desc", desc));
}
NativeSearchQuery query = new NativeSearchQueryBuilder()
.withQuery(boolQuery)
.withPageable(PageRequest.of(0, 20))
.build();
SearchHits<Demo> searchHits = elasticsearchRestTemplate.search(query, Demo.class);
return searchHits.getSearchHits().stream()
.map(SearchHit::getContent)
.collect(Collectors.toList());
}
// 3. 聚合查詢示例
public void getCategoryCounts() {
SearchRequest searchRequest = new SearchRequest("demo");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termsQuery("imsi", "test","000","1"));
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.size(0);
searchSourceBuilder.trackTotalHits(true);
Script scriptGroup = new Script("doc['imsi'].value");
TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("by_imsi").script(scriptGroup).size(10);
termsAggregationBuilder.subAggregation(AggregationBuilders.sum("sumTime").field("costTime"));
// Map<String, String> bucketsPathsMap = new HashMap<>();
// bucketsPathsMap.put("sumTime", "sumTime");
// BucketSelectorPipelineAggregationBuilder selectorPipelineAggregationBuilder = PipelineAggregatorBuilders
// .bucketSelector("having_count", bucketsPathsMap, new Script("params.sumTime<10000"));
// termsAggregationBuilder.subAggregation(selectorPipelineAggregationBuilder);
TopHitsAggregationBuilder topHit = new TopHitsAggregationBuilder("top_result").size(10);
termsAggregationBuilder.subAggregation(topHit);
searchSourceBuilder.aggregation(termsAggregationBuilder);
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = elasticsearchRestTemplate.execute(client -> {
return client.search(searchRequest, RequestOptions.DEFAULT);
});
Terms terms = (Terms) searchResponse.getAggregations().get("by_imsi");
for(Terms.Bucket bucket : terms.getBuckets()) {
Aggregations aggregations = bucket.getAggregations();
Sum sum = aggregations.get("sumTime");
System.out.println(bucket.getKeyAsString()+":"+bucket.getDocCount()+":"+sum.getValueAsString());
}
}
// 4. 滾動(dòng)查詢示例
public List<Demo> scrollSearch(String scrollId, int pageSize) {
SearchScrollHits<Demo> searchScrollHits;
if (scrollId == null) {
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(QueryBuilders.matchAllQuery())
.withPageable(PageRequest.of(0, pageSize))
.build();
searchScrollHits = elasticsearchRestTemplate.searchScrollStart(30000L, searchQuery, Demo.class, IndexCoordinates.of("demo"));
} else {
searchScrollHits = elasticsearchRestTemplate.searchScrollContinue(scrollId, 30000L, Demo.class, IndexCoordinates.of("demo"));
}
elasticsearchRestTemplate.searchScrollClear(Collections.singletonList(searchScrollHits.getScrollId()));
return searchScrollHits.getSearchHits().stream()
.map(SearchHit::getContent)
.collect(Collectors.toList());
}
}測(cè)試方法
@Slf4j
@SpringBootTest
public class TestDemo {
@Autowired
private DemoService demoService;
@Autowired
private DslQueryService dslQueryService;
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@Test
public void test1(){
demoService.findById("vkwztJMBXiMbcxs-8Npt").ifPresent(demo -> log.info(demo.toString()));
}
@Test
public void test2(){
demoService.findByImsiAndPriceRange("test", 0.0, 50.0).forEach(demo -> log.info(demo.toString()));
}
@Test
public void test3(){
dslQueryService.searchByKeyword("test").forEach(demo -> log.info(demo.toString()));
}
@Test
public void test4(){
dslQueryService.getCategoryCounts();
}
@Test
public void test5(){
dslQueryService.scrollSearch(null, 10).forEach(demo -> log.info(demo.toString()));
}
@Test
public void test6(){
Boolean flag = elasticsearchRestTemplate.indexOps(Demo.class).exists();
if (flag == false) {
log.info(" createIndex.......");
elasticsearchRestTemplate.indexOps(Demo.class).create();
elasticsearchRestTemplate.indexOps(Demo.class).putMapping(Demo.class);
} else {
String indexName = elasticsearchRestTemplate.getIndexCoordinatesFor(Demo.class).getIndexName();
log.info(" refreshIndex......");
refreshAsync(indexName);
}
}
@Test
public void test7(){
List list = new ArrayList();
Demo bean = new Demo("test", "test", "test", "test", "test", 1L);
IndexQuery indexQuery = new IndexQueryBuilder().withSource(JSONObject.toJSONString(bean)).build();
list.add(indexQuery);
elasticsearchRestTemplate.bulkIndex(list, Demo.class);
}
public void refreshAsync(String index) {
try {
elasticsearchRestTemplate.execute(client -> client.indices().refreshAsync(refreshRequest(index), RequestOptions.DEFAULT, new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
}
@Override
public void onFailure(Exception e) {
log.info("failed callback to refresh index={},exception--->{}" + index, e);
}
}));
} catch (Exception e) {
log.info("failed to refresh index={},exception--->{}" + index, e);
}
}
}復(fù)制聚合場(chǎng)景
- 使用嵌套聚合实现三级分组：时间（date_histogram，按分钟）、域（terms）和 IMSI（terms）
- 对每个 IMSI 分组统计总数和失败数
- 使用 filter 聚合筛选失败记录（resulCode 不为 "0000"）
// 構(gòu)建基礎(chǔ)查詢條件
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
// 時(shí)間范圍條件
String startTime = jsonParam.getString("startTime");
String endTime = jsonParam.getString("endTime");
if (!StringUtils.isEmpty(startTime) && !StringUtils.isEmpty(endTime)) {
startTime = startTime + ":00";
endTime = endTime + ":59";
boolQueryBuilder.must(QueryBuilders.rangeQuery("rtime").gte(startTime).lte(endTime));
}
// 數(shù)據(jù)域權(quán)限條件
List<String> vpndomains = CommonTools.strList(perms);
if (CollectionUtils.isNotEmpty(vpndomains)) {
boolQueryBuilder.must(QueryBuilders.termsQuery("vpdndomain", vpndomains));
}
// 數(shù)據(jù)源類型條件
if (!StringUtils.isEmpty(publicPerms) && publicPerms.contains("oldora")) {
boolQueryBuilder.must(QueryBuilders.matchQuery("sourceType", 4));
} else {
boolQueryBuilder.must(QueryBuilders.matchQuery("sourceType", 1));
}
// 構(gòu)建聚合查詢
NativeSearchQuery searchQuery = new NativeSearchQueryBuilder()
.withQuery(boolQueryBuilder)
// 使用date_histogram聚合按分鐘分組
.addAggregation(
AggregationBuilders.dateHistogram("by_minute")
.field("rtime")
.fixedInterval(DateHistogramInterval.MINUTE)
.format("yyyy-MM-dd HH:mm")
.subAggregation(
AggregationBuilders.terms("by_domain")
.field("vpdndomain")
.subAggregation(
AggregationBuilders.terms("by_imsi")
.field("imsi")
.subAggregation(
// 統(tǒng)計(jì)總數(shù)
AggregationBuilders.count("total_count").field("_index")
)
.subAggregation(
// 統(tǒng)計(jì)失敗數(shù)
AggregationBuilders.filter("fail_count",QueryBuilders.boolQuery()
.mustNot(QueryBuilders.termQuery("resulCode", "0000")))
)
)
)
)
.build();
// 執(zhí)行查詢
SearchHits<Authlog> searchHits = elasticsearchRestTemplate.search(searchQuery, Authlog.class, IndexCoordinates.of(authlog_index_name));
// 處理聚合結(jié)果
List<Map> statsList = new ArrayList<>();
Integer overLimitCount = jsonParam.getInteger("overLimitCount");
Histogram timeTerms = searchHits.getAggregations().get("by_minute");
if (timeTerms != null) {
for (Histogram.Bucket timeBucket : timeTerms.getBuckets()) {
String rtime = timeBucket.getKeyAsString();
Terms domainTerms = timeBucket.getAggregations().get("by_domain");
for (Terms.Bucket domainBucket : domainTerms.getBuckets()) {
String vpdndomain = domainBucket.getKeyAsString();
Terms imsiTerms = domainBucket.getAggregations().get("by_imsi");
for (Terms.Bucket imsiBucket : imsiTerms.getBuckets()) {
String imsi = imsiBucket.getKeyAsString();
// 獲取總數(shù)
ValueCount totalCount = imsiBucket.getAggregations().get("total_count");
long total = totalCount.getValue();
// 跳過(guò)不滿足閾值的記錄
if (total < overLimitCount) continue;
// 獲取失敗數(shù)
Filter failCount = imsiBucket.getAggregations().get("fail_count");
long fail = failCount.getDocCount();
// 構(gòu)建結(jié)果
Map<Object, Object> result = MapUtil.builder()
.put("rtime", rtime)
.put("vpdndomain", vpdndomain)
.put("imsi", imsi)
.put("total", total)
.put("fail", fail)
.map();
statsList.add(result);
}
}
}
}在實(shí)際項(xiàng)目中,可根據(jù)需求靈活選擇:
- 对于简单的 CRUD 操作和基础查询，优先选择 ElasticsearchRepository，其简洁的代码结构有助于快速完成开发。
- 若涉及复杂的查询逻辑、聚合分析或自定义操作，ElasticsearchRestTemplate 更能满足需求，开发者可以通过构建 DSL 实现强大的搜索功能。
































