Spring Boot 使用 Elasticsearch

配置文件内容(YAML 格式):

# Elasticsearch connection settings.
# All keys below must be nested under "elasticsearch" so that the
# ${elasticsearch.*} placeholders used by the Java classes resolve.
elasticsearch:
  # Whether this is a single-node deployment
  isSingleton: false
  # Path of the X-Pack certificate; the value may be empty
  capath: elastic/ca/elastic-certificates.p12
  # Reserved for now; leave empty
  keyStorePass:
  # Login user
  username: elastic
  # Login password (example value; do not keep real credentials in plain text)
  password: elastic
  # Node address(es); separate multiple values with ","
  nodeIp: 127.0.0.1
  # Node port(s); separate multiple values with ","; the count should match the number of addresses
  nodePort: 9200
  # Node scheme(s); separate multiple values with ","; the count should match the number of addresses
  nodeScheme: http
  # Index name
  index: index

新增配置读取实体类(示例中省略了 package 声明,请根据类的实际位置自行添加):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Holder for the {@code elasticsearch.*} connection settings.
 *
 * <p>Each field is injected through {@code @Value}; the setters additionally allow
 * {@code @ConfigurationProperties} binding, which needs JavaBean setters to populate a bean.
 * The getters are required by {@code EsClient}, which reads every setting through them.
 *
 * @author HanBin
 * @date 2021/12/6 9:16
 */
@Component
@ConfigurationProperties(prefix = "elasticsearch")
public class EsConfig {

    /** {@code true} for a single-node connection, {@code false} for a cluster connection. */
    @Value("${elasticsearch.isSingleton}")
    private Boolean isSingleton;

    /** Path of the X-Pack certificate; may be empty. */
    @Value("${elasticsearch.capath}")
    private String capath;

    /** Key store password; reserved, currently left empty. */
    @Value("${elasticsearch.keyStorePass}")
    private String keyStorePass;

    /** Login user. */
    @Value("${elasticsearch.username}")
    private String username;

    /** Login password. */
    @Value("${elasticsearch.password}")
    private String password;

    /** Comma-separated node addresses. */
    @Value("${elasticsearch.nodeIp}")
    private String nodeIp;

    /** Comma-separated node ports, positionally matched to {@link #nodeIp}. */
    @Value("${elasticsearch.nodePort}")
    private String nodePort;

    /** Comma-separated node schemes, positionally matched to {@link #nodeIp}. */
    @Value("${elasticsearch.nodeScheme}")
    private String nodeScheme;

    public Boolean getIsSingleton() {
        return isSingleton;
    }

    public void setIsSingleton(Boolean isSingleton) {
        this.isSingleton = isSingleton;
    }

    public String getCapath() {
        return capath;
    }

    public void setCapath(String capath) {
        this.capath = capath;
    }

    public String getKeyStorePass() {
        return keyStorePass;
    }

