elasticsearchjavaapi很全的整理

张永清      2022-05-20     625

关键词:

Elasticsearch 的API 分为 REST Client API(http请求形式)以及 transportClient API两种。相比来说transportClient API效率更高,transportClient 是通过Elasticsearch内部RPC的形式进行请求的,连接可以是一个长连接,相当于是把客户端的请求当成

Elasticsearch 集群的一个节点,当然 REST Client API 也支持http keepAlive形式的长连接,只是非内部RPC形式。但是从Elasticsearch 7 后就会移除transportClient 。主要原因是transportClient 难以向下兼容版本。

本文中所有的讲解和操作都是基于jdk 1.8 和elasticsearch 6.2.4版本。

备注:本文参考了很多Elasticsearch 的官方文档以及部l网络资料做的综合整理。  本人github 参考代码:https://github.com/597365581/bigdata_tools/tree/master/yongqing-bigdata-tools/yongqing-elasticsearch-tool

一、High REST Client

High Client 基于 Low Client, 主要目的是暴露一些 API,这些 API 可以接受请求对象为参数,返回响应对象,而对请求和响应细节的处理都是由 client 自动完成的。

API 在调用时都可以是同步或者异步两种形式
同步 API 会导致阻塞,一直等待数据返回
异步 API 在命名上会加上 async 后缀,需要有一个 listener 作为参数,等这个请求返回结果或者发生错误时,这个 listener 就会被调用,listener主要是解决自动回调的问题,有点像安卓 开发里面的listener监听回调。

Elasticsearch REST APi 官方 地址:https://www.elastic.co/guide/en/elasticsearch/reference/6.2/index.html

Maven 依赖

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.2.4</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.2.4</version>
</dependency>

client初始化:

RestHighLevelClient 实例依赖 REST low-level client builder

public class ElasticSearchClient {
private String[] hostsAndPorts;

public ElasticSearchClient(String[] hostsAndPorts) {
this.hostsAndPorts = hostsAndPorts;
}
public RestHighLevelClient getClient() {
        RestHighLevelClient client = null;
        List<HttpHost> httpHosts = new ArrayList<HttpHost>();
        if (hostsAndPorts.length > 0) {
            for (String hostsAndPort : hostsAndPorts) {
                String[] hp = hostsAndPort.split(":");
                httpHosts.add(new HttpHost(hp[0], Integer.valueOf(hp[1]), "http"));
            }
            client = new RestHighLevelClient(
                    RestClient.builder(httpHosts.toArray(new HttpHost[0])));
        } else {
            client = new RestHighLevelClient(
                    RestClient.builder(new HttpHost("127.0.0.1", 9200, "http")));
        }
        return client;
    }
}

 

文档 API(High level rest 客户端支持下面的 文档(Document) API):

  • 单文档 API:
  • index API
  • Get API
  • Delete API
  • Update API
  • 多文档 API:
  • Bulk API
  • Multi-Get API

1、Index API:
IndexRequest:
封装好的参考方法:

private IndexRequest getIndexRequest(String index, String indexType, String docId, Map<String, Object> dataMap) {
        IndexRequest indexRequest = null;
        if (null == index || null == indexType) {
            throw new ElasticsearchException("index or indexType must not be null");
        }
        if (null == docId) {
            indexRequest = new IndexRequest(index, indexType);
        } else {
            indexRequest = new IndexRequest(index, indexType, docId);
        }
        return indexRequest;
    }

    /**
     * 同步执行索引
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @throws IOException
     */
    public IndexResponse execIndex(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return getClient().index(getIndexRequest(index, indexType, docId, dataMap).source(dataMap));
    }

    /**
     * 异步执行
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param indexResponseActionListener
     * @throws IOException
     */
    public void asyncExecIndex(String index, String indexType, String docId, Map<String, Object> dataMap, ActionListener<IndexResponse> indexResponseActionListener) throws IOException {
        getClient().indexAsync(getIndexRequest(index, indexType, docId, dataMap).source(dataMap), indexResponseActionListener);
    }

API解释:  

 

IndexRequest request = new IndexRequest(
        "posts",  // 索引 Index
        "doc",  // Type 
        "1");  // 文档 Document Id 
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON); // 文档源格式为 json string

Document Source
document source 可以是下面的格式

本文作者:张永清,转载请注明出处:Elasticsearch Java API 很全的整理

