协处理器代码
package com.***.hbase.coprocessors;
import com.***.es.util.ESClient;
import com.***.es.util.ElasticSearchBulkOperator;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.*;
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
public class UserInfoTestSyncEsObserver implements RegionObserver , RegionCoprocessor {
private static final Logger LOG = Logger.getLogger(UserInfoTestSyncEsObserver.class);
private String index = "user_info_test";
private String type = "user_info_testtype";
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
ESClient.initEsClient();
ElasticSearchBulkOperator.init();
LOG.info("****init start*****");
}
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
ESClient.closeEsClient();
ElasticSearchBulkOperator.shutdownScheduEx();
LOG.info("****end*****");
}
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
String indexId = new String(put.getRow());
try {
NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
Map<String, Object> json = new HashMap<>();
for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
for (Cell cell : entry.getValue()) {
String key = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
json.put(key, value);
}
}
ElasticSearchBulkOperator.addUpdateBuilderToBulk(ESClient.client.prepareUpdate(index,type, indexId).setDocAsUpsert(true).setDoc(json));
} catch (Exception ex) {
LOG.error("observer put a doc, index [ " + "user_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());
}
}
@Override
public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
String indexId = new String(delete.getRow());
try {
ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(index,type, indexId));
LOG.info("**** postDelete success*****");
} catch (Exception ex) {
LOG.error(ex);
LOG.error("observer delete a doc, index [ " + "user_test" + " ]" + "indexId [" + indexId + "] error : " + ex.getMessage());
}
}
}
依赖
<!-- Version properties referenced by the dependency declarations. -->
<properties>
<hadoop.version>3.0.0</hadoop.version>
<!-- NOTE(review): presumably chosen to match the HBase version of the cluster that loads this coprocessor - confirm. -->
<hbase.version>2.0.0</hbase.version>
<!-- Shared by the elasticsearch and transport artifacts. NOTE(review): the transport client is version-sensitive, so this should presumably match the target cluster's version - confirm. -->
<elasticsearch.version>6.3.0</elasticsearch.version>
<!-- NOTE(review): hutool is not referenced by any of the code shown with this POM - confirm it is still needed. -->
<hutool.version>5.3.10</hutool.version>
</properties>
<!-- Dependencies of the HBase-to-Elasticsearch sync coprocessor and its ES helper classes. -->
<dependencies>
<!-- HBase server APIs used by the observer (RegionObserver, RegionCoprocessor, ObserverContext, WALEdit, ...).
NOTE(review): the region server already provides these classes; if this module is packaged as a fat jar, <scope>provided</scope> is likely wanted - confirm. -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
<exclusions>
<!-- NOTE(review): presumably excluded to avoid a Netty version clash with the Elasticsearch transport client - confirm. -->
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- NOTE(review): no MapReduce classes are referenced by the code shown here - confirm this dependency is needed. -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<!-- NOTE(review): the reason for this exclusion is not visible here; presumably an htrace version conflict with HBase - confirm. -->
<exclusion>
<groupId>org.apache.htrace</groupId>
<artifactId>htrace-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Elasticsearch core API: Settings, request builders, WriteRequest.RefreshPolicy. -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- Provides PreBuiltTransportClient, used by ESClient. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<!-- NOTE(review): not referenced by the code shown here - confirm it is needed. -->
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>${hutool.version}</version>
</dependency>
</dependencies>
ES客户端代码
package com.***.es.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Buffers Elasticsearch update and delete requests in one shared bulk request.
 *
 * <p>The buffer is submitted when more than {@code MAX_BULK_COUNT} actions are pending. It is
 * also submitted periodically by a single-threaded background flusher: first after 10 s, then
 * every 30 s. Every bulk uses {@code RefreshPolicy.IMMEDIATE}.
 *
 * <p>All state is static and shared across the JVM, and every access to it happens while
 * holding {@code commitLock}. {@link #init()} and {@link #shutdownScheduEx()} are
 * reference-counted so that several callers can share one flusher. The flusher is stopped, and
 * the remaining buffer flushed, only when the last caller that called {@code init()} shuts
 * down.
 */
public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    /** An add flushes synchronously once more than this many actions are buffered. */
    private static final int MAX_BULK_COUNT = 10000;

    /** Delay before the first periodic flush, in seconds. */
    private static final long FLUSH_INITIAL_DELAY_SECONDS = 10;

    /** Interval between periodic flushes, in seconds. */
    private static final long FLUSH_PERIOD_SECONDS = 30;

    /**
     * Actions waiting to be sent. Null until a client is available, and null again after the
     * final shutdown.
     */
    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static final Lock commitLock = new ReentrantLock();

    /** Background flusher; null or shut down when not running. */
    private static ScheduledExecutorService scheduledExecutorService = null;

    /** Number of init() calls not yet matched by shutdownScheduEx(). */
    private static int activeUsers = 0;

    static {
        // Start eagerly if the client already exists. This does not count as a user, so a
        // single init()/shutdownScheduEx() pair still stops the flusher. If the client is not
        // ready yet, nothing happens here (previously this threw and poisoned the class).
        commitLock.lock();
        try {
            startIfNeeded();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Registers a user of the shared buffer and starts the flusher if it is not running.
     * At most one flusher runs at a time, and already-buffered actions are kept. Call this
     * after {@link ESClient#initEsClient()}.
     */
    public static void init() {
        commitLock.lock();
        try {
            activeUsers++;
            startIfNeeded();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Unregisters one user. When the last user leaves, stops the flusher and sends whatever is
     * still buffered instead of dropping it. A failure of that final send is logged.
     */
    public static void shutdownScheduEx() {
        commitLock.lock();
        try {
            if (activeUsers > 0) {
                activeUsers--;
            }
            if (activeUsers > 0) {
                return; // other users still rely on the flusher
            }
            if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
                scheduledExecutorService.shutdown();
            }
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                LOG.error("Final bulk flush on shutdown failed : " + ex.getMessage(), ex);
            }
            // The builder is tied to the current client; a later init() creates a fresh one.
            bulkRequestBuilder = null;
        } finally {
            commitLock.unlock();
        }
    }

    /** Creates the buffer and starts the flusher if they are missing; caller holds commitLock. */
    private static void startIfNeeded() {
        if (ESClient.client == null) {
            LOG.warn("ES client is not initialized; bulk flusher not started");
            return;
        }
        pendingBuilder();
        if (scheduledExecutorService == null || scheduledExecutorService.isShutdown()) {
            scheduledExecutorService = Executors.newScheduledThreadPool(1);
            scheduledExecutorService.scheduleAtFixedRate(ElasticSearchBulkOperator::periodicFlush,
                    FLUSH_INITIAL_DELAY_SECONDS, FLUSH_PERIOD_SECONDS, TimeUnit.SECONDS);
        }
    }

    /** Body of the periodic flusher: sends anything buffered. Exceptions are logged so the schedule keeps running. */
    private static void periodicFlush() {
        commitLock.lock();
        try {
            if (bulkRequestBuilder == null) {
                return; // shut down (or never started) while this run was waiting for the lock
            }
            LOG.info("Before submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
            bulkRequest(0);
            LOG.info("After submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
        } catch (Exception ex) {
            LOG.error("Scheduled bulk flush failed : " + ex.getMessage(), ex);
        } finally {
            commitLock.unlock();
        }
    }

    /** Returns the current buffer, creating it if absent; caller holds commitLock. */
    private static BulkRequestBuilder pendingBuilder() {
        if (bulkRequestBuilder == null) {
            bulkRequestBuilder = newBulkRequestBuilder();
        }
        return bulkRequestBuilder;
    }

    /** Creates an empty bulk request. It always uses the same refresh policy, including after a flush. */
    private static BulkRequestBuilder newBulkRequestBuilder() {
        BulkRequestBuilder builder = ESClient.client.prepareBulk();
        builder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        return builder;
    }

    /**
     * Sends the buffer if it holds more than {@code threshold} actions; caller holds commitLock.
     *
     * <p>If {@code execute()} throws (for example, no node is reachable), the buffer is kept and
     * retried by a later flush. Once the request completes, the buffer is replaced even if some
     * items failed; the failures are logged. Otherwise one permanently failing item would cause
     * the whole batch to be resent forever while the buffer grows without bound.
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder == null || bulkRequestBuilder.numberOfActions() <= threshold) {
            return;
        }
        int beforeCount = bulkRequestBuilder.numberOfActions();
        BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
        boolean failed = bulkItemResponse.hasFailures();
        if (failed) {
            LOG.error("异常1,待提交的数量为:" + beforeCount);
            LOG.error("异常信息:" + bulkItemResponse.buildFailureMessage());
            LOG.error("Failed bulk items are logged above and will not be retried");
        }
        // Drop the sent batch before creating its replacement, so that a failure to create the
        // replacement cannot cause the same batch to be sent again.
        bulkRequestBuilder = null;
        bulkRequestBuilder = newBulkRequestBuilder();
        if (!failed) {
            LOG.info("提交成功,提交前" + beforeCount + "\t提交后:" + bulkRequestBuilder.numberOfActions());
        }
    }

    /**
     * Buffers an update request and flushes if the buffer exceeds {@code MAX_BULK_COUNT}.
     * Errors are logged, not thrown.
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            pendingBuilder().add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error("update Bulk index error : " + ex.getMessage(), ex);
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Buffers a delete request and flushes if the buffer exceeds {@code MAX_BULK_COUNT}.
     * Errors are logged, not thrown.
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            pendingBuilder().add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error("delete Bulk index error : " + ex.getMessage(), ex);
        } finally {
            commitLock.unlock();
        }
    }
}
package com.***.es.util;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
/**
 * Owns the JVM-wide Elasticsearch {@link Client} used to build update, delete and bulk
 * requests.
 *
 * <p>{@link #initEsClient()} and {@link #closeEsClient()} are synchronized and
 * reference-counted:
 * <ul>
 *   <li>the first init creates the transport client;</li>
 *   <li>later inits reuse it;</li>
 *   <li>only the close that balances the last outstanding init actually closes it.</li>
 * </ul>
 * This lets several callers share one client without one of them closing it under the others.
 */
public class ESClient {

    /** Shared client; null before the first successful init and after the final close. */
    public static volatile Client client;

    private static final Log log = LogFactory.getLog(ESClient.class);

    /** Cluster name the transport client joins. */
    private static final String CLUSTER_NAME = "log_cluster";

    /** Seed transport hosts. NOTE(review): "ip" looks like a redacted placeholder - confirm. */
    private static final String[] HOSTS = {"ip", "ip", "ip"};

    /** Elasticsearch transport-protocol port. */
    private static final int TRANSPORT_PORT = 9300;

    /** Number of initEsClient() calls not yet matched by closeEsClient(). */
    private static int refCount = 0;

    /**
     * Creates the shared transport client on first use; later calls only register another user.
     *
     * @throws UnknownHostException if a seed host cannot be resolved; no client is created then
     */
    public static synchronized void initEsClient() throws UnknownHostException {
        if (client != null) {
            refCount++;
            return;
        }
        log.info("初始化es连接开始");
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        Settings esSettings = Settings.builder()
                .put("cluster.name", CLUSTER_NAME)
                .put("client.transport.sniff", true)
                .build();
        // Resolve every address before creating the client, so a lookup failure cannot leak an
        // already-started client.
        TransportAddress[] addresses = new TransportAddress[HOSTS.length];
        for (int i = 0; i < HOSTS.length; i++) {
            addresses[i] = new TransportAddress(InetAddress.getByName(HOSTS[i]), TRANSPORT_PORT);
        }
        PreBuiltTransportClient transportClient = new PreBuiltTransportClient(esSettings);
        for (TransportAddress address : addresses) {
            transportClient.addTransportAddress(address);
        }
        client = transportClient;
        refCount = 1;
        log.info("初始化es连接完成");
    }

    /**
     * Unregisters one user and closes the client when the last user leaves. Does nothing if
     * no client is open.
     */
    public static synchronized void closeEsClient() {
        if (client == null) {
            return;
        }
        if (--refCount > 0) {
            return; // still in use by other callers
        }
        refCount = 0;
        try {
            client.close();
        } finally {
            // Clear the field so a later initEsClient() creates a fresh client.
            client = null;
        }
        log.info("es连接关闭");
    }
}