ElasticSearch原生接口
最后更新:2025-07-24 11:08:52
|
状态:未完成
|
相关数据库:
ElasticSearch-Elasticsearch
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.RestClient;
/**
 * Indexes every element of {@code list} into {@code table} with one {@code _bulk} request.
 *
 * <p>The request body is newline-delimited JSON: one action line followed by one
 * document line per entity, for example:
 * <pre>
 * PUT index_user/_bulk
 * {"index":{"_index":"index_user", "_id":"10011"}}
 * {"id":1001, "name":"a b", "age":20}
 * {"index":{"_index":"index_user", "_id":"10012"}}
 * {"id":1002, "name":"b c", "age":20}
 * {"index":{"_index":"index_user", "_id":"10013"}}
 * {"id":1003, "name":"c d", "age":30}
 * </pre>
 *
 * <p>The document id is taken from the entity's {@code _id} field, falling back to
 * {@code id}. When neither yields a value the {@code _id} attribute is left out of the
 * action line instead of being written as the literal text "null".
 *
 * @param table name of the target index
 * @param list  entities to index; {@code null} or empty sends nothing
 * @return {@code true} when the HTTP status is 200 or 201, otherwise {@code false}
 */
public boolean inserts(String table, Collection list){
    if (null == list || list.isEmpty()) {
        // Nothing to send; skip the round trip.
        return false;
    }
    StringBuilder builder = new StringBuilder();
    for (Object entity : list) {
        // The key is resolved per entity so that one entity falling back to "id"
        // does not change how the following entities are looked up.
        Object _id = BeanUtil.getFieldValue(entity, "_id");
        if (null == _id) {
            _id = BeanUtil.getFieldValue(entity, "id");
        }
        builder.append("{\"index\":{\"_index\":\"").append(table).append("\"");
        if (null != _id) {
            builder.append(", \"_id\":\"").append(_id).append("\"");
        }
        builder.append("}}\n");
        builder.append(BeanUtil.object2json(entity)).append("\n");
    }
    Request request = new Request("PUT", table + "/_bulk");
    // The builder already holds the final newline-delimited payload, so it is sent
    // as-is rather than being serialized a second time.
    request.setJsonEntity(builder.toString());
    HttpResponse response = exe(request);
    // NOTE(review): a _bulk reply can report per-item failures in its body even when
    // the HTTP status is 200; only the status is checked here - confirm whether the
    // body should be inspected as well.
    int status = response.getStatus();
    return status == 200 || status == 201;
}
/**
 * Indexes every row of {@code set} into {@code table} with one {@code _bulk} request.
 *
 * <p>The request body is newline-delimited JSON: one action line followed by one
 * document line per row, for example:
 * <pre>
 * PUT index_user/_bulk
 * {"index":{"_index":"index_user", "_id":"10011"}}
 * {"id":1001, "name":"a b", "age":20}
 * {"index":{"_index":"index_user", "_id":"10012"}}
 * {"id":1002, "name":"b c", "age":20}
 * {"index":{"_index":"index_user", "_id":"10013"}}
 * {"id":1003, "name":"c d", "age":30}
 * </pre>
 *
 * <p>The document id is taken from the row's {@code _id} value, falling back to
 * {@code id}. When neither yields a value the {@code _id} attribute is left out of the
 * action line instead of being written as the literal text "null". The {@code _id}
 * entry is removed from each row before the row is serialized, so the rows passed in
 * are modified.
 *
 * @param table name of the target index
 * @param set   rows to index; {@code null} or empty sends nothing
 * @return {@code true} when the HTTP status is 200 or 201, otherwise {@code false}
 */