Map类型的输入:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(jsonMap);  // 会自动将 Map 转换为 JSON 格式

XContentBuilder : 这是 Document Source 提供的帮助类,专门用来产生 json 格式的数据:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.field("user", "kimchy");
    builder.timeField("postDate", new Date());
    builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source(builder);

Object 键对:

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
        .source("user", "kimchy",
                "postDate", new Date(),
                "message", "trying out Elasticsearch"); 

同步索引:

IndexResponse indexResponse = client.index(request);

异步索引:异步执行函数需要添加 listener, 而对于 index 而言,这个 listener 的类型就是 ActionListener

client.indexAsync(request, listener); 

异步方法执行后会立刻返回,在索引操作执行完成后,ActionListener 就会被回调:

执行成功,调用 onResponse 函数
执行失败,调用 onFailure 函数

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

IndexResponse:
不管是同步回调还是异步回调,如果调用成功,都会返回 IndexRespose 对象。 

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
   // 文档第一次创建 
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
   // 文档之前已存在,当前是重写
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片数量少于总分片数量 
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();  // 处理潜在的失败信息
    }
}

在索引时有版本冲突的话,会抛出 ElasticsearchException

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1); // 这里是文档版本号
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
       // 冲突了 
    }
}

如果将 opType 设置为 create, 而且如果索引的文档与已存在的文档在 index, type 和 id 上均相同,也会抛出冲突异常。

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        
    }
}

2、GET API
GET 请求
每个 GET 请求都必须需传入下面 3 个参数:

  • Index
  • Type
  • Document id
GetRequest getRequest = new GetRequest(
        "posts", 
        "doc",  
        "1");  

可选参数
下面的参数都是可选的, 里面的选项并不完整,如要获取完整的属性,请参考 官方文档

不获取源数据,默认是获取的

request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE); 

配置返回数据中包含指定字段

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

配置返回数据中排除指定字段

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext); 

实时 默认为 true

request.realtime(false);

版本

request.version(2); 

版本类型

request.versionType(VersionType.EXTERNAL);

同步执行

GetResponse getResponse = client.get(getRequest);

异步执行
此部分与 index 相似, 只有一点不同, 返回类型为 GetResponse

Get Response
返回的 GetResponse 对象包含要请求的文档数据(包含元数据和字段)

 

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString(); // string 形式   
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // map 
    byte[] sourceAsBytes = getResponse.getSourceAsBytes(); // 字节形式 
} else {
   // 没有发现请求的文档 
}

在请求中如果包含特定的文档版本,如果与已存在的文档版本不匹配, 就会出现冲突

