# 前言
其实和flink没啥关系,只是正好场景使用的是flink,问题在于elasticsearch5的参数设置
# 问题
之前代码,数据无法写入,但是也不报错,后来添加了一个参数设置,就可以写入了
# 参数配置
config.put("bulk.flush.max.actions", "1");
在如下代码中,已经加入该参数配置
@Test
void contextLoads() throws Exception {
// 获取流处理执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从集合中读取数据
DataStream<String> dataStream = env.fromCollection(List.of("cc"));
// 数据发送到elasticsearch
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "es");
config.put("bulk.flush.max.actions", "1");
List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.95.102.40"), 9300));
dataStream.addSink(new ElasticsearchSink(config,transportAddresses,new ElasticsearchMapper()));
// 打印读取数据
dataStream.print();
// 执行任务
env.execute();
}
public static class ElasticsearchMapper implements ElasticsearchSinkFunction<String> {
@Override
public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
// 组装数据
Map<String, String> datas = Map.of("id", s);
// 组织es结构
IndexRequest indexRequest = Requests.indexRequest()
.index("situation-sirius-2021.12.15")
.type("sirius")
.source(datas);
// 发送数据
requestIndexer.add(indexRequest);
}
}
}