    public void setKeyStorePass(String keyStorePass) {
        this.keyStorePass = keyStorePass;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getNodeIp() {
        return nodeIp;
    }

    public void setNodeIp(String nodeIp) {
        this.nodeIp = nodeIp;
    }

    public String getNodePort() {
        return nodePort;
    }

    public void setNodePort(String nodePort) {
        this.nodePort = nodePort;
    }

    public String getNodeScheme() {
        return nodeScheme;
    }

    public void setNodeScheme(String nodeScheme) {
        this.nodeScheme = nodeScheme;
    }
}

ES 连接配置组件(示例中省略了 package 声明,请根据类的实际位置自行添加):

import com.dlxx.ztxtgl.elasticsearch.model.EsConfig;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.ssl.TrustStrategy;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Spring component that builds the shared {@link RestHighLevelClient} once at startup
 * from the settings in {@link EsConfig} and closes it again on shutdown.
 *
 * <p>If the connection cannot be set up, the failure is logged and
 * {@link #restHighLevelClient} stays {@code null}.
 *
 * @author HanBin
 * @date 2021/12/6 9:23
 * Source: https://blog.csdn.net/qq_39198749/article/details/117702067
 */
@Component
public class EsClient {

    public final static Logger logger = LoggerFactory.getLogger(EsClient.class);

    /** Port used for a node that has no matching entry in {@code nodePort}. */
    private static final int DEFAULT_PORT = 9200;

    /** Scheme used for a node that has no matching entry in {@code nodeScheme}. */
    private static final String DEFAULT_SCHEME = "http";

    /** Shared client; {@code null} when initialisation failed. */
    public RestHighLevelClient restHighLevelClient;

    @Autowired
    private EsConfig esConfig;

    private EsClient() {
    }

    /** Builds the client once the configuration has been injected. */
    @PostConstruct
    private void init() {
        this.initRestHighLevelClient();
    }

    /** Releases the underlying HTTP client when the bean is destroyed. */
    @PreDestroy
    private void destroy() {
        if (restHighLevelClient == null) {
            return;
        }
        try {
            restHighLevelClient.close();
        } catch (IOException e) {
            logger.warn("Failed to close the Elasticsearch client", e);
        } finally {
            restHighLevelClient = null;
        }
    }

    /**
     * Creates the client in cluster or single-node mode depending on
     * {@code isSingleton}. Failures are logged (with stack trace) and not rethrown,
     * so the application still starts when Elasticsearch is unreachable or misconfigured.
     */
    private void initRestHighLevelClient() {
        if (restHighLevelClient != null) {
            return;
        }
        try {
            if (!esConfig.getIsSingleton()) {
                // Cluster mode: connects through an SSL context
                logger.info("******************ES集群连接开始****************");
                restHighLevelClient = getClusterHighLevelClient();
                logger.info("******************ES集群连接成功****************");
                return;
            }
            // Single-node mode
            logger.info("******************单机ES连接开始****************");
            restHighLevelClient = getSingleHighLevelClient();
            logger.info("******************单机ES连接成功****************");
        } catch (Exception e) {
            // The trailing throwable makes SLF4J print the stack trace as well.
            logger.error("es连接出现异常:{}", e.toString(), e);
        }
    }

    /**
     * Builds a client for a cluster connection.
     *
     * <p>NOTE(review): the SSL context below trusts every certificate and host name
     * verification is disabled, so the server identity is not checked; the configured
     * {@code capath} is not used here. Consider loading the certificate as trust
     * material before using this against an untrusted network.
     *
     * @return a new RestHighLevelClient
     * @throws Exception if the SSL context cannot be built
     */
    private RestHighLevelClient getClusterHighLevelClient() throws Exception {
        // Trust all certificates (including self-signed local ones)
        TrustStrategy trustAll = (chain, authType) -> true;
        final SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, trustAll).build();
        final CredentialsProvider credentialsProvider = buildCredentialsProvider();

        logger.info("连接ES集群: {}:{}", esConfig.getNodeIp(), esConfig.getNodePort());
        RestClientBuilder builder = RestClient.builder(getHosts())
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        .setSSLHostnameVerifier(new NoopHostnameVerifier())
                        .setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }

    /**
     * Builds a client for a single-node connection (no custom SSL context).
     *
     * @return a new RestHighLevelClient
     */
    private RestHighLevelClient getSingleHighLevelClient() {
        final CredentialsProvider credentialsProvider = buildCredentialsProvider();

        logger.info("连接单机ES: {}:{}", esConfig.getNodeIp(), esConfig.getNodePort());
        RestClientBuilder builder = RestClient.builder(getHosts())
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLHostnameVerifier(new NoopHostnameVerifier())
                        .setDefaultCredentialsProvider(credentialsProvider));
        return new RestHighLevelClient(builder);
    }

    /** Creates a credentials provider holding the configured user name and password. */
    private CredentialsProvider buildCredentialsProvider() {
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY,
                new UsernamePasswordCredentials(esConfig.getUsername(), esConfig.getPassword()));
        return provider;
    }

    /**
     * Turns the comma-separated {@code nodeIp} / {@code nodePort} / {@code nodeScheme}
     * settings into hosts. Ports and schemes are matched to addresses by position;
     * a missing or blank entry falls back to port 9200 / scheme "http".
     * Blank addresses are skipped, and surrounding whitespace is ignored.
     *
     * @return one host per non-blank address, in configuration order
     * @throws NumberFormatException if a non-blank port entry is not an integer
     */
    private HttpHost[] getHosts() {
        String[] ipData = esConfig.getNodeIp().split(",");
        String[] portData = esConfig.getNodePort().split(",");
        String[] schemeData = esConfig.getNodeScheme().split(",");

        // Collect into a list: indexing a pre-sized array with the raw split index
        // overflows (or leaves null slots) as soon as one address entry is blank.
        List<HttpHost> hosts = new ArrayList<>(ipData.length);
        for (int i = 0; i < ipData.length; i++) {
            String ip = ipData[i].trim();
            if (ip.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            if (i < portData.length && !portData[i].trim().isEmpty()) {
                port = Integer.parseInt(portData[i].trim());
            }
            String scheme = DEFAULT_SCHEME;
            if (i < schemeData.length && !schemeData[i].trim().isEmpty()) {
                scheme = schemeData[i].trim();
            }
            hosts.add(new HttpHost(ip, port, scheme));
        }
        return hosts.toArray(new HttpHost[0]);
    }
}

使用示例:Service 层

