yml
spring: data: elasticsearch: client: reactive: endpoints: 192.168.209.160:9200 connection-timeout: 10000#链接到es的超时时间,毫秒为单位,默认10秒(10000毫秒) socket-timeout: 10000#读取和写入的超时时间,单位为毫秒,默认5秒(5000毫秒) elasticsearch: rest: uris: 192.168.209.160:9200# 这两个属性在新版本的springboot中已经不建议使用,9300属于elasticsearch各节点之间的通讯接口。 # 属于lowlevelclient。我们推荐使用9200的RestHighLevelClient去链接 # cluster-nodes: 127.0.0.1:9300# cluster-name: helloElasticsearch
pom
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-captcha</artifactId> <version>5.2.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>
Controller
package com.fwz.tproject.testfunction.controller;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fwz.tproject.testfunction.service.ElasticSearchUtils;
import com.fwz.tproject.testfunction.service.IdGeneratorSnowflake;
import com.fwz.tproject.testfunction.service.OrderService;/**
*
*
* @author 冯文哲
* @version 2018-06-11 */@RestController
@RequestMapping(value = "/test")public class MainController {
@Autowired private IdGeneratorSnowflake idGenerator;
@Autowired
ElasticSearchUtils utilsService;
@RequestMapping(value = "createIndex") public String elasticsearch() { if (utilsService.createIndex("fwztest_index", 5, 1, "")) { return "创建成功";
} else { return "创建失败";
}
}
@RequestMapping(value = "addDoc") public String addDoc() { for (int j = 0; j < 1000; j++) {
Map<String, Object> map = new ConcurrentHashMap<String, Object>();
map.put("author_id", idGenerator.snowflakeId());
map.put("title", "这有" + j + "个中国人");
map.put("content", "其中有" + (j - 1) + "个老黑");
map.put("create_date", new Date());
utilsService.addDoc("fwztest_index", String.valueOf(idGenerator.snowflakeId()), map);
} return "新增成功";
}
@RequestMapping(value = "deleteDoc") public String deleteDoc(String id) {
utilsService.deleteDoc("fwztest_index", id); return "删除成功";
}
@RequestMapping(value = "updateDoc") public String updateDoc(String id) {
utilsService.updateDoc("fwztest_index", id, ""); return "修改成功";
}
@RequestMapping(value = "selectDoc") public Map<String, Object> selectDoc(String id) { return utilsService.getDoc("fwztest_index", id);
}
}
Utils
package com.fwz.tproject.testfunction.service;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.stereotype.Service;
@Service
@EnableAsyncpublic class ElasticSearchUtils {
@Autowired private RestHighLevelClient restClient;
Logger logger = LoggerFactory.getLogger(ElasticSearchUtils.class.getName()); /**
* createIndex
*
* @param indexName //索引名称
* @param shards //主分片
* @param replicas //备份分片
* @param mapping //mapping配置
* @return */
public boolean createIndex(String indexName, Integer shards, Integer replicas, String mapping) {
logger.info(restClient.toString());
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.settings(Settings.builder().put("number_of_shards", 5).put("number_of_replicas", 1));
request.mapping( "{\"properties\":{\"author_id\":{\"type\":\"long\"},\"title\":{\"type\":\"text\",\"analyzer\":\"standard\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},\"content\":{\"type\":\"text\",\"analyzer\":\"ik_max_word\",\"fields\":{\"keyword\":{\"type\":\"keyword\",\"ignore_above\":256}}},\"create_date\":{\"type\":\"date\"}}}",
XContentType.JSON);
request.setTimeout(TimeValue.timeValueMinutes(1));
CreateIndexResponse createIndexResponse; try {
createIndexResponse = restClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = createIndexResponse.isAcknowledged();
logger.info("是否获取ACK:" + acknowledged); return acknowledged;
} catch (IOException e) { // TODO Auto-generated catch block logger.error(e.toString());
} return false;
} /**
*
* addDocument
*
* @param index 索引名称
* @param id 数据ID(为空则使用es内部ID)
* @param source 数据(json 或 Map)
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午5:10:42
* */
@Async public Future<Boolean> addDoc(String index, String id, Map<String, Object> source) { // 增, source 里对象创建方式可以是JSON字符串,或者Map,或者XContentBuilder 对象
IndexRequest indexRequest = new IndexRequest(index).source(source); if (id != null && !"".equals(id)) {
indexRequest = indexRequest.id(id);
} try {
IndexResponse res = restClient.index(indexRequest, RequestOptions.DEFAULT);
logger.info("新增数据成功,ID为: " + res.getId()); return new AsyncResult<Boolean>(true);
} catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace();
} return new AsyncResult<Boolean>(false);
} /**
*
* deleteDocument
*
* @param index 索引名称
* @param id 数据ID
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午5:19:26
* */
public boolean deleteDoc(String index, String id) { // 删
DeleteRequest deleteRequest = new DeleteRequest(index, id);
DeleteResponse res; try {
res = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
logger.info(res.getResult().toString());
logger.info("删除数据成功,ID为: " + res.getId());
} catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace();
} return false;
} /**
*
* updateDocument
*
* @param index
* @param id
* @param source
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午5:25:43
* */
public boolean updateDoc(String index, String id, String source) { // 改, source 里对象创建方式可以是JSON字符串,或者Map,或者XContentBuilder 对象
UpdateRequest updateRequest = new UpdateRequest(index, id).doc(source); try {
UpdateResponse res = restClient.update(updateRequest, RequestOptions.DEFAULT);
logger.info("修改数据成功,ID为: " + res.getId()); return true;
} catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace();
} return false;
} /**
* selectDocument
*
* @param index
* @param id
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午5:27:33
* */
public Map<String, Object> getDoc(String index, String id) { // 查
GetRequest getRequest = new GetRequest(index, id); try {
GetResponse res = restClient.get(getRequest, RequestOptions.DEFAULT);
logger.info("查询数据成功,ID为: " + res.getId());
logger.info("查询数据成功,字符串数据为: " + res.getSourceAsString());
Map<String, Object> map = res.getSourceAsMap();
logger.info("查询数据成功,Map数据为: " + map.toString()); return map;
} catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace();
} return null;
} /**
* bulkDemo
*
* @param index
* @param id
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午7:35:34
* */
public Boolean bulkRequest(String index, String id) {
BulkRequest request = new BulkRequest(); /**
* map为更新或新增的数据 */
request.add(new IndexRequest(index).source(XContentType.JSON, new HashMap<String, Object>()));
request.add(new DeleteRequest(index, id));
request.add(new UpdateRequest(index, id).doc(XContentType.JSON, new HashMap<String, Object>()));
BulkResponse bulkResponse; try {
bulkResponse = restClient.bulk(request, RequestOptions.DEFAULT); for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
logger.info(failure.getMessage()); continue;
}
DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;
logger.info(indexResponse.getResult().toString());
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
logger.info(updateResponse.getResult().toString());
} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
logger.info(deleteResponse.getResult().toString());
}
} return true;
} catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace();
} return false;
} /**
* searchQueryDemo 可完全取代getRequest
*
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午7:43:57
* */
public Boolean searchQuery() { /**
* 指定index */
SearchRequest searchRequest = new SearchRequest("gdp_tops*");
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); /**
* 指定query */
sourceBuilder.query(QueryBuilders.termQuery("city", "北京市"));
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
searchRequest.source(sourceBuilder); try {
SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);
Arrays.stream(response.getHits().getHits()).forEach(i -> {
System.out.println(i.getIndex());
System.out.println(i.getSourceAsMap());
});
logger.info(response.getHits().getTotalHits().toString()); return true;
} catch (IOException e) {
e.printStackTrace();
} return false;
} /**
* aggsSearchDemo
*
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午7:46:31
* */
public Boolean aggsQuery() { /**
* 指定index */
SearchRequest searchRequest = new SearchRequest("gdp_tops*");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company").field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age").field("age"));
searchSourceBuilder.aggregation(aggregation);
searchSourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS)); /**
* 分页查询 */
/*
* searchSourceBuilder.from(0); searchSourceBuilder.size(5); */
searchRequest.source(searchSourceBuilder); try { /**
* 处理方法1 (1 2 都尝试一下) */
SearchResponse response = restClient.search(searchRequest, RequestOptions.DEFAULT);
Arrays.stream(response.getHits().getHits()).forEach(i -> {
logger.info(i.getIndex());
logger.info(i.getSourceAsMap().toString());
}); /**
* 处理方法2 (1 2 都尝试一下) */
Aggregations aggregations = response.getAggregations();
Terms byCompanyAggregation = aggregations.get("by_company");
Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
Avg averageAge = elasticBucket.getAggregations().get("average_age"); double avg = averageAge.getValue();
logger.info(response.getHits().getTotalHits().toString()); return true;
} catch (IOException e) {
e.printStackTrace();
} return false;
} /**
* searchAsyncDemo
*
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午7:50:00
* */
public Boolean searchAsync() { /**
* 指定index */
SearchRequest searchRequest = new SearchRequest("gdp_tops*");
restClient.searchAsync(searchRequest, RequestOptions.DEFAULT, new ActionListener<SearchResponse>() {
@Override public void onResponse(SearchResponse searchResponse) {
SearchHit[] searchHits = searchResponse.getHits().getHits(); for (SearchHit hit : searchHits) { // 结果的Index
String index = hit.getIndex(); // 结果的ID
String id = hit.getId(); // 结果的评分
float score = hit.getScore(); // 查询的结果 JSON字符串形式
String sourceAsString = hit.getSourceAsString(); // 查询的结果 Map的形式
Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // Document的title
String documentTitle = (String) sourceAsMap.get("title"); // 结果中的某个List
List<Object> users = (List<Object>) sourceAsMap.get("user"); // 结果中的某个Map
Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
}
}
@Override public void onFailure(Exception e) {
logger.error(e.toString());
}
}); return true;
} /**
* 有时候需要查询的数据太多,可以考虑使用SearchRequest.scroll()方法拿到scrollId;之后再使用SearchScrollRequest
* 其用法如下:
*
* @return
* @author fwzz
* @version 创建时间:2021年1月27日 下午8:00:14
* */
public Boolean searchScroll() {
SearchRequest searchRequest = new SearchRequest("posts");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.termQuery("city", "北京市"));
searchSourceBuilder.size(5);
searchRequest.source(searchSourceBuilder);
searchRequest.scroll(TimeValue.timeValueMinutes(1L));
SearchResponse searchResponse; try {
searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
String scrollId = searchResponse.getScrollId();
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(TimeValue.timeValueSeconds(30));
SearchResponse searchScrollResponse = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = searchScrollResponse.getScrollId();
SearchHits hits = searchScrollResponse.getHits();
logger.info(hits.getTotalHits().toString()); return true;
} catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace();
} return false;
}
}
全局ID生成工具类
package com.fwz.tproject.testfunction.service;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import cn.hutool.core.lang.Snowflake;
import cn.hutool.core.net.NetUtil;
import cn.hutool.core.util.IdUtil;
@Componentpublic class IdGeneratorSnowflake { private long workerId = 0; private long datacenterId = 1; private Snowflake snowflake = IdUtil.createSnowflake(workerId, datacenterId); private static final Logger log = LoggerFactory.getLogger(IdGeneratorSnowflake.class.getName()); // 依赖注入完成后执行该方法,进行一些初始化工作 @PostConstruct public void init() { try {
workerId = NetUtil.ipv4ToLong(NetUtil.getLocalhostStr());
log.info("当前机器的workerId: {}", workerId);
} catch (Exception e) {
e.printStackTrace();
log.warn("当前机器的workerId获取失败", e); // 释放ID
workerId = NetUtil.getLocalhostStr().hashCode();
}
} // 使用默认机房号获取ID
public synchronized long snowflakeId() { return snowflake.nextId();
} // 自己制定机房号获取ID
public synchronized long snowflakeId(long workerId, long datacenterId) {
Snowflake snowflake = IdUtil.createSnowflake(workerId, datacenterId); return snowflake.nextId();
} /**
* 生成的是不带-的字符审,类似于: 73a64edf935d4952a287739a66f96e06
*
* @return */
public String simpleUUID() { return IdUtil.simpleUUID();
} /**
* 生成的UUID是带-的字符串,类似于: b12b6401-6f9c-4351-b2b6-d8afc9ab9272
*
* @return */
public String randomUUID() { return IdUtil.randomUUID();
} public static void main(String[] args) {
IdGeneratorSnowflake f = new IdGeneratorSnowflake(); for (int i = 0; i < 1000; i++) {
System.out.println(f.snowflakeId(0, 0));
}
}
}
转自:https://www.cnblogs.com/fengwenzhee/p/14336734.html