ES + Spring boot的正确姿势 (ES系列三)

您所在的位置:网站首页 spring对应的词 ES + Spring boot的正确姿势 (ES系列三)

ES + Spring boot的正确姿势 (ES系列三)

2023-12-24 21:28| 来源: 网络整理| 查看: 265

前言

在前边我们探讨了ES的基本概念以及根据不同的场景选择数据迁移的方案。在这一篇我们来探讨如何与Spring boot集成,以及为了平滑地从Mysql迁移到ES中我们如何”翻译SQL“。

一、Spring Boot集成ES

第一步我们就要实现Spring boot和ES集成,在Spring boot中主要有Java REST Client、spring-data-elasticsearch两种方式,这里我建议使用Elasticsearch官方提供的Java High Level REST Client来集成,也方便在生产环境中使用阿里云的ES云服务。关键的版本信息如下:

ES集群:7.3.0 ES相关依赖:7.3.0

这里有两点需要注意:

High Level Client能够向上兼容,例如7.3.0版本的Java High Level REST Client能确保与大于等于7.3.0版本的Elasticsearch集群通信。为了保证最大程度地使用最新版客户端的特性,推荐High Level Client版本与集群版本一致。

在集成的过程中可能会踩到一些坑,因为Spring Boot的版本、ES集群的版本、High Level Client的版本之间会存在”关联关系“,所以当Demo无法正常跑起来的时候能做的就是多尝试一些High Level Client版本。

1、pom依赖 org.elasticsearch.client elasticsearch-rest-high-level-client 7.3.0 org.elasticsearch elasticsearch 7.3.0 org.elasticsearch.client elasticsearch-rest-client 7.3.0 org.elasticsearch.plugin rank-eval-client 7.3.0 org.elasticsearch.plugin lang-mustache-client 7.3.0 2、初始化客户端 @Configuration public class EsConfig { @Value("${elasticsearch.host}") public String host; /** * 之前使用transport的接口的时候是9300端口,现在使用HighLevelClient则是9200端口 */ @Value("${elasticsearch.port:9200}") public int port; public static final String SCHEME = "http"; @Value("${elasticsearch.username:admin}") public String username; @Value("${elasticsearch.authenticationPassword}") public String authenticationPassword; @Bean(name = "remoteHighLevelClient") public RestHighLevelClient restHighLevelClient() { final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, authenticationPassword)); RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, SCHEME)). setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setDefaultCredentialsProvider(credentialsProvider)); return new RestHighLevelClient(builder); } }

在上边的代码中需要注意username和authenticationPassword的认证信息都是在Kibana中设置的。

二、Java API

下面的代码片段均能在单元测试中正常运行,在执行下边的单元测试之前,我们先创建一个_template,大家可以选择在Kibana提供的Dev Tools里边执行。

PUT _template/hero_template { "index_patterns":[ "hero*" ], "mappings":{ "properties":{ "@timestamp":{ "type":"date" }, "id":{ "type":"integer" }, "name":{ "type":"keyword" }, "country":{ "type":"keyword" }, "birthday":{ "type":"keyword" }, "longevity":{ "type":"integer" } } } } 1、创建索引 @Test public void createIndex() throws IOException { IndexRequest request = new IndexRequest("hero"); request.id("1"); Map map = new HashMap(); map.put("id", "1"); map.put("name", "曹操"); map.put("country", "魏"); map.put("birthday", "公元155年"); map.put("longevity", "65"); request.source(map); IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); long version = indexResponse.getVersion(); assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult()); assertEquals(1, version); }

在ES中索引是我们存储、查询数据的逻辑单元,在ES7.0之后对应的是Mysql中表的概念。上边的代码我们创建了一个名为hero的索引,然后我们创建一个map作为我们插入的第一条数据,然后设置到IndexRequest请求对象中。

2、批量插入 @Test public void bulkRequestTest() throws IOException { BulkRequest request = new BulkRequest(); request.add(new IndexRequest("hero").id("2") .source(XContentType.JSON,"id", "2", "name", "刘备", "country", "蜀", "birthday", "公元161年", "longevity", "61")); request.add(new IndexRequest("hero").id("3") .source(XContentType.JSON,"id", "3", "name", "孙权", "country", "吴", "birthday", "公元182年", "longevity", "61")); request.add(new IndexRequest("hero").id("4") .source(XContentType.JSON,"id", "4", "name", "诸葛亮", "country", "蜀", "birthday", "公元181年", "longevity", "53")); request.add(new IndexRequest("hero").id("5") .source(XContentType.JSON,"id", "5", "name", "司马懿", "country", "魏", "birthday", "公元179年", "longevity", "72")); request.add(new IndexRequest("hero").id("6") .source(XContentType.JSON,"id", "6", "name", "荀彧", "country", "魏", "birthday", "公元163年", "longevity", "49")); request.add(new IndexRequest("hero").id("7") .source(XContentType.JSON,"id", "7", "name", "关羽", "country", "蜀", "birthday", "公元160年", "longevity", "60")); request.add(new IndexRequest("hero").id("8") .source(XContentType.JSON,"id", "8", "name", "周瑜", "country", "吴", "birthday", "公元175年", "longevity", "35")); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); assertFalse(bulkResponse.hasFailures()); }

在kibana中查询到的数据如下图

我们后边的查询、更新等等操作都是基于这里的数据。

3、更新数据 @Test public void updateTest() throws IOException { Map jsonMap = new HashMap(); jsonMap.put("country", "魏"); UpdateRequest request = new UpdateRequest("hero", "7").doc(jsonMap); UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT); assertEquals(DocWriteResponse.Result.UPDATED, updateResponse.getResult()); }

上边的代码如果用SQL来表示就是下边这样

> update hero set country='魏' where id=7; 4、插入/更新数据 @Test public void insertOrUpdateOne(){ Hero hero = new Hero(); hero.setId(5); hero.setName("曹丕"); hero.setCountry("魏"); hero.setBirthday("公元187年"); hero.setLongevity(39); IndexRequest request = new IndexRequest("hero"); request.id(hero.getId().toString()); request.source(JSON.toJSONString(hero), XContentType.JSON); try { IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); // 1 assertEquals(DocWriteResponse.Result.UPDATED, indexResponse.getResult()); } catch (Exception e) { throw new RuntimeException(e); } }

注意在上边代码中标注1的这行代码,是不是和前边创建索引很像?这里使用方法index()我们可以轻松的实现创建索引、插入数据、更新数据于一体,当指定的索引不存在时即创建索引,当数据不存在时就插入,数据存在时就更新。

5、删除数据 @Test public void deleteByIdTest() throws IOException { DeleteRequest deleteRequest = new DeleteRequest("hero"); deleteRequest.id("1"); DeleteResponse deleteResponse = client.delete(deleteRequest, RequestOptions.DEFAULT); assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult()); }

上边我们删除了在前边创建id=1的数据,其对应的SQL如下:

> delete from hero where id=1;

当然,在ES中我们不仅仅可以使用主键来删除,我们还可以通过其他的字段条件来删除。

@Test public void deleteByQueryRequestTest() throws IOException { DeleteByQueryRequest request = new DeleteByQueryRequest("hero"); request.setConflicts("proceed"); request.setQuery(new TermQueryBuilder("country", "吴")); BulkByScrollResponse bulkResponse = client.deleteByQuery(request, RequestOptions.DEFAULT); assertEquals(0, bulkResponse.getBulkFailures().size()); }

对应的SQL:

> delete from hero where country='吴'; 6、复合操作

在上边的增删改都是一次只能操作一种类型,而ES还给我们提供了一次进行多种类型的操作,例如下边的代码

@Test public void bulkDiffRequestTest() throws IOException { BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("hero", "3")); request.add(new UpdateRequest("hero", "7") .doc(XContentType.JSON,"longevity", "70")); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); BulkItemResponse[] bulkItemResponses = bulkResponse.getItems(); for (BulkItemResponse item : bulkItemResponses){ DocWriteResponse itemResponse = item.getResponse(); switch (item.getOpType()) { case UPDATE: UpdateResponse updateResponse = (UpdateResponse) itemResponse; break; case DELETE: DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } assertEquals(RestStatus.OK, item.status()); } }

我们使用了BulkRequest对象,将DeleteRequest、UpdateRequest两种操作add到BulkRequet中,然后将返回的BulkItemResponse[]数组根据不同的操作类型进行分类处理即可。当然据我所知,目前Mysql并没有类似的语法支持,如果有希望大家留言指正哈。

7、查询

到这里才是我们真正的重点,在ES里边支持多种类型的查询,例如**”精确“(和RDBMS有所区别)查询、模糊查询、相关性查询、范围查询、全文检索、分页查询、排序、聚合**等等查询功能,在Mysql中的大部分查询功能在ES中均能实现。同还允许我们选择同步、异步的方式来执行查询

单条件查询 + limit @Test public void selectByUserTest(){ SearchRequest request = new SearchRequest("hero"); SearchSourceBuilder builder = new SearchSourceBuilder(); builder.query(new TermQueryBuilder("country", "魏")); // 相当于mysql里边的limit 1; builder.size(1); request.source(builder); try { SearchResponse response = client.search(request, RequestOptions.DEFAULT); SearchHit[] hits = response.getHits().getHits(); assertEquals(1, hits.length); } catch (Exception e) { throw new RuntimeException(e); } }

上边的单元测试中,我们用user作为查询条件,并且限制返回条数,类似SQL如下

> select * from posts where country='魏' limit 1; 多条件查询 + 排序 + 分页 @Test public void boolQueryTest(){ SearchRequest request = new SearchRequest("hero"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder(); boolQueryBuilder.must(termQuery("country", "魏")); boolQueryBuilder.must(rangeQuery("longevity").gte(50)); sourceBuilder.query(boolQueryBuilder); sourceBuilder.from(0).size(2); sourceBuilder.query(boolQueryBuilder); sourceBuilder.sort("longevity", SortOrder.DESC); request.source(sourceBuilder); SearchResponse response = null; try { response = client.search(request, RequestOptions.DEFAULT); } catch (IOException e) { log.error("Query by Condition execution failed: {}", e.getMessage(), e); } assert response != null; assertEquals(0, response.getShardFailures().length); SearchHit[] hits = response.getHits().getHits(); List herosList = new ArrayList(hits.length); for (SearchHit hit : hits) { herosList.add(JSON.parseObject(hit.getSourceAsString(), Hero.class)); } log.info("print info: {}, size: {}", herosList.toString(), herosList.size()); }

上边的将曹魏集团的寿命50岁以上的英雄查询出来,并根据寿命从高到低排序,只截取两位英雄,其对应的sql:

> select * from hero where country='魏' and longevity >= 50 order by longevity DESC limit 2;

这里要注意,我们在ES提供的API中使用多条件查询时需要将多个条件封装到BoolQueryBuilder对象中,其支持下边几种查询类型

private static final String MUSTNOT = "mustNot"; private static final String MUST_NOT = "must_not"; private static final String FILTER = "filter"; private static final String SHOULD = "should"; private static final String MUST = "must";

具体解释参考官方文档

总结

在这部分我们先分享了如何将Spring boot和ES集成,以及最佳实践的建议——采用Java High Level REST Client来构建我们的API,然后分享了相关的依赖以及如何初始化客户端。

紧接着我们开始用High Level REST Client实现了创建索引、批量插入、更新数据、插入/更新数据、删除数据、复合操作,最后我们用两个简单的例子实现了查询数据,当然还有很多的查询例子没有展示出来,建议大家根据自己的需求,去官网查询使用的方法。

参考

Java High Level REST Client



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3