public boolean insert(String table, DataSet set){
    if (null == set) {
        return false;
    }
    StringBuilder builder = new StringBuilder();
    for (DataRow row : set) {
        Object _id = BeanUtil.getFieldValue(row, "_id");
        if (null == _id) {
            _id = BeanUtil.getFieldValue(row, "id");
        }
        // Drop _id from the document body; presumably Elasticsearch rejects it
        // inside the document source - TODO confirm.
        row.remove("_id");
        builder.append("{\"index\":{\"_index\":\"").append(table).append("\"");
        if (null != _id) {
            builder.append(", \"_id\":\"").append(_id).append("\"");
        }
        builder.append("}}\n");
        // NOTE(review): the previous code chained replace(">", ">").replace("<", "<")
        // here, which changes nothing as written; it presumably once un-escaped HTML
        // entities - confirm against the upstream source.
        builder.append(row.toJSON()).append("\n");
    }
    if (builder.length() == 0) {
        // Empty set: an empty bulk body is not worth a round trip.
        return false;
    }
    // Address the bulk endpoint of the target index, matching the request shown in
    // the documentation above and the collection-based bulk insert.
    Request request = new Request("PUT", table + "/_bulk");
    request.setJsonEntity(builder.toString());
    HttpResponse response = exe(request);
    // NOTE(review): a _bulk reply can report per-item failures in its body even when
    // the HTTP status is 200; only the status is checked here - confirm whether the
    // body should be inspected as well.
    int status = response.getStatus();
    return status == 200 || status == 201;
}
/**
 * Indexes a single row into {@code table}.
 *
 * <p>The document id is taken from the row's {@code _id} value, falling back to
 * {@code id}. With an id the row is sent as {@code PUT table/_doc/<id>}; without one
 * it is sent as {@code POST table/_doc/} and, on success, the {@code _id} value of the
 * reply is written back into the row under the key that was last looked up.
 * The {@code _id} entry is removed from the row before it is serialized, so the row
 * passed in is modified.
 *
 * @param table  name of the target index
 * @param entity row to index; {@code null} sends nothing
 * @return {@code true} when the HTTP status is 200 or 201, otherwise {@code false}
 */
public boolean insert(String table, DataRow entity){
    if (null == entity) {
        return false;
    }
    // An index primary key is normally supplied, e.g. regulation id = l100, Q&A id = q100.
    String pk = "_id";
    Object _id = BeanUtil.getFieldValue(entity, pk);
    if (null == _id) {
        pk = "id";
        _id = BeanUtil.getFieldValue(entity, pk);
    }
    String method = "POST";
    String endpoint = table + "/_doc/";
    if (BasicUtil.isNotEmpty(_id)) {
        method = "PUT";
        endpoint += _id;
    }
    // Drop _id from the document body; presumably Elasticsearch rejects it
    // inside the document source - TODO confirm.
    entity.remove("_id");
    Request request = new Request(method, endpoint);
    // NOTE(review): the previous code chained replace(">", ">").replace("<", "<")
    // here, which changes nothing as written; it presumably once un-escaped HTML
    // entities - confirm against the upstream source.
    request.setJsonEntity(BeanUtil.object2json(entity));
    HttpResponse response = exe(request);
    int status = response.getStatus();
    boolean result = status == 200 || status == 201;
    if (result && BasicUtil.isEmpty(_id)) {
        // The reply carries the generated identifier under "_id", whatever key the
        // row itself uses, so it is read from there and stored under pk.
        DataRow row = DataRow.parseJson(response.getText());
        if (null != row) {
            String generated = row.getString("_id");
            if (BasicUtil.isNotEmpty(generated)) {
                BeanUtil.setFieldValue(entity, pk, generated);
            }
        }
    }
    return result;
}
/**
 * Sends {@code request} through the runtime's {@link RestClient} and copies the status
 * code and body text of the reply into a new {@code HttpResponse}.
 *
 * <p>Failures are logged rather than thrown. When the client reports an error status
 * by throwing {@link ResponseException}, the status and body of that error reply are
 * still copied, so callers can see why the request was refused. When no reply is
 * available at all, the returned object is left as constructed.
 *
 * @param request the request to perform
 * @return a response holder, never {@code null}
 */
private HttpResponse exe(Request request){
    HttpResponse result = new HttpResponse();
    Response response = null;
    try {
        RestClient client = (RestClient) RuntimeHolder.runtime().getProcessor();
        response = client.performRequest(request);
    } catch (ResponseException e) {
        // The server answered, but with an error status; keep its reply.
        response = e.getResponse();
    } catch (Exception e) {
        log.error("[elasticsearch request failed]", e);
    }
    if (null == response) {
        return result;
    }
    int status = response.getStatusLine().getStatusCode();
    result.setStatus(status);
    log.warn("[status:{}]", status);
    try {
        // Example reply:
        // {"_index":"index_user","_id":"102","_version":3,"result":"updated","_shards":{"total":2,"successful":2,"failed":0},"_seq_no":9,"_primary_term":1}
        if (null != response.getEntity()) {
            String content = FileUtil.read(response.getEntity().getContent()).toString();
            result.setText(content);
        }
    } catch (Exception e) {
        log.error("[elasticsearch response could not be read][status:{}]", status, e);
    }
    return result;
}
/**
 * Posts {@code body} to {@code table/_search} and flattens the hits into a DataSet.
 *
 * <p>Each returned row is the hit's {@code _source} with the hit's {@code _id} added.
 * For every key of a hit's {@code highlight} section that has at least one value, the
 * first value overwrites the row's entry of the same key. The attached page navigator
 * carries the {@code hits.total.value} count and the page size read from the
 * {@code size} entry of {@code body} (called with 10 as the second argument,
 * presumably the default).
 *
 * @param table name of the index to search
 * @param body  search request; its keys are passed through {@code toLowerKey(true)}
 *              before serialization
 * @return the matching rows, or {@code null} when the HTTP status is not 200
 */
public DataSet search(String table, DataRow body){
    Request request = new Request("POST", table + "/_search");
    String json = body.toLowerKey(true).toJSON();
    log.warn("[search][body:{}]", body);
    request.setJsonEntity(json);

    HttpResponse response = exe(request);
    if (response.getStatus() != 200) {
        return null;
    }

    DataRow root = DataRow.parseJson(response.getText());

    PageNavi navi = new DefaultPageNavi();
    navi.setTotalRow(BasicUtil.parseInt(root.get("hits", "total", "value"), 0));
    navi.setPageRows(body.getInt("size", 10));

    DataSet found = new DataSet();
    found.setNavi(navi);

    for (DataRow hit : root.getRow("hits").getSet("hits")) {
        DataRow item = hit.getRow("_source");
        item.put("_id", hit.get("_id"));

        DataRow highlight = hit.getRow("highlight");
        if (null != highlight) {
            for (String field : highlight.keySet()) {
                List<?> fragments = highlight.getList(field);
                if (null == fragments || fragments.isEmpty()) {
                    continue;
                }
                // Show the first highlighted fragment in place of the stored value.
                item.put(field, fragments.get(0));
            }
        }
        found.add(item);
    }
    return found;
}
/**
 * Tokenizes {@code key} without naming an analyzer; delegates to
 * {@code analyze(key, null)}.
 *
 * <p>Example of the underlying request:
 * <pre>
 * GET _analyze
 * {
 *   "analyzer": "ik_max_word",
 *   "text": ["马铃薯真好吃"]
 * }
 * </pre>
 *
 * @param key text to tokenize
 * @return whatever the two-argument overload returns for a {@code null} mode
 */
public LinkedHashMap<String,DataRow> analyze(String key){
    final String unspecifiedMode = null;
    return analyze(key, unspecifiedMode);
}
/**
 * Sends {@code key} to the {@code _analyze} endpoint and collects the returned tokens.
 *
 * <p>Tokens whose text is a single character or shorter are skipped. The result is
 * keyed by token text in the order the tokens are returned; a later token with the
 * same text replaces the earlier entry.
 *
 * @param key  text to tokenize
 * @param mode analyzer name; {@code "ik_smart"} is used when this is empty
 * @return token text mapped to its token row; empty when the HTTP status is not 200
 */
public LinkedHashMap<String,DataRow> analyze(String key, String mode){
    String analyzer = BasicUtil.isEmpty(mode) ? "ik_smart" : mode;

    DataRow body = new DataRow(KeyAdapter.KEY_CASE.SRC);
    body.put("analyzer", analyzer);
    body.put("text", new String[]{key});

    Request request = new Request("GET", "_analyze");
    request.setJsonEntity(BeanUtil.object2json(body));

    LinkedHashMap<String,DataRow> maps = new LinkedHashMap<>();
    HttpResponse response = exe(request);
    if (response.getStatus() != 200) {
        return maps;
    }

    DataRow reply = DataRow.parseJson(response.getText());
    for (DataRow token : reply.getSet("tokens")) {
        String word = token.getString("token");
        if (word.length() <= 1) {
            continue;
        }
        maps.put(word, token);
    }
    return maps;
}