API 使用

Easysearch Java API Client 使用文档 #

管理索引 #

使用客户端对索引进行管理

// Recreate the index from scratch: delete it if it already exists, then create it
// and exercise the basic index lifecycle operations (close/open/refresh/flush/forcemerge).
String index = "test1";
if (client.indices().exists(r -> r.index(index)).value()) {
    LOGGER.info("Deleting index " + index);
    DeleteIndexResponse deleteIndexResponse = client.indices().delete(new DeleteIndexRequest.Builder().index(index).build());
    LOGGER.info(deleteIndexResponse.toString());

}
LOGGER.info("Creating index " + index);
CreateIndexResponse createIndexResponse = client.indices().create(req -> req.index(index));
CloseIndexResponse closeIndexResponse = client.indices().close(req -> req.index(index));
OpenResponse openResponse = client.indices().open(req -> req.index(index));
RefreshResponse refreshResponse = client.indices().refresh(req -> req.index(index));
FlushResponse flushResponse = client.indices().flush(req -> req.index(index));
// maxNumSegments(1) merges every shard down to a single segment
ForcemergeResponse forcemergeResponse = client.indices().forcemerge(req -> req.index(index).maxNumSegments(1L));

也可以用异步方式执行

        // Asynchronous variant of the delete-then-create flow using the async client.
        EasysearchAsyncClient asyncClient = SampleClient.createAsyncClient();
        asyncClient.indices().exists(req -> req.index(index)).thenCompose(exists -> {
                if (exists.value()) {
                    LOGGER.info("Deleting index " + index);
                    // NOTE(review): LOGGER.info(Object) must exist on this logger (e.g. Log4j2);
                    // SLF4J would need deleteResponse.toString() here — confirm the logger API.
                    return asyncClient.indices().delete(r -> r.index(index)).thenAccept(deleteResponse -> {
                        LOGGER.info(deleteResponse);
                    });
                }
                // Nothing to delete: continue the chain with an already-completed stage.
                return CompletableFuture.completedFuture(null);
            }).thenCompose(v -> {
                LOGGER.info("Creating index " + index);
                return asyncClient.indices().create(req -> req.index(index));

            }).whenComplete((createResponse, throwable) -> {
                if (throwable != null) {
                    LOGGER.error("Error during index operations", throwable);
                } else {
                    LOGGER.info("Index created successfully");
                }
            })
            // Blocks the calling thread (sample code only); throws TimeoutException after 30s.
            .get(30, TimeUnit.SECONDS);

滚动索引 Rollover #

滚动索引是一种管理时序数据的有效方式,当索引满足特定条件时(如大小、文档数量、年龄),会自动创建新的索引。

示例

// Create the initial index and attach a write alias used for rollover
String index = "test-00001";
client.indices().create(req -> req.index(index).aliases("test_log", a -> a.isWriteIndex(true)));

// Configure and execute the rollover
 RolloverResponse res = client.indices().rollover(req -> req
            .alias("test_log")    // roll over through the alias
            .conditions(c -> c
                .maxDocs(100L)    // roll when the index holds 100 docs
                .maxAge(b -> b.time("7d"))    // ...or when it is 7 days old
                .maxSize("5gb"))); // ...or when it reaches 5 GB

Mapping 设置 #

基本设置

// Minimal mapping update: register two fields on the index.
String index = "test1";
PutMappingResponse response = client.indices().putMapping(req -> req.index(index)
    .properties("field1", p -> p.keyword(k -> k))         // keyword type
    .properties("field2", p -> p.text(t -> t))            // text type
);

更完整的示例,包含多种常用字段类型

// Broader mapping example covering the commonly used field types.
// Each properties(...) call registers one field; a duplicate name silently
// overwrites the earlier entry, so every field below has a unique name.
response = client.indices().putMapping(req -> req.index(index)
    // Common field types
    .properties("keyword_field", p -> p.keyword(k -> k))           // keyword
    .properties("text_field", p -> p.text(t -> t))                // text
    .properties("long_field", p -> p.long_(l -> l))               // long
    .properties("integer_field", p -> p.integer(i -> i))          // integer
    .properties("short_field", p -> p.short_(s -> s))             // short
    .properties("byte_field", p -> p.byte_(b -> b))               // byte
    .properties("double_field", p -> p.double_(d -> d))           // double
    .properties("float_field", p -> p.float_(f -> f))             // float
    .properties("date_field", p -> p.date(d -> d))                // date
    .properties("boolean_field", p -> p.boolean_(b -> b))         // boolean
    .properties("binary_field", p -> p.binary(b -> b))            // binary
    .properties("ip_field", p -> p.ip(i -> i))                    // ip
    .properties("geo_point_field", p -> p.geoPoint(g -> g))       // geo_point
    .properties("flat_field", p -> p.flattened(f -> f))           // flattened


    // date with explicit formats (renamed: the original reused "date_field",
    // which overwrote the plain date mapping declared above)
    .properties("formatted_date_field", p -> p.date(d -> d.format("yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")))

    // object type with sub-fields
    .properties("object_field", p -> p.object(o -> o
            .properties("sub_field1", sp -> sp.keyword(k -> k))
            .properties("sub_field2", sp -> sp.long_(l -> l))
        )
    )

    // nested type: each sub-document is indexed independently
    .properties("nested_field", p -> p.nested(n -> n
            .properties("sub_field1", sp -> sp.keyword(k -> k))
            .properties("sub_field2", sp -> sp.text(t -> t))
        )
    )
);

常用字段类型配置

// Per-type configuration options for the most common field types.
response = client.indices().putMapping(req -> req.index(index)
        // text type configuration
        .properties("text_field2", p -> p
            .text(t -> t
                .analyzer("standard")                    // index-time analyzer
                .searchAnalyzer("standard")              // search-time analyzer
                .fields("raw", f -> f.keyword(k -> k))    // keyword sub-field
                .copyTo("other_field")                   // copy the value into another field
            )
        )

        // keyword type configuration
        .properties("keyword_field2", p -> p
            .keyword(k -> k
                .ignoreAbove(256)                             // ignore values longer than this
                .nullValue("NULL")                            // replacement for null values
                .docValues(true)                              // enable doc_values
            )
        )

        // date type configuration
        .properties("date_field2", p -> p
            .date(d -> d
                .format("yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis")  // accepted date formats
            )
        )
);

更新索引 Settings #

可以用 JSON 的方式更新特定索引的设置

 String settings = "{" +
            "    \"index\": {" +
            "        \"number_of_replicas\": 2," +
            "        \"refresh_interval\": \"5s\"," +
            "        \"max_result_window\": 50000," +
            "        \"analysis\": {\" +
            "            \"analyzer\": {" +
            "                \"my_analyzer\": {" +
            "                    \"type\": \"custom\"," +
            "                    \"tokenizer\": \"standard\"," +
            "                    \"filter\": [\"lowercase\", \"asciifolding\"]" +
            "                }" +
            "            }" +
            "        }" +
            "    }" +
            "}";
            
// 如果要更新静态设置(比如分词器),需要先关闭索引
String index = "test1";
client.indices().close(c -> c.index(index));
// 更新设置
client.indices().putSettings(req -> req
    .index(index)
    .withJson(new StringReader(settings))
);
// 重新打开索引
client.indices().open(c -> c.index(index));

创建带有自定义映射和设置的索引 #

使用 Easysearch Java 客户端创建带有自定义映射和设置的索引。

使用 JSON 配置

// Index settings and mappings supplied together as a JSON text block.
String settingsJson = """
    {
      "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1,
        "analysis": {
          "analyzer": {
            "my_analyzer": {
              "type": "custom",
              "tokenizer": "standard",
              "filter": ["lowercase", "asciifolding"]
            }
          }
        }
      },
      "mappings": {
        "properties": {
          "id": {
            "type": "keyword"
          },
          "title": {
            "type": "text",
            "analyzer": "my_analyzer"
          },
          "content": {
            "type": "text"
          },
          "status": {
            "type": "keyword"
          },
          "tags": {
            "type": "keyword"
          },
          "create_time": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          },
          "update_time": {
            "type": "date",
            "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
          },
          "view_count": {
            "type": "long"
          },
          "price": {
            "type": "double"
          }
        }
      }
    }
    """;

// Create the index from the JSON definition
client.indices().create(req -> req
    .index("my_index")
    .withJson(new StringReader(settingsJson))
);

创建索引模板 #

使用 Easysearch Java 客户端创建索引模板。

// Composable index template matching "log-*" indices; priority breaks ties
// when several templates match the same index name.
String templateJson = "{"
    + "  \"index_patterns\": [\"log-*\"], "
    + "  \"template\": {"
    + "    \"settings\": {"
    + "      \"number_of_shards\": 1,"
    + "      \"number_of_replicas\": 1,"
    + "      \"refresh_interval\": \"5s\","
    + "      \"analysis\": {"
    + "        \"analyzer\": {"
    + "          \"my_analyzer\": {"
    + "            \"type\": \"custom\","
    + "            \"tokenizer\": \"standard\","
    + "            \"filter\": [\"lowercase\"]"
    + "          }"
    + "        }"
    + "      }"
    + "    },"
    + "    \"mappings\": {"
    + "      \"_source\": {"
    + "        \"enabled\": true"
    + "      },"
    + "      \"properties\": {"
    + "        \"@timestamp\": {"
    + "          \"type\": \"date\""
    + "        },"
    + "        \"message\": {"
    + "          \"type\": \"text\","
    + "          \"analyzer\": \"my_analyzer\""
    + "        },"
    + "        \"level\": {"
    + "          \"type\": \"keyword\""
    + "        },"
    + "        \"service\": {"
    + "          \"type\": \"keyword\""
    + "        },"
    + "        \"trace_id\": {"
    + "          \"type\": \"keyword\""
    + "        },"
    + "        \"metrics\": {"
    + "          \"type\": \"object\","
    + "          \"properties\": {"
    + "            \"value\": {"
    + "              \"type\": \"double\""
    + "            },"
    + "            \"name\": {"
    + "              \"type\": \"keyword\""
    + "            }"
    + "          }"
    + "        }"
    + "      }"
    + "    }"
    + "  },"
    + "  \"priority\": 100"
    + "}";

// Create or update the template
PutIndexTemplateRequest request = PutIndexTemplateRequest.of(builder -> builder
    .name("logs-template")                 // template name
    .withJson(new StringReader(templateJson))
);

client.indices().putIndexTemplate(request);

Bulk 批量写入 #

// Bulk-index the same sample document ten times, then report per-item failures.
EasysearchClient client = SampleClient.create();
String json2 = "{"
        + "    \"@timestamp\": \"2023-01-08T22:50:13.059Z\","
        + "    \"agent\": {"
        + "      \"version\": \"7.3.2\","
        + "      \"type\": \"filebeat\","
        + "      \"ephemeral_id\": \"3ff1f2c8-1f7f-48c2-b560-4272591b8578\","
        + "      \"hostname\": \"ba-0226-msa-fbl-747db69c8d-ngff6\""
        + "    }"
        + "}";

BulkRequest.Builder br = new BulkRequest.Builder();
for (int i = 0; i < 10; i++) {
    br.operations(op -> op.index(idx -> idx.index(indexName).document(JsonData.fromJson(json2))));
}
BulkResponse bulkResponse = client.bulk(br.build());
// errors() is true when at least one item failed; items() still contains the
// successful ones, so only print the entries that actually carry an error.
if (bulkResponse.errors()) {
    for (BulkResponseItem item : bulkResponse.items()) {
        if (item.error() != null) {
            System.out.println(item.toString());
        }
    }
}

索引单个文档 #

// Index a single document — three equivalent approaches.
String json2 = "{"
    + "    \"@timestamp\": \"2023-01-08T22:50:13.059Z\","
    + "    \"agent\": {"
    + "      \"version\": \"7.3.2\","
    + "      \"type\": \"filebeat\","
    + "      \"ephemeral_id\": \"3ff1f2c8-1f7f-48c2-b560-4272591b8578\","
    + "      \"hostname\": \"ba-0226-msa-fbl-747db69c8d-ngff6\""
    + "    }"
    + "}";

// 1) Raw JSON body; the server assigns the document id.
IndexRequest<JsonData> request = IndexRequest.of(i -> i
    .index("logs")
    .withJson(new StringReader(json2))
);
IndexResponse response = client.index(request);
System.out.println(response);

// 2) Typed document object with an explicit id
LogEntry logEntry = mapper.readValue(json2, LogEntry.class);
IndexRequest<LogEntry> request2 = IndexRequest.of(i -> i
    .index(indexName)
    .id(logEntry.getAgent().getEphemeralId())
    .document(logEntry)
);
IndexResponse response2 = client.index(request2);

// 3) Explicit builder instead of the lambda DSL
IndexRequest.Builder<LogEntry> indexReqBuilder = new IndexRequest.Builder<>();
indexReqBuilder.index(indexName);
indexReqBuilder.id(logEntry.getAgent().getEphemeralId());
indexReqBuilder.document(logEntry);
response2 = client.index(indexReqBuilder.build());

删除文档 #

// Delete one document from the index by its id (lambda-builder form).
DeleteResponse response = client.delete(d -> d
    .index(indexName)
    .id("3ff1f2c8-1f7f-48c2-b560-4272591b8578")
);

deleteByQuery 删除 #

// Delete every document whose agent.type matches "filebeat" (lambda-builder form).
DeleteByQueryResponse response = client.deleteByQuery(dbq -> dbq
    .index(indexName)
    .query(q -> q.match(m -> m.field("agent.type").query("filebeat")))
);

更新文档 #

// Partial (doc-merge) update of a single document by id.
// NOTE(review): the dotted key "agent.type" is sent verbatim in the partial doc;
// confirm the server treats it as the nested agent.type field and not as a
// literal field named "agent.type".
UpdateRequest updateRequest = UpdateRequest.of(u -> u
    .index(indexName)
    .id("3ff1f2c8-1f7f-48c2-b560-4272591b8578")
    .doc(Map.of("agent.type", "logstash"))
);
UpdateResponse<Map<String, Object>> response = client.update(updateRequest, Map.class);

updateByQuery 更新 #

// Update all matching documents in place with a painless script.
Query query = Query.of(q -> q
    .term(t -> t
        .field("agent.type")
        .value(v -> v.stringValue("filebeat"))
    )
);
// refresh(true) makes the changes visible to search immediately after the call.
UpdateByQueryRequest updateByQueryRequest = UpdateByQueryRequest.of(u -> u
    .index(indexName).query(query).script(s -> s.inline(in ->
        in.source("ctx._source.agent.type = params.param1")
            .lang("painless")
            .params(Map.of("param1", JsonData.of("logstash"))))).refresh(true)
);
UpdateByQueryResponse response = client.updateByQuery(updateByQueryRequest);
System.out.println(response.updated());

搜索文档 #

// Term query on agent.type, sorted by @timestamp descending.
Query query = Query.of(q -> q
    .term(t -> t
        .field("agent.type")
        .value(v -> v.stringValue("filebeat"))
    )
);

SortOptions.Builder sb = new SortOptions.Builder();
SortOptions sortOptions = sb.field(fs -> fs.field("@timestamp").order(SortOrder.Desc)).build();

// trackTotalHits(true) asks the server for an exact (not lower-bound) hit count.
final SearchRequest.Builder searchReq = new SearchRequest.Builder().allowPartialSearchResults(false)
    .index(indexName)
    .size(10)
    .sort(sortOptions)
    .source(sc -> sc.fetch(true))
    .trackTotalHits(tr -> tr.enabled(true))
    .query(query);

SearchResponse<LogEntry> searchResponse = client.search(searchReq.build(), LogEntry.class);
System.out.println(searchResponse.hits().total());
for (Hit<LogEntry> hit : searchResponse.hits().hits()) {
    System.out.println(JsonData.of(hit.source()).toJson(new JacksonJsonpMapper()));
}

带子聚合的日期直方图聚合 #

// Monthly date-histogram over create_time with four sub-aggregations per bucket.
SearchRequest searchRequest = SearchRequest.of(s -> s
    .index(index)
    .size(0)  // no documents needed, aggregation results only
    .aggregations("by_date", a -> a
        .dateHistogram(dh -> dh
            .field("create_time")
            .calendarInterval(CalendarInterval.Month) // bucket by calendar month
            .format("yyyy-MM-dd")
            .minDocCount(1)
        ).aggregations("avg_price", avg -> avg // average price per month
            .avg(a1 -> a1
                .field("price")
            )
        ).aggregations("avg_view_count", avg -> avg // average view count per month
            .avg(a1 -> a1
                .field("view_count")
            )
        ).aggregations("by_status", terms -> terms // document count per status per month
            .terms(t -> t
                .field("status")
                .size(10)
            )
        ).aggregations("price_stats", stats -> stats // price statistics (min/max/avg/sum)
            .stats(s1 -> s1
                .field("price")
            )
        )
    ));
    
SearchResponse<Void> response = client.search(searchRequest, Void.class);
// Walk the histogram buckets and print each one's sub-aggregation results.
response.aggregations()
    .get("by_date")
    .dateHistogram()
    .buckets()
    .array()
    .forEach(bucket -> {
        // Bucket key and document count
        System.out.printf("\n日期: %s (文档数: %d)\n",
            bucket.keyAsString(),
            bucket.docCount());

        // Averages
        System.out.printf("平均价格: %.2f\n",
            bucket.aggregations().get("avg_price").avg().value());
        System.out.printf("平均浏览: %.2f\n",
            bucket.aggregations().get("avg_view_count").avg().value());

        // Price statistics
        StatsAggregate stats = bucket.aggregations().get("price_stats").stats();
        System.out.printf("价格统计: 最小值=%.2f, 最大值=%.2f, 平均值=%.2f\n",
            stats.min(), stats.max(), stats.avg());

        // Status distribution (string-terms buckets)
        System.out.println("状态分布:");
        bucket.aggregations()
            .get("by_status")
            .sterms()
            .buckets()
            .array()
            .forEach(status -> System.out.printf("  %s: %d\n",
                status.key().stringValue(),
                status.docCount()));

        System.out.println("----------------------------------------");
    });

Reindex #

// Copy test1 into test1_new_index, transforming each document with a painless script.
ReindexResponse response = client.reindex(r -> r
    .source(s -> s.index("test1"))
    .dest(d -> d.index("test1_new_index"))
    .script(sc -> sc
        .inline(i -> i
            .source(
                "if (ctx._source.price != null) { " +
                    "    ctx._source.price *= 1.1; " +  // raise price by 10%
                    "    ctx._source.updated = true; " +
                    "}"
            )
            .lang("painless")
        )
    )
);

异步方式 Reindex #


        // Fire-and-forget reindex: waitForCompletion(false) returns a task id
        // immediately; the task is then polled via the tasks API.
        ReindexResponse response = client.reindex(r -> r
            .source(s -> s.index("test1"))
            .dest(d -> d.index("test1_new_index"))
            .script(sc -> sc
                .inline(i -> i
                    .source(
                        "if (ctx._source.price != null) { " +
                            "    ctx._source.price *= 1.1; " +  // raise price by 10%
                            "    ctx._source.updated = true; " +
                            "}"
                    )
                    .lang("painless")
                )
            ).waitForCompletion(false)
        );
        String taskId = response.task();
        System.out.println("Started reindex task: " + taskId);
        // Poll task progress until it finishes or an error occurs
        boolean completed = false;
        while (!completed) {
            try {
                Thread.sleep(1000); // check once per second
                GetTasksResponse taskResponse = client.tasks().get(g -> g.taskId(taskId).waitForCompletion(false));
                Info taskInfo = taskResponse.task();
                if (taskInfo == null) {
                    // task may already be finished (or its record expired)
                    System.out.println("Task completed or not found");
                    break;
                }

                // Current task status (opaque JSON payload)
                JsonData status = taskInfo.status();
                if (status != null) {
                    System.out.println("Running time in millis: " + taskInfo.runningTimeInNanos() / 1_000_000L);
                    System.out.println("Current status: " + status.toJson());
                }

                if (taskResponse.completed()) {
                    completed = true;
                    System.out.println("Reindex completed successfully");

                    // Final result
                    JsonData result = taskInfo.status();
                    if (result != null) {
                        // the status payload carries the reindex statistics
                        String resultStr = result.toJson().toString();
                        System.out.println("Final result: " + resultStr);
                    }
                }

            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }

附测试用例 #

// Sample test suite exercising the Easysearch Java client end to end.
public class APITest {

    EasysearchClient client;
    ObjectMapper mapper = new ObjectMapper();
    String indexName = "test1";

    // Instance initializer: build the shared client once, wrapping checked failures.
    {
        try {
            client = SampleClient.create();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
    
    
    // Create an index whose mappings and settings are supplied as a JSON string.
    // NOTE(review): the create call sits inside the exists() branch, so on a fresh
    // cluster (index absent) nothing is created — confirm whether that is intended.
    @Test
    public void testCreateIndexByJSON() throws IOException {
        String index = "test1";
        ESVersionInfo version = client.info().version();
        LOGGER.info("Server: " + version.number());
        if (client.indices().exists(r -> r.index(index)).value()) {
            LOGGER.info("Deleting index " + index);
            DeleteIndexResponse deleteIndexResponse = client.indices().delete(new DeleteIndexRequest.Builder().index(index).build());
            LOGGER.info(deleteIndexResponse.toString());

            LOGGER.info("Creating index " + index);

            // Settings and mappings in JSON format
            String settingsJson = "{\n" +
                "                  \"settings\": {\n" +
                "                    \"number_of_shards\": 1,\n" +
                "                    \"number_of_replicas\": 1,\n" +
                "                    \"analysis\": {\n" +
                "                      \"analyzer\": {\n" +
                "                        \"my_analyzer\": {\n" +
                "                          \"type\": \"custom\",\n" +
                "                          \"tokenizer\": \"standard\",\n" +
                "                          \"filter\": [\"lowercase\", \"asciifolding\"]\n" +
                "                        }\n" +
                "                      }\n" +
                "                    }\n" +
                "                  },\n" +
                "                  \"mappings\": {\n" +
                "                    \"properties\": {\n" +
                "                      \"id\": {\n" +
                "                        \"type\": \"keyword\"\n" +
                "                      },\n" +
                "                      \"title\": {\n" +
                "                        \"type\": \"text\",\n" +
                "                        \"analyzer\": \"my_analyzer\"\n" +
                "                      },\n" +
                "                      \"content\": {\n" +
                "                        \"type\": \"text\"\n" +
                "                      },\n" +
                "                      \"status\": {\n" +
                "                        \"type\": \"keyword\"\n" +
                "                      },\n" +
                "                      \"tags\": {\n" +
                "                        \"type\": \"keyword\"\n" +
                "                      },\n" +
                "                      \"create_time\": {\n" +
                "                        \"type\": \"date\",\n" +
                "                        \"format\": \"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis\"\n" +
                "                      },\n" +
                "                      \"update_time\": {\n" +
                "                        \"type\": \"date\",\n" +
                "                        \"format\": \"yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis\"\n" +
                "                      },\n" +
                "                      \"view_count\": {\n" +
                "                        \"type\": \"long\"\n" +
                "                      },\n" +
                "                      \"price\": {\n" +
                "                        \"type\": \"double\"\n" +
                "                      }\n" +
                "                    }\n" +
                "                  }\n" +
                "                }";

            // Create index using JSON string
            CreateIndexResponse createIndexResponse = client.indices().create(req -> req
                .index(index)
                .withJson(new StringReader(settingsJson))
            );

            LOGGER.info(createIndexResponse.toString());
        }
    }

    // Cluster health via the _cat API.
    @Test
    public void testCatHealth() throws IOException {
        ESVersionInfo version = client.info().version();
        System.out.println("Server: " + version.number());
        HealthResponse res = client.cat().health();
        System.out.println(res.toString());
    }

    // List all indices via the _cat API.
    @Test
    public void testCatIndices() throws IOException {
        ESVersionInfo version = client.info().version();
        System.out.println("Server: " + version.number());
        IndicesResponse res = client.cat().indices();
        System.out.println(res.toString());
    }

    // List cluster nodes via the _cat API.
    @Test
    public void testCatNodes() throws IOException {
        ESVersionInfo version = client.info().version();
        System.out.println("Server: " + version.number());
        NodesResponse res = client.cat().nodes();
        System.out.println(res.toString());
    }


    // Page through every document with the scroll API, then clear the scroll context.
    @Test
    public void testScrollQuery() throws IOException {
        SortOptions.Builder sb = new SortOptions.Builder();
        SortOptions sortOptions = sb.field(fs -> fs.field("@timestamp").order(SortOrder.Desc)).build();

        // keep-alive for each scroll window
        Time time = Time.of(t -> t.time("1m"));
        final SearchRequest.Builder searchReq = new SearchRequest.Builder().allowPartialSearchResults(false)
            .index(Collections.singletonList(indexName)).size(10).sort(sortOptions).source(sc -> sc.fetch(true))
            .trackTotalHits(tr -> tr.enabled(true)).scroll(time).query(q -> q.matchAll(new MatchAllQuery.Builder().build()));

        SearchResponse<LogEntry> searchResponse = client.search(searchReq.build(), LogEntry.class);
        String scrollId = null;
        scrollId = searchResponse.scrollId();
        System.out.println(searchResponse.hits().total());
        for (Hit<LogEntry> hit : searchResponse.hits().hits()) {
            System.out.println("_id " + hit.id());
        }

        // Keep fetching pages until one comes back empty
        while (true) {
            String finalScrollId = scrollId;
            ScrollResponse<LogEntry> scrollResponse = client.scroll(sc -> sc.scrollId(finalScrollId).scroll(time), LogEntry.class);
            if (scrollResponse.hits().hits().isEmpty()) {
                break;
            }
            for (Hit<LogEntry> hit : scrollResponse.hits().hits()) {
                System.out.println("_id " + hit.id());
            }
            scrollId = scrollResponse.scrollId();
        }

        // Release server-side scroll resources
        if (scrollId != null) {
            System.out.println("clear " + scrollId);
            String finalScrollId = scrollId;
            client.clearScroll(cs -> cs.scrollId(finalScrollId));
        }
    }

    // Build (but do not execute) match, range, bool, and match_phrase queries.
    @Test
    public void testQuery() {

        MatchQuery.Builder mb = new MatchQuery.Builder();
        mb.field("agent").query("xxx");
        Query query = Query.of(qb -> qb.match(mb.build()));

        // Range over the last three days in epoch-millis
        Instant now = Instant.now();
        Instant threeDaysAgo = now.minus(Duration.ofDays(3));
        long threeDaysAgoMillis = threeDaysAgo.toEpochMilli();
        Query rangequery = RangeQuery.of(r -> r.field("@timestamp").lte(JsonData.of(now.toEpochMilli())).gte(JsonData.of(threeDaysAgoMillis)))._toQuery();
        Query boolquery = Query.of(qb -> qb.bool(b -> b.must(query).must(rangequery)));
        System.out.println(boolquery);
        
        MatchPhraseQuery.Builder builder = new MatchPhraseQuery.Builder();
        builder.field("agent").query("xxx");
        Query phraseQuery = Query.of(qb -> qb.matchPhrase(builder.build()));
        System.out.println(phraseQuery);
        
    }

    // match_all search, printing each hit as JSON via Jackson.
    @Test
    public void testSearch() throws IOException {

        SortOptions.Builder sb = new SortOptions.Builder();
        SortOptions sortOptions = sb.field(fs -> fs.field("@timestamp").order(SortOrder.Desc)).build();
        final SearchRequest.Builder searchReq = new SearchRequest.Builder().allowPartialSearchResults(false)
            .index(Collections.singletonList(indexName)).size(10).sort(sortOptions).source(sc -> sc.fetch(true))
            .trackTotalHits(tr -> tr.enabled(true)).query(q -> q.matchAll(new MatchAllQuery.Builder().build()));
        SearchResponse<LogEntry> searchResponse = client.search(searchReq.build(), LogEntry.class);
        System.out.println(searchResponse.hits().total());
        for (Hit<LogEntry> hit : searchResponse.hits().hits()) {
            System.out.println(mapper.writeValueAsString(hit.source()));
        }
    }

    // Terms aggregation over agent.hostname.keyword; size(0) suppresses hits.
    @Test
    public void testTermsAgg() throws IOException {
        MatchAllQuery.Builder mb = new MatchAllQuery.Builder();
        Query query = Query.of(qb -> qb.matchAll(mb.build()));
        final SearchRequest.Builder searchReq = new SearchRequest.Builder().allowPartialSearchResults(false)
            .index(Collections.singletonList(indexName)).query(query).size(0).trackTotalHits(tr -> tr.enabled(true))
            .aggregations("hostname_group", a -> a.terms(t -> t.field("agent.hostname.keyword")));

        SearchResponse<LogEntry> searchResponse = client.search(searchReq.build(), LogEntry.class);
        System.out.println(searchResponse.hits().total());
        List<StringTermsBucket> buckets = searchResponse.aggregations().get("hostname_group").sterms().buckets().array();
        for (StringTermsBucket bucket : buckets) {
            System.out.println(bucket.docCount() + " terms under " + bucket.key().stringValue());
        }
    }

    // Index a single typed document synchronously.
    @Test
    public void testSingleDocument() throws IOException {
        Product product = new Product("bk-1", "City bike", 123.0);
        IndexRequest<Product> request = IndexRequest.of(i -> i.index("products").id(product.getSku()).document(product));

        IndexResponse response = client.index(request);

        System.out.println("Indexed with version " + response.version());
    }

    // Index a single typed document with the async client, then block on the future.
    @Test
    public void testSingleDocumentDSLAsync() throws Exception {
        Product product = new Product("bk-1", "City bike", 123.0);

        EasysearchAsyncClient asyncClient = SampleClient.createAsyncClient();
        CompletableFuture<IndexResponse> future = asyncClient.index(i -> i.index("products")
            .id(product.getSku()).document(product)).whenComplete((response, exception) -> {
            if (exception != null) {
                System.err.println("Failed to index " + exception);
            } else {
                System.out.println("Indexed with version " + response.version());
            }
        });
        IndexResponse response = future.get();
        System.out.println("Async Indexed with version " + response.version());
    }
}

Edit 编辑本页