博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ES java api
阅读量:5966 次
发布时间:2019-06-19

本文共 10113 字,大约阅读时间需要 33 分钟。

2.0之后ES的java api用法有了很大变化。在此记录一些。

java应用程序连接ES集群,笔者使用的是TransportClient,获取TransportClient的代码设计为单例模式(见getClient方法)。同时包含了设置自动提交文档的代码。注释比较详细,不再赘述。

下方另有提交文档、提交搜索请求的代码。

1、连接ES集群代码如下: 

1 package elasticsearch;  2   3 import com.vividsolutions.jts.geom.GeometryFactory;  4 import com.vividsolutions.jts.geom.MultiPolygon;  5 import com.vividsolutions.jts.geom.Polygon;  6 import com.vividsolutions.jts.io.ParseException;  7 import com.vividsolutions.jts.io.WKTReader;  8 import org.apache.commons.logging.Log;  9 import org.apache.commons.logging.LogFactory; 10 import org.elasticsearch.action.bulk.BulkProcessor; 11 import org.elasticsearch.action.bulk.BulkRequest; 12 import org.elasticsearch.action.bulk.BulkResponse; 13 import org.elasticsearch.client.transport.TransportClient; 14 import org.elasticsearch.common.settings.Settings; 15 import org.elasticsearch.common.transport.InetSocketTransportAddress; 16 import org.elasticsearch.common.unit.ByteSizeUnit; 17 import org.elasticsearch.common.unit.ByteSizeValue; 18 import org.elasticsearch.common.unit.TimeValue; 19  20 import java.net.InetAddress; 21 import java.util.Date; 22  23 /** 24  * Created by ZhangDong on 2015/12/25. 25  */ 26 public class EsClient { 27     static Log log = LogFactory.getLog(EsClient.class); 28  29     //    用于提供单例的TransportClient BulkProcessor 30     static public TransportClient tclient = null; 31     static BulkProcessor staticBulkProcessor = null; 32  33 //【获取TransportClient 的方法】 34     public static TransportClient getClient() { 35         try { 36             if (tclient == null) { 37                 String EsHosts = "10.10.2.1:9300,10.10.2.2:9300"; 38                 Settings settings = Settings.settingsBuilder() 39                         .put("cluster.name", "wshare_es")//设置集群名称 40                         .put("tclient.transport.sniff", true).build();//自动嗅探整个集群的状态,把集群中其它机器的ip地址加到客户端中 41  42                 tclient = TransportClient.builder().settings(settings).build(); 43                 String[] nodes = EsHosts.split(","); 44                 for (String node : nodes) { 45                     if (node.length() > 0) {
//跳过为空的node(当开头、结尾有逗号或多个连续逗号时会出现空node) 46 String[] hostPort = node.split(":"); 47 tclient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1]))); 48 49 } 50 } 51 }//if 52 } catch (Exception e) { 53 e.printStackTrace(); 54 } 55 return tclient; 56 } 57 //【设置自动提交文档】 58 public static BulkProcessor getBulkProcessor() { 59 //自动批量提交方式 60 if (staticBulkProcessor == null) { 61 try { 62 staticBulkProcessor = BulkProcessor.builder(getClient(), 63 new BulkProcessor.Listener() { 64 @Override 65 public void beforeBulk(long executionId, BulkRequest request) { 66 //提交前调用 67 // System.out.println(new Date().toString() + " before"); 68 } 69 70 @Override 71 public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 72 //提交结束后调用(无论成功或失败) 73 // System.out.println(new Date().toString() + " response.hasFailures=" + response.hasFailures()); 74 log.info( "提交" + response.getItems().length + "个文档,用时" 75 + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文档提交失败!" : "")); 76 // response.hasFailures();//是否有提交失败 77 } 78 79 @Override 80 public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 81 //提交结束且失败时调用 82 log.error( " 有文档提交失败!after failure=" + failure); 83 } 84 }) 85 86 .setBulkActions(1000)//文档数量达到1000时提交 87 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//总文档体积达到5MB时提交 // 88 .setFlushInterval(TimeValue.timeValueSeconds(5))//每5S提交一次(无论文档数量、体积是否达到阈值) 89 .setConcurrentRequests(1)//加1后为可并行的提交请求数,即设为0代表只可1个请求并行,设为1为2个并行 90 .build(); 91 // staticBulkProcessor.awaitClose(10, TimeUnit.MINUTES);//关闭,如有未提交完成的文档则等待完成,最多等待10分钟 92 } catch (Exception e) {
//关闭时抛出异常 93 e.printStackTrace(); 94 } 95 }//if 96 97 98 99 100 101 return staticBulkProcessor;102 }103 }

 2、插入文档的代码(自动批量提交方式,注释中另有手动批量提交、单个文档提交的方式)

1 package elasticsearch; 2  3 import org.apache.commons.logging.Log; 4 import org.apache.commons.logging.LogFactory; 5 import org.elasticsearch.action.index.IndexRequest; 6  7  8 /** 9  * Created by ZhangDong on 2015/12/25.10  */11 public class EsInsert2 {12     static Log log = LogFactory.getLog(EsInsert2.class);13     public static void add(String json) {14                 try {  //EsClient.getBulkProcessor()是位于上方EsClient类中的方法15                     EsClient.getBulkProcessor().add(new IndexRequest("设置的index name", "设置的type name","要插入的文档的ID").source(json));//添加文档,以便自动提交16                 } catch (Exception e) {17                     log.error("add文档时出现异常:e=" + e + " json=" + json);18                 }19     }20 }21 //手动 批量更新22 //        BulkRequestBuilder bulkRequest = tclient.prepareBulk();23 //        for(int i=500;i<1000;i++){24 //            //业务对象25 //            String json = "";26 //            IndexRequestBuilder indexRequest = tclient.prepareIndex("twitter", "tweet")27 //                    //指定不重复的ID28 //                    .setSource(json).setId(String.valueOf(i));29 //            //添加到builder中30 //            bulkRequest.add(indexRequest);31 //        }32 //33 //        BulkResponse bulkResponse = bulkRequest.execute().actionGet();34 //        if (bulkResponse.hasFailures()) {35 //            // process failures by iterating through each bulk response item36 //            System.out.println(bulkResponse.buildFailureMessage());37 //        }38 39 //单个文档提交40 //        String json = "{\"relationship\":{},\"tags\":[\"camera\",\"video\"]}";41 //        IndexResponse response = getClient().prepareIndex("dots", "scan", JSON.parseObject(json).getString("rid")).setSource(json).get();42 //        return response.toString();

 

3、进行搜索的代码,其中有适用于复杂搜索逻辑的BoolQuery用法,以及关键词高亮的配置、在某个字段精确搜索、全文搜索、匹配全部文档、搜索同时返回聚类信息的用法:

1 package service; 2  3 import elasticsearch.EsClient; 4 import org.apache.commons.logging.Log; 5 import org.apache.commons.logging.LogFactory; 6 import org.elasticsearch.action.search.SearchRequestBuilder; 7 import org.elasticsearch.action.search.SearchResponse; 8 import org.elasticsearch.index.query.*; 9 import org.elasticsearch.search.aggregations.AggregationBuilders;10 import org.springframework.stereotype.Service;11 12 /**13  * Created by ZhangDong on 2016/1/5.14  */15 @Service16 public class SearchService2 {17 18     Log log = LogFactory.getLog(getClass());19     public SearchResponse getSimpleSearchResponse( int page, int pagesize){20 21         BoolQueryBuilder mustQuery = QueryBuilders.boolQuery();22         mustQuery.must(QueryBuilders.matchAllQuery()); // 添加第1条must的条件 此处为匹配所有文档23 24         mustQuery.must(QueryBuilders.matchPhraseQuery("title", "时间简史"));//添加第2条must的条件 title字段必须为【时间简史】25         // ↑ 放入筛选条件(termQuery为精确搜索,大小写敏感且不支持*) 实验发现matchPhraseQuery可对中文精确匹配term26 27         mustQuery.must(QueryBuilders.matchQuery("auther", "霍金")); // 添加第3条must的条件28 29         QueryBuilder queryBuilder = QueryBuilders.queryStringQuery("物理")//.escape(true)//escape 转义 设为true,避免搜索[]、结尾为!的关键词时异常 但无法搜索*30                 .defaultOperator(QueryStringQueryBuilder.Operator.AND);//不同关键词之间使用and关系31         mustQuery.must(queryBuilder);//添加第4条must的条件 关键词全文搜索筛选条件32 33         SearchRequestBuilder searchRequestBuilder = EsClient.getClient().prepareSearch("index name").setTypes("type name")34                 .setQuery(mustQuery)35                 .addHighlightedField("*")/*星号表示在所有字段都高亮*/.setHighlighterRequireFieldMatch(false)//配置高亮显示搜索结果36                 .setHighlighterPreTags("
<高亮前缀标签>
").setHighlighterPostTags("
<高亮后缀标签>
");//配置高亮显示搜索结果37 38 searchRequestBuilder = searchRequestBuilder.addAggregation(AggregationBuilders.terms("agg1(聚类返回时根据此key获取聚类结果)")39 .size(1000)/*返回1000条聚类结果*/.field("要在文档中聚类的字段,如果是嵌套的则用点连接父子字段,如【person.company.name】"));40 41 SearchResponse searchResponse = searchRequestBuilder.setFrom((page - 1) * pagesize)//分页起始位置(跳过开始的n个)42 .setSize(pagesize)//本次返回的文档数量43 .execute().actionGet();//执行搜索44 45 log.info("response="+searchResponse);46 return searchResponse;47 }48 }

 

4、ES中使用delete-by-query插件,DSL方式按条件删除数据的方法:

ES2.1中,默认的文档删除方式只有按ID删除方法:

curl -XDELETE 'localhost:9200/customer/external/2?pretty'

(参考:)

按条件删除需要安装delete-by-query插件,在线安装方式可使用命令

plugin install delete-by-query

随后会从https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip处下载插件安装包。但是本人使用的某个ES环境是离线的,需要手动下载上述URL对应的ZIP,放置于elasticsearch-2.1.0文件夹下,与bin、config等文件夹同级,同时还要下载 https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/delete-by-query/2.1.0/delete-by-query-2.1.0.zip.md5 校验文件放于同一位置(XXX.sha1应该也可以),使用以下命令离线安装:

bin/plugin install file:delete-by-query-2.1.0.zip

其中delete-by-query-2.1.0.zip是相对路径,绝对路径应该也可以,随后便安装成功了。

安装成功后查看,发现其实就是解压delete-by-query-2.1.0.zip的内容放置于elasticsearch-2.1.0/plugins/delete-by-query 文件夹下,猜测手动解压也可以使用。

注意:如果是ES集群,需要对每个节点都安装这个插件,而且每个节点安装后要重启ES。

使用DSL方式按条件删除文档的方法:

DELETE方式,请求http://localhost:9200/index_name/type_name/_queryhttp payload内容:{  "query":{    "match_all":{}  }}上述query为匹配全部文档。

转载地址:http://vjxax.baihongyu.com/

你可能感兴趣的文章
axios 拦截 , 页面跳转, token 验证(自己摸索了一天搞出来的)
查看>>
有序的双链表
查看>>
程序员全国不同地区,微信(面试 招聘)群。
查看>>
【干货】界面控件DevExtreme视频教程大汇总!
查看>>
闭包 !if(){}.call()
查看>>
python MySQLdb安装和使用
查看>>
Java小细节
查看>>
poj - 1860 Currency Exchange
查看>>
chgrp命令
查看>>
Java集合框架GS Collections具体解释
查看>>
洛谷 P2486 BZOJ 2243 [SDOI2011]染色
查看>>
数值积分中的辛普森方法及其误差估计
查看>>
Web service (一) 原理和项目开发实战
查看>>
跑带宽度多少合适_跑步机选购跑带要多宽,你的身体早就告诉你了
查看>>
深入理解Java的接口和抽象类
查看>>
Javascript异步数据的同步处理方法
查看>>
iis6 zencart1.39 伪静态规则
查看>>
SQL Server代理(3/12):代理警报和操作员
查看>>
Linux备份ifcfg-eth0文件导致的网络故障问题
查看>>
2018年尾总结——稳中成长
查看>>