try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本冲突        
    }
}
封装好的参考方法:
  /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes  返回需要包含的字段,可以传入空
     * @param excludes  返回需要不包含的字段,可以传入为空
     * @param excludes  version
     * @param excludes  versionType
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes, Integer version, VersionType versionType) throws IOException {
        if (null == includes || includes.length == 0) {
            includes = Strings.EMPTY_ARRAY;
        }
        if (null == excludes || excludes.length == 0) {
            excludes = Strings.EMPTY_ARRAY;
        }
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
        getRequest.realtime(true);
        if (null != version) {
            getRequest.version(version);
        }
        if (null != versionType) {
            getRequest.versionType(versionType);
        }
        return getClient().get(getRequest.fetchSourceContext(fetchSourceContext));
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */

    public GetResponse getRequest(String index, String indexType, String docId, String[] includes, String[] excludes) throws IOException {
        return getRequest(index, indexType, docId, includes, excludes, null, null);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public GetResponse getRequest(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        return getClient().get(getRequest);
    }

3、Exists API

如果文档存在 Exists API 返回 true, 否则返回 fasle。

Exists Request

GetRequest 用法和 Get API 差不多,两个对象的可选参数是相同的。由于 exists() 方法只返回 true 或者 false, 建议将获取 _source 以及任何存储字段的值关闭,尽量使请求轻量级。

GetRequest getRequest = new GetRequest(
    "posts",  // Index
    "doc",    // Type
    "1");     // Document id
getRequest.fetchSourceContext(new FetchSourceContext(false));  // 禁用 _source 字段
getRequest.storedFields("_none_"); // 禁止存储任何字段   

同步请求

boolean exists = client.exists(getRequest);

异步请求
异步请求与 Index API 相似,此处不赘述,只粘贴代码。如需详细了解,请参阅官方地址

ActionListener<Boolean> listener = new ActionListener<Boolean>() {
    @Override
    public void onResponse(Boolean exists) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.existsAsync(getRequest, listener); 

封装的参考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public Boolean existDoc(String index, String indexType, String docId) throws IOException {
        GetRequest getRequest = new GetRequest(index, indexType, docId);
        getRequest.fetchSourceContext(new FetchSourceContext(false));
        getRequest.storedFields("_none_");
        return getClient().exists(getRequest);
    }

4、Delete API

Delete Request
DeleteRequest 必须传入下面参数

DeleteRequest request = new DeleteRequest(
        "posts",   // index 
        "doc",     // doc
        "1");      // document id

可选参数
超时时间

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");    

版本

request.version(2); 

版本类型

request.versionType(VersionType.EXTERNAL); 
同步执行
DeleteResponse deleteResponse = client.delete(request);

异步执行

ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
    @Override
    public void onResponse(DeleteResponse deleteResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};


client.deleteAsync(request, listener);
Delete Response

 

DeleteResponse 可以检索执行操作的信息

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功分片数目小于总分片
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 处理潜在失败
    }
}

也可以来检查文档是否存在

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // 文档不存在
}
版本冲突时也会抛出 `ElasticsearchException

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // 版本冲突
    }
}

封装好的参考方法:

本文作者:张永清,转载请注明出处:Elasticsearch Java API 很全的整理

  /**
     * @param index
     * @param indexType
     * @param docId
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, indexType, docId);
        if (null != timeValue) {
            deleteRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            deleteRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            deleteRequest.version(version);
        }
        if (null != versionType) {
            deleteRequest.versionType(versionType);
        }
        return getClient().delete(deleteRequest);
    }

    /**
     * @param index
     * @param indexType
     * @param docId
     * @return
     * @throws IOException
     */
    public DeleteResponse deleteDoc(String index, String indexType, String docId) throws IOException {
        return deleteDoc(index, indexType, docId, null, null, null, null);
    }

5、Update API

Update Request
UpdateRequest 的必需参数如下

UpdateRequest request = new UpdateRequest(
        "posts",  // Index
        "doc",  // 类型
        "1");   // 文档 Id

使用脚本更新

部分文档更新:
在更新部分文档时,已存在文档与部分文档会合并。

部分文档可以有以下形式:

JSON 格式:

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); 

Map 格式:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap); 

XContentBuilder 对象:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder);  
Object key-pairs

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

Upserts:如果文档不存在,可以使用 upserts 方法将文档以新文档的方式创建。

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update"); 

upserts 方法支持的文档格式与 update 方法相同。

可选参数:
超时时间

request.timeout(TimeValue.timeValueSeconds(1)); 
request.timeout("1s"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");  

冲突后重试次数

request.retryOnConflict(3);

获取数据源,默认是开启的

request.fetchSource(true); 

包括特定字段

String[] includes = new String[]{"updated", "r*"};
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

排除特定字段

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes)); 

指定版本

request.version(2); 

禁用 noop detection

request.scriptedUpsert(true); 

 

设置如果更新的文档不存在,就必须要创建一个

request.docAsUpsert(true); 

同步执行

UpdateResponse updateResponse = client.update(request);

异步执行

ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
    @Override
    public void onResponse(UpdateResponse updateResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

client.updateAsync(request, listener); 

Update Response

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // 文档已创建
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // 文档已更新
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // 文档已删除
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // 文档不受更新的影响
}

如果在 UpdateRequest 中使能了获取源数据,响应中则包含了更新后的源文档信息。

GetResult result = updateResponse.getGetResult(); 
if (result.isExists()) {
    String sourceAsString = result.sourceAsString();  // 将获取的文档以 string 格式输出
    Map<String, Object> sourceAsMap = result.sourceAsMap(); // 以 Map 格式输出
    byte[] sourceAsBytes = result.source();  // 字节形式
} else {
    // 默认情况下,不会返回文档源数据
}

也可以检测是否分片失败

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // 成功的分片数量小于总分片数量
}
if (shardInfo.getFailed() > 0) {
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason(); // 得到分片失败的原因
    }
}

如果在执行 UpdateRequest 时,文档不存在,响应中会包含 404 状态码,而且会抛出 ElasticsearchException 。

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // 处理文档不存在的情况
    }
}

如果版本冲突,也会抛出 ElasticsearchException

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch(ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // 处理版本冲突的情况
    }
}

封装好的参考方法:

   /**
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @param timeValue
     * @param refreshPolicy
     * @param version
     * @param versionType
     * @param docAsUpsert
     * @param includes
     * @param excludes
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap, TimeValue timeValue, WriteRequest.RefreshPolicy refreshPolicy, Integer version, VersionType versionType, Boolean docAsUpsert, String[] includes, String[] excludes) throws IOException {
        UpdateRequest updateRequest = new UpdateRequest(index, indexType, docId);
        updateRequest.doc(dataMap);
        if (null != timeValue) {
            updateRequest.timeout(timeValue);
        }
        if (null != refreshPolicy) {
            updateRequest.setRefreshPolicy(refreshPolicy);
        }
        if (null != version) {
            updateRequest.version(version);
        }
        if (null != versionType) {
            updateRequest.versionType(versionType);
        }
        updateRequest.docAsUpsert(docAsUpsert);
        //冲突时重试的次数
        updateRequest.retryOnConflict(3);
        if (null == includes && null == excludes) {
            return getClient().update(updateRequest);
        } else {
            if (null == includes || includes.length == 0) {
                includes = Strings.EMPTY_ARRAY;
            }
            if (null == excludes || excludes.length == 0) {
                excludes = Strings.EMPTY_ARRAY;
            }
            return getClient().update(updateRequest.fetchSource(new FetchSourceContext(true, includes, excludes)));
        }
    }

    /**
     * 更新时不存在就插入
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse upDdateocAsUpsert(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, true, null, null);
    }

    /**
     * 存在才更新
     *
     * @param index
     * @param indexType
     * @param docId
     * @param dataMap
     * @return
     * @throws IOException
     */
    public UpdateResponse updateDoc(String index, String indexType, String docId, Map<String, Object> dataMap) throws IOException {
        return updateDoc(index, indexType, docId, dataMap, null, null, null, null, false, null, null);
    }

 

6、Bulk API 批量处理

批量请求
使用 BulkRequest 可以在一次请求中执行多个索引,更新和删除的操作。

BulkRequest request = new BulkRequest();  
request.add(new IndexRequest("posts", "doc", "1")  
        .source(XContentType.JSON,"field", "foo")); // 将第一个 IndexRequest 添加到批量请求中
request.add(new IndexRequest("posts", "doc", "2")  
        .source(XContentType.JSON,"field", "bar")); // 第二个
request.add(new IndexRequest("posts", "doc", "3")  
        .source(XContentType.JSON,"field", "baz")); // 第三个

在同一个 BulkRequest 也可以添加不同的操作类型

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可选参数
超时时间

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for"); 

设置在批量操作前必须有几个分片处于激活状态

 

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);  // 全部分片都处于激活状态
request.waitForActiveShards(ActiveShardCount.DEFAULT);  // 默认
request.waitForActiveShards(ActiveShardCount.ONE);  // 一个

同步请求

BulkResponse bulkResponse = client.bulk(request);

异步请求

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>很全很全的javascript模块讲解

模块通常是指编程语言所提供的代码组织机制,利用此机制可将程序拆解为独立且通用的代码单元。所谓模块化主要是解决代码分割、作用域隔离、模块之间的依赖管理以及发布到生产环境时的自动化打包与处理等多个方面。模... 查看详情

python-cheatsheet,一款很全的python小抄库(代码片段)

最近开始整理自己写过的系列文章,发现草稿箱竟然有好多存货,比如这一篇,原计划2020年11月发布,好家伙,这都2021年8月了。最近这两天先补齐一下吧,本篇文章属于《Python那些库儿》专栏。今天要推... 查看详情

python-cheatsheet,一款很全的python小抄库(代码片段)

最近开始整理自己写过的系列文章,发现草稿箱竟然有好多存货,比如这一篇,原计划2020年11月发布,好家伙,这都2021年8月了。最近这两天先补齐一下吧,本篇文章属于《Python那些库儿》专栏。今天要推... 查看详情

关于三维重建很全的思维导图总结,有各算法的链接

最近博主对自己所了解的三维重建整个模块的知识,做了一个整理,画了个思维导图,希望能够对大家有所帮助。了解有限,势必有错误和纰漏,还请大家批评指正,后续我在掌握更多新的知识后会不断完... 查看详情

awesome-go:很全的go语言资源合集

参考技术Aawesome-go:一个很全的go语言框架,库,软件合集前面发过关于awsone-python,awsonedjango,flask。最近在学习golang,所以找到awsone-go由于内容太多,这里只是列出主要的目录,每一项下面又有很多内容。具体详细的内容,请到官... 查看详情

node初探(很全的helloworld工程)

1、使用node的简单体会    这两天稍微学了一下node,体会了一下传说中的异步编程语言,然后写了个简单的小demo。    node给我的感觉首先是短小精悍,开启一个服务器竟然只需要短短的几行代码,使用现成的... 查看详情

github掘金:很全的java权限认证框架!(代码片段)

点击上方关注“终端研发部”设为“星标”,和你一起掌握更多数据库知识来源:GitHub上sa-token 项目今天给大家推荐的这个开源项目超级棒,可能是史上功能最全的Java权限认证框架!这个开源项目就是:sa-tok... 查看详情

mixly2.0(米思齐)软件离线版介绍(很全的功能介绍说明)

米思齐最新版本软件怎么下载?怎么安装?怎么更新?界面上各种图标都是什么意思?最新版本和之前相比有哪些新的功能?……恭喜你找到了这里,对于刚才的问题下面的内容将会一一解答(内容篇幅很长,看不完记得收藏或... 查看详情

99%的人看了它都会说这是一篇很全的tomcat服务❤️❤️[⭐建议收藏⭐](代码片段)

文章目录🔥Tomcatjava服务器详解关于作者一、Tomcat服务器详解1.Tomcat简介2.tomcat项目部署2.1部署实验环境2.2java环境安装2.3Tomcat部署3.Tomcat目录介绍4.Tomcat配置文件5.Tomcat配置环境变量6.Tomcat端口介绍7.WEB站点部署--将java代码打包... 查看详情

99%的人看了它都会说这是一篇很全的tomcat服务❤️❤️[⭐建议收藏⭐](代码片段)

文章目录🔥Tomcatjava服务器详解关于作者一、Tomcat服务器详解1.Tomcat简介2.tomcat项目部署2.1部署实验环境2.2java环境安装2.3Tomcat部署3.Tomcat目录介绍4.Tomcat配置文件5.Tomcat配置环境变量6.Tomcat端口介绍7.WEB站点部署--将java代码打包... 查看详情

elasticsearchjavaapi:index创建删除cluster管理

ElasticsearchJavaAPI(二):index创建删除cluster管理 elastic官网有权威的javaapi英文的需要耐心看这里整理下基本操作创建maven工程添加依赖<dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport&l 查看详情

spark资料下载

很全的spark资料下载,包含pdf书籍和培训学校视频教程,1.spark多语言编程:spark多语言开发2.tachyon:tachyon3.sparkR:sparkR所有内容请点击:所有内容其它正在整理上传中 查看详情

通宵整理出来的springboot笔记(很全)(代码片段)

SpringBoot概述SpringBoot提供了一种快速使用Spring的方式,基于约定优于配置的思想,可以让开发人员不必在配置与逻辑业务之间进行思维的切换,全身心的投入到逻辑业务的代码编写中,从而大大提高了开发的效率Sp... 查看详情

求希腊神话诸神的族谱图!要很全的很详细的最好有链接的

希腊神话中的12主神1、宙斯Zeus:天神之父,也是众神之神,地上万物的最高统治者。用雷霆和叫做“埃奎斯”的神盾治理天和地。同时,宙斯还是个花心大萝卜,到处拈花惹草使他的妻子赫拉嫉妒。2、赫拉Hera:宙斯的妻子,神... 查看详情

比较全的常见的架构设计思想整理

...计算,作为整体提供数据库服务。非共享数据库集群有完全的可伸缩性、高可用、高性能、优秀的性价比、资源共享等优 查看详情

前端flex布局

关于flex布局很全的一个博客网址 查看详情

elasticsearchjavaapi的使用

elasticsearchAPI集合  ElasticsearchJavaAPI的使用(7)—多条件查询 查看详情

java书单-比较全的一篇

...方便他人,不用去各个地方找来找去;虽然不是很全,但是核心的都已经包含了,后续也会慢慢往上加,加油吧。健康《程序员健康指南》-本书是为程序员量身制作的健康指南,针对头痛、眼部疲劳、背... 查看详情