目录

elasticsearch 使用BulkProcessor导入txt大文件

最近在学es,需要导入8亿条数据

手上有一个txt大约有8亿条数据的样子,文件有19G左右,一开始百度搜了下,基本都是重复文章,不过也根据写这些文章的大佬慢慢google到了一些方法。
先说下导入机器配置:
cpu: E5 1620V2
内存: 32G(分给es 12G)
硬盘:4x2T raid 0(io大概在600左右)

导入的几种方法:

1、bulk:

ES本地支持的批量导入方式,将json文件post到es进行处理。
将需要导入的数据先生成json文件,格式类似这种

#指定 index  
{"index":{"_index":"suoy","_id":1}}  
#字段  
{"text_entry":"内容"}  
{"index":{"_index":"suoy","_id":1}}  
{"text_entry":"内容"}  
{"index":{"_index":"suoy","_id":1}}  
{"text_entry":"内容"}  
...........

然后使用curl提交

curl -H 'Content-Type: application/x-ndjson' -XPOST '127.0.0.1:9200/xxxxxxxx/doc/_bulk?pretty' --data-binary @out.json

一开始我是尝试这种方法,用python将数据重新处理了下,生成的文件有48GB……,还花了3-5个小时的样子,导入的时候直接失败~后面,看了下说是文件大小尽量不能超过200MB???这样的话就要分割文件了,虽然可以shell脚本一个一个的提交小文件json,后面想想直接放弃了(嫌麻烦)…….

2、logstash:

ES官方的另一个产品,将数据文本转换为ES的数据源。
我的文本一行只有两个字段,用 “—-” 分割,花了点时间学logstash直接上手开干,但是导入速度只有9000条/s的样子,一个小时导了大概3200w条数据,导了16个小时大概导了5.2亿条数据,这速度完全不行啊,后面手贱不注意按了 Ctrl+c,嗯…..这下好了,不知道怎么断点续传,又得重新来……
我用的脚本如下,有懂的大佬能否告知下logstash有没有类似BulkProcessor储存到x条数据再执行Bulk的方法?

input {  
    # 从文件读取日志信息  
    file {  
        path => "/data/sda3/k/log.txt"  
        type => "text"  
        start_position => "beginning"  
    }  
}
filter {  
    mutate {  
        split => ["message", "----"]  
    }  
	if [message][2] {  
        mutate {  
            add_field =>   {  
                "title" => "%{[message][1]}"  
				"log" => "%{[message][2]}"  
            }  
        }  
    }else{  
		if [message][0] {  
        mutate {  
            add_field =>   {  
                "title" => "%{[message][0]}"  
            }  
        }  
		}  
		if [message][1] {  
			mutate {  
				add_field =>   {  
					"log" => "%{[message][1]}"  
				}  
			}  
		}  
	}  
}
output {  
	elasticsearch{  
		hosts => "127.0.0.1:9200"  
		index => "logs"  
		user => "elastic"  
		#password  => "xxx"  
		#document_type => "_doc"  
	}  
    # 标准输出  
    #stdout { codec => rubydebug }  
}

 

3、Bulk Processor

rest方式采用的是restClient,基于http协议,Bulk Processor使用的是TransportClient,基于Tcp协议。
我这里是将数据jaon序列化后再提交的
配置文件

cluster.name=my-application  
ip=10.0.2.26  
port=9300  
# 每x个请求执行一次Bulk  
setBulkActions=10  
# 每5MB Bulk  
setBulkSize=5  
#无论请求多少,我都希望每x秒刷新一次*  
setFlushInterval=10  
# 设置并发请求数  
setConcurrentRequests=5  
index=logs  
path=D:\\log.txt

Maven

          org.elasticsearch.client  
          transport  
          7.1.0  
          org.apache.logging.log4j  
          log4j-core  
          2.11.1  
          com.lmax  
          disruptor  
          3.4.1  
          com.google.code.gson  
          gson  
          2.8.5  
          org.elasticsearch  
          elasticsearch  
          7.1.0  
          org.elasticsearch.client  
          elasticsearch-rest-client  
          7.1.0  
          org.elasticsearch.client  
          elasticsearch-rest-high-level-client  
          7.1.0
    public static void init(String filepath) throws IOException, InterruptedException {  
        // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-bulk-processor.html  
        /*cluster.name 必须与服务端相同*/  
        Settings settings = Settings.builder().put("cluster.name", "my-application").build();  
//        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();  
//        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("elastic", "12346"));  //es账号密码(默认用户名为elastic)  
        TransportClient client = new PreBuiltTransportClient(settings)  
                .addTransportAddress(new TransportAddress(InetAddress.getByName(properties.getProperty("ip")), Integer.valueOf(properties.getProperty("port"))));  
        BulkProcessor build = BulkProcessor.builder(client, new BulkProcessor.Listener() {  
            @Override  
            public void beforeBulk(long l, BulkRequest bulkRequest) {  
                /*在批量执行之前调用此方法*/  
                logger.info("---尝试插入{}条数据---", bulkRequest.numberOfActions());  
            }  
            @Override  
            public void afterBulk(long l, BulkRequest bulkRequest, BulkResponse bulkResponse) {  
                /*批量执行后调用此方法。例如,您可以检查是否有一些失败的请求response.hasFailures()*/  
                logger.info("---插入完成-> {}条数据,---", bulkRequest.numberOfActions());  
                bulkResponse.hasFailures();  
            }  
            @Override  
            public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {  
                /*批量失败并引发一个 Throwable*/  
                logger.error("error:" + throwable.getMessage() + " \ncause:" + throwable.getCause());  
//                logger.info("---尝试插入{}条数据---", bulkRequest.numberOfActions());  
            }  
        })  
                /*每x个请求执行一次Bulk*/  
                .setBulkActions(Integer.valueOf(properties.getProperty("setBulkActions")))  
                /*每5MB Bulk一次*/  
                .setBulkSize(new ByteSizeValue(Integer.valueOf(properties.getProperty("setBulkSize")), ByteSizeUnit.MB))  
                /*无论请求多少,我都希望每5秒刷新一次*/  
                .setFlushInterval(TimeValue.timeValueSeconds(Integer.valueOf(properties.getProperty("setFlushInterval"))))  
                /*设置并发请求数。值为0表示仅允许执行一个请求。值为1表示允许在累积新的批量请求时执行1个并发请求。*/  
                .setConcurrentRequests(Integer.valueOf(properties.getProperty("setConcurrentRequests")))  
                /*设置一个自定义退避策略,该策略最初将等待100毫秒,然后呈指数增长,然后重试最多3次。每当一个或多个批量项目请求失败时,都会尝试重试,EsRejectedExecutionException 并显示,表明有太多的计算资源可用于处理请求。要禁用退避,请通过BackoffPolicy.noBackoff()*/  
                .setBackoffPolicy(  
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)  
                ).build();  
                /*默认情况下BulkProcessor:  
                    将bulkActions设置为 1000  
                    设置bulkSize为 5mb  
                    没有设置flushInterval  
                    将parallelRequests设置为1,这意味着异步执行刷新操作。  
                    将backoffPolicy设置为具有8次重试和50ms启动延迟的指数补偿。总等待时间约为5.1秒。  
                */  
        String index = properties.getProperty("index");  
        logger.info("开始添加数据!");  
        Gson gson = new Gson();  
        // 读取日志文件  
        File file = new File(filepath);  
        BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));  
        BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "utf-8"), 10 * 1024 * 1024);// 用10M的缓冲读取文本文件  
        String line = "";  
        while ((line = reader.readLine()) != null) {  
            //TODO: write your business  
            Map<String, String> map = new HashMap<>();  
            /*第一种情况*/  
            String[] split = line.split("----");  
            if (split.length > 1) {  
                map.put("qq", split[1]);  
                map.put("phone", split[2]);  
            }  
            if (map.size() > 0) {  
                /*添加数据*/  
                build.add(new IndexRequest(index, "_doc").source(gson.toJson(map), XContentType.JSON));  
            }  
        }  
        logger.info("添加数据完成!等待提交......");  
        /*刷新剩余请求*/  
        //build.flush();  
        build.awaitClose(10, TimeUnit.MINUTES);  
//        build.close();

我设置的BulkActions值为 50000 用这种方式导入速度达到了 6w/s的速度,cpu也是全部跑满,完全将性能发挥出来了~

https://img.qyi.io/2020/12/03/52bd7d1ade3dc.png

https://img.qyi.io/2020/12/03/ddaf1503f4edb.png

https://img.qyi.io/2020/12/03/28aa7c7d15fbd.png

参考文章:
https://www.cnblogs.com/ttzsqwq/p/11077574.html
https://blog.csdn.net/wslyk606/article/details/79413980