您的位置 首页 java

java实现Elasticsearch的数据导出与导入

最近组内有个需求,要把本地 ES 中的测试数据导入到内网环境中。网上很多文章推荐使用 elasticdump 插件,不过它还需要额外安装,个人觉得有点麻烦。于是在网上找了一段代码,自己做了些优化,用起来还挺方便,也适合刚开始学习的人,下面分享给大家。

java实现Elasticsearch的数据导出与导入

1、pom.xml 引入依赖如下(客户端版本建议与 ES 集群版本保持一致,这里以 6.4.3 为例):

 <dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.4.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.4.3</version>
</dependency>  

2、 在java中用scroll查询es,数据明细输出为json格式文件

  /**
   * Exports every document of {@code index}/{@code type} from the source cluster into {@code file},
   * writing each hit's {@code _source} as one JSON document per line.
   *
   * <p>Uses the scroll API so the whole index is read page by page. The file is opened in append
   * mode, so running the export twice against the same file appends the documents twice.
   *
   * @param index index to export
   * @param type  mapping type to export
   * @param file  output file (UTF-8, appended to)
   * @throws IOException if an Elasticsearch request or writing the file fails
   */
public static void scrollQueryTest(String index, String type, File file) throws IOException {
    // Keep the server-side scroll context alive for 1 minute between page requests.
    TimeValue keepAlive = TimeValue.timeValueMinutes(1L);

    // 1. Build the match_all request that opens the scroll.
    SearchRequest searchRequest = new SearchRequest(index);
    searchRequest.types(type);
    searchRequest.scroll(keepAlive);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Page size per round trip; a tiny page (the old value was 2) makes large exports very slow.
    searchSourceBuilder.size(1000);
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());
    searchRequest.source(searchSourceBuilder);

    HttpHost httpHost = new HttpHost("源es的IP", 9200);
    RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
    // try-with-resources: the client and the file are closed even if a request fails midway.
    // An explicit UTF-8 encoding keeps the file portable between machines with different defaults.
    try (RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
         BufferedWriter out = new BufferedWriter(new java.io.OutputStreamWriter(
                 new java.io.FileOutputStream(file, true), java.nio.charset.StandardCharsets.UTF_8))) {
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        System.out.println(scrollId);
        try {
            // 2. First page comes back with the initial search.
            writeHits(searchResponse.getHits().getHits(), out);
            System.out.println("首页查询输出结束,文件存储路径=====" + file);

            // 3. Keep scrolling until a page comes back empty.
            while (true) {
                SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
                searchScrollRequest.scroll(keepAlive);
                SearchResponse scroll = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
                // Elasticsearch may return a different scroll id; always continue with the latest one.
                scrollId = scroll.getScrollId();
                SearchHit[] page = scroll.getHits().getHits();
                if (page == null || page.length == 0) {
                    System.out.println("------------结束--------------");
                    break;
                }
                System.out.println("hit1=======" + page.length);
                writeHits(page, out);
                System.out.println("本次查询输出结束,文件存储路径=====" + file);
                System.out.println("------------下一页--------------");
            }
        } finally {
            // 4. Release the scroll context now instead of waiting for the keep-alive to expire,
            // also when the export failed part-way.
            ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
            clearScrollRequest.addScrollId(scrollId);
            ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
            System.out.println("删除scroll" + clearScrollResponse);
        }
    }
}

/**
 * Writes the {@code _source} of each hit as one line, terminated by {@code '\n'}, so the file can
 * be read back with {@link BufferedReader#readLine()}.
 */
private static void writeHits(SearchHit[] hits, BufferedWriter out) throws IOException {
    for (SearchHit hit : hits) {
        out.write(hit.getSourceAsString());
        out.write('\n');
    }
}

3、Java 读取本地文件,将文件中每行一条 JSON 的数据批量写入 ES

 /**
  * Imports {@code file} into {@code index}/{@code type}. Each non-blank line is one JSON document;
  * documents are sent in bulk requests of up to 1000 documents.
  *
  * <p>The target cluster is whatever {@code esContenct()} connects to. Documents are indexed
  * without an explicit id, so Elasticsearch assigns new ids and re-running the import duplicates
  * the data.
  *
  * @param file  UTF-8 file with one JSON document per line; blank lines are skipped
  * @param index target index
  * @param type  target mapping type
  * @throws IOException if reading the file or a bulk request fails
  */
public static void inputEsData(File file, String index, String type) throws IOException {
    final int batchSize = 1000;
    int total = 0;
    // A single client for the whole import, closed at the end. The old code opened a new,
    // never-closed client per batch, and another one just to call close().
    try (RestHighLevelClient client = esContenct();
         BufferedReader br = new BufferedReader(new java.io.InputStreamReader(
                 new java.io.FileInputStream(file), java.nio.charset.StandardCharsets.UTF_8))) {
        BulkRequest bulkRequest = new BulkRequest();
        String json;
        while ((json = br.readLine()) != null) {
            if (json.trim().isEmpty()) {
                continue; // an empty line is not a document and would make the bulk request fail
            }
            bulkRequest.add(new IndexRequest(index).source(json, XContentType.JSON).type(type));
            total++;
            if (bulkRequest.numberOfActions() >= batchSize) {
                submitBulk(client, bulkRequest, total);
                bulkRequest = new BulkRequest(); // start a fresh batch so sent documents are not resent
            }
        }
        // Send the last, partially filled batch.
        if (bulkRequest.numberOfActions() > 0) {
            submitBulk(client, bulkRequest, total);
        }
    }
    System.out.println("插入完毕");
}

 /**
  * Sends one bulk request and reports per-document failures instead of silently ignoring them.
  *
  * @param total number of documents queued so far (cumulative), used for progress output
  */
private static void submitBulk(RestHighLevelClient client, BulkRequest bulkRequest, int total) throws IOException {
    BulkResponse indexResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    System.out.println("=================indexResponse==============" + indexResponse);
    System.out.println("本次es提交的记录数=============" + total);
    if (indexResponse.hasFailures()) {
        System.err.println(indexResponse.buildFailureMessage());
    }
}

补充:第三步 inputEsData 方法用到的 ES 连接方法 esContenct(),以及抽取出来的通用查询条件构建方法,统一放在下面。

  /**
   * Builds a new high-level REST client for the hosts listed below ("host:port" strings).
   * Each call creates a fresh client; the caller is responsible for closing it.
   *
   * NOTE(review): inputEsData imports through this client, so when importing, the address should
   * be the destination cluster's rather than the source's — confirm before running an import.
   */
  public static RestHighLevelClient esContenct() {
      System.out.println("===================连接RestHighLevelClient======================");
      String[] hostSpecs = {"源es的IP:9200"};
      HttpHost[] hosts = new HttpHost[hostSpecs.length];
      int next = 0;
      for (String spec : hostSpecs) {
          hosts[next++] = HttpHost.create(spec);
      }
      return new RestHighLevelClient(RestClient.builder(hosts));
  }

    /**
     * Builds a match_all search over {@code index}/{@code type} that returns the first page of
     * 1000 hits (offset 0) with a search timeout of 5000 ms.
     *
     * @param index index to search
     * @param type  mapping type to search
     * @return the ready-to-execute search request
     */
    public static SearchRequest searchRequest(String index, String type) {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .timeout(new TimeValue(5000))
                .from(0)
                .size(1000);
        return new SearchRequest(index).types(type).source(source);
    }
  

4、创建 main 方法,分别调用导出和导入方法(注意:应先执行导出生成数据文件,再执行导入)

  public class EsDataExchange {
    /**
     * Runs the export and/or the import.
     *
     * <p>With no arguments both steps run, export first: the import reads the file the export
     * writes, so running the import first fails on a missing file (or re-imports stale data).
     * Pass {@code export} or {@code import} as the first argument to run only that step — e.g.
     * export on the machine that can reach the source cluster, copy the file, then import on the
     * target network.
     *
     * <p>NOTE(review): the export appends to the file, so delete it before re-running. Also make
     * sure esContenct() points at the target cluster before importing.
     *
     * @param args optional mode: {@code export}, {@code import} or {@code all} (default)
     * @throws IOException if the export or import fails
     */
    public static void main(String[] args) throws IOException {
        File file = new File("/Users/XXXX/Desktop/outes.json");
        String index = "源es索引";
        String type = "源es类型";
        String mode = args.length > 0 ? args[0] : "all";
        if (!"export".equals(mode) && !"import".equals(mode) && !"all".equals(mode)) {
            throw new IllegalArgumentException("unknown mode: " + mode + " (expected export, import or all)");
        }
        // es数据导出 (export) — must run before the import that consumes its output file.
        if (!"import".equals(mode)) {
            scrollQueryTest(index, type, file);
        }
        // es数据导入 (import)
        if (!"export".equals(mode)) {
            inputEsData(file, index, type);
        }
    }

分享是人们与社会产生联系的一种方式,本质上是一种互利行为,在互利中,选择、巩固和发展彼此的社会关系。完全没有分享,社会将不能成立。分享是构成社会的要素之一。

文章来源:智云一二三科技

文章标题:java实现Elasticsearch的数据导出与导入

文章地址:https://www.zhihuclub.com/174426.shtml

关于作者: 智云科技

热门文章

网站地图