import com.dlxx.ztxtgl.elasticsearch.config.EsClient;
import com.dlxx.ztxtgl.elasticsearch.dao.EsDao;
import com.dlxx.ztxtgl.elasticsearch.model.ServerData;
import com.dlxx.ztxtgl.utils.MapUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.PutMappingRequest;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import org.elasticsearch.common.text.Text;
import java.io.IOException;
import java.util.*;

/**
 * Service wrapping the index, mapping, save, bulk-save and search calls made
 * against the index named by {@code elasticsearch.index} (default {@code "index"}).
 *
 * @author HanBin
 * @date 2021/12/6 15:45
 */
@Service
public class EsService {

    public final static Logger logger = LoggerFactory.getLogger(EsService.class);

    /** Number of index actions sent per bulk request; adjust as needed. */
    private static final int BULK_BATCH_SIZE = 50;

    /** Page size used when the requested one is missing or out of range. */
    private static final int DEFAULT_PAGE_SIZE = 20;

    /** Largest page size accepted by {@link #searchData}. */
    private static final int MAX_PAGE_SIZE = 1000;

    /** Fields that are both searched and highlighted. */
    private static final String[] SEARCH_FIELDS =
            {"title", "subtitle", "info", "department", "sourceSystem"};

    private final EsClient esClient;

    @Value("${elasticsearch.index:index}")
    private String esIndex;

    public EsService(EsClient esClient) {
        this.esClient = esClient;
    }

    /**
     * Returns the shared client, failing with a clear message instead of a bare
     * NullPointerException when it was never initialised.
     */
    private RestHighLevelClient client() {
        RestHighLevelClient client = esClient.restHighLevelClient;
        if (client == null) {
            throw new IllegalStateException("Elasticsearch client is not initialized");
        }
        return client;
    }

    /**
     * Creates the index.
     *
     * @return the create-index response
     * @throws IOException if the request fails
     */
    public Object createIndex() throws IOException {
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(esIndex);
        return client().indices().create(createIndexRequest, RequestOptions.DEFAULT);
    }

    /**
     * Creates the field mapping for the index.
     *
     * @return the put-mapping response
     * @throws IOException if the request fails
     */
    public Object createMapping() throws IOException {
        Map<String, Object> properties = new HashMap<>(16);

        properties.put("id", fieldOfType("text"));
        properties.put("type", fieldOfType("integer"));
        properties.put("subType", fieldOfType("integer"));

        // Analysed full-text fields
        Map<String, String> analyzedText = fieldOfType("text");
        analyzedText.put("analyzer", "ik_max_word");
        analyzedText.put("search_analyzer", "ik_max_word");
        for (String field : SEARCH_FIELDS) {
            properties.put(field, analyzedText);
        }

        // Alternatives are separated by "||" without surrounding spaces: a space
        // would become a literal part of the neighbouring pattern.
        Map<String, String> dateTime = fieldOfType("date");
        dateTime.put("format", "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd");
        properties.put("createTime", dateTime);
        properties.put("updateTime", dateTime);

        properties.put("@timestamp", fieldOfType("date"));

        Map<String, Object> source = new HashMap<>(2);
        source.put("properties", properties);

        PutMappingRequest putMappingRequest = new PutMappingRequest(esIndex);
        putMappingRequest.source(source);
        return client().indices().putMapping(putMappingRequest, RequestOptions.DEFAULT);
    }

    /** Creates a mutable mapping definition containing only the given {@code type}. */
    private static Map<String, String> fieldOfType(String type) {
        Map<String, String> definition = new HashMap<>(4);
        definition.put("type", type);
        return definition;
    }

    /**
     * Indexes one document, using its {@code "id"} entry as the document id.
     * An {@code "@timestamp"} entry is added to the passed map.
     *
     * @param saveData the document; must contain an {@code "id"} entry
     * @return {@code true} on success, {@code false} if anything went wrong
     *         (including a missing id); the failure is logged
     */
    public boolean saveData(Map<String, Object> saveData) {
        try {
            saveData.put("@timestamp", new Date());
            IndexRequest indexRequest = new IndexRequest(esIndex);
            indexRequest.id(saveData.get("id").toString());
            indexRequest.source(saveData);
            IndexResponse indexResponse = client().index(indexRequest, RequestOptions.DEFAULT);
            logger.info(indexResponse.getResult().toString());
            return true;
        } catch (Exception e) {
            // Pass the exception along so the stack trace is not lost.
            logger.error("保存失败:" + e.getMessage(), e);
            return false;
        }
    }

    /**
     * Full-text search with highlighting, paging and sorting by {@code updateTime}
     * (newest first).
     *
     * @param search   the text to search for
     * @param pageNo   1-based page number; {@code null} or values below 1 become 1
     * @param pageSize page size; {@code null} or values outside 1..1000 become 20
     * @return a map with the keys {@code total}, {@code list}, {@code pageNo},
     *         {@code pageSize} and {@code totalPage}
     * @throws IOException if the request fails
     */
    public Object searchData(String search, Integer pageNo, Integer pageSize) throws IOException {
        int page = (pageNo == null || pageNo <= 0) ? 1 : pageNo;
        int size = (pageSize == null || pageSize > MAX_PAGE_SIZE || pageSize < 1)
                ? DEFAULT_PAGE_SIZE : pageSize;

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(QueryBuilders.multiMatchQuery(search, SEARCH_FIELDS));
        searchSourceBuilder.sort("updateTime", SortOrder.DESC);
        searchSourceBuilder.from((page - 1) * size);
        searchSourceBuilder.size(size);

        HighlightBuilder highlightBuilder = new HighlightBuilder();
        for (String field : SEARCH_FIELDS) {
            highlightBuilder.field(field);
        }
        highlightBuilder.requireFieldMatch(false);
        highlightBuilder.preTags("<strong>");
        highlightBuilder.postTags("</strong>");
        searchSourceBuilder.highlighter(highlightBuilder);

        SearchRequest searchRequest = new SearchRequest().indices(esIndex).source(searchSourceBuilder);
        SearchResponse searchResponse = client().search(searchRequest, RequestOptions.DEFAULT);
        SearchHits hits = searchResponse.getHits();

        long totalCount = hits.getTotalHits().value;
        // Ceiling division
        long totalPage = totalCount % size != 0 ? (totalCount / size) + 1 : totalCount / size;

        Map<String, Object> result = new HashMap<>(8);
        result.put("total", totalCount);
        result.put("list", getHitList(hits));
        result.put("pageNo", page);
        result.put("pageSize", size);
        result.put("totalPage", totalPage);
        return result;
    }

    /**
     * Converts the hits into a list of maps, each holding the document under
     * {@code "source"} and the highlighted fragments (concatenated per field)
     * under {@code "highlight"}.
     */
    private List<Map<String, Object>> getHitList(SearchHits hits) {
        List<Map<String, Object>> list = new ArrayList<>();
        for (SearchHit searchHit : hits) {
            Map<String, Object> highlights = new HashMap<>();
            searchHit.getHighlightFields().forEach((fieldName, field) -> {
                StringBuilder joined = new StringBuilder();
                for (Text fragment : field.getFragments()) {
                    joined.append(fragment.string());
                }
                highlights.put(field.getName(), joined.toString());
            });

            Map<String, Object> entry = new HashMap<>();
            entry.put("source", searchHit.getSourceAsMap());
            entry.put("highlight", highlights);
            list.add(entry);
        }
        return list;
    }

    /**
     * Saves the documents in batches of {@value #BULK_BATCH_SIZE}.
     * An {@code "@timestamp"} entry is added to every passed map. A document without
     * an {@code "id"} entry is sent with a {@code null} id.
     *
     * @param list the documents to save
     * @return {@code true} if every batch was saved without failures (also for a
     *         {@code null} or empty list, where there is nothing to save)
     */
    public boolean bulkSaveData(List<Map<String, Object>> list) {
        if (list == null || list.isEmpty()) {
            return true;
        }
        boolean allSucceeded = true;
        BulkRequest bulkRequest = new BulkRequest();
        for (Map<String, Object> item : list) {
            Object rawId = item.get("id");
            // Generate the timestamp
            item.put("@timestamp", new Date());
            IndexRequest indexRequest = new IndexRequest(esIndex);
            indexRequest.id(rawId == null ? null : rawId.toString());
            indexRequest.source(item);
            bulkRequest.add(indexRequest);
            if (bulkRequest.numberOfActions() >= BULK_BATCH_SIZE) {
                if (bulkSave(bulkRequest)) {
                    allSucceeded = false;
                }
                bulkRequest = new BulkRequest();
            }
        }
        // Send the remainder only if there is one: an empty bulk request is
        // rejected and would make a fully successful run report failure.
        if (bulkRequest.numberOfActions() > 0 && bulkSave(bulkRequest)) {
            allSucceeded = false;
        }
        return allSucceeded;
    }

    /**
     * Executes one bulk request.
     *
     * @param bulkRequest the request to send
     * @return {@code true} if the bulk had FAILURES (or threw), {@code false} if
     *         every item succeeded -- note the inverted sense
     */
    public boolean bulkSave(BulkRequest bulkRequest) {
        try {
            BulkResponse bulkItemResponses = client().bulk(bulkRequest, RequestOptions.DEFAULT);
            boolean hasFailures = bulkItemResponses.hasFailures();
            if (hasFailures) {
                logger.error(bulkItemResponses.buildFailureMessage());
            }
            return hasFailures;
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return true;
        }
    }
}

其他调用方式:可以新增对应的控制器(Controller),传入相应参数进行测试。