RocketClient
基于netty实现了基础的httpClient,基于common-pool池化连接,分别实现http的同步和异步调用
本工程的重点在于连接的管理和异步响应式编程,对http协议的支持没有太多开发
代码简洁清晰,方便二次开发
分支说明
sync:最基础的同步http调用,每次请求都新建连接,阻塞等待连接完成后发出http请求,阻塞等待http响应
async:最基础的异步http调用,每次请求都新建连接,连接完成后回调发出http请求,不阻塞等待http响应,直接返回包含响应的promise,http响应后回调写入promise
syncPool:使用连接池同步http调用,每次从池中获取连接。如果没有可用连接则新建连接,阻塞等待连接完成后发出http请求,阻塞等待http响应
asyncPool:使用连接池异步http调用,每次从池中获取连接。如果没有可用连接则新建连接,连接完成后回调发出http请求,不阻塞等待http响应,直接返回CompletableFuture,http响应后回调写入CompletableFuture
使用示例
// 异步调用
RocketClient rocketClient = new RocketClient(RocketConfig.defaultConfig());
long start = System.currentTimeMillis();
rocketClient.execute(RocketRequest.get("www.qq.com", 80, ""), new RocketDIYHandler() {
@Override
public void onCompleted(RocketResponse response) throws Exception {
log.info("resp:{} cast:{}", new String(response.getBody(), CharsetUtil.UTF_8), System.currentTimeMillis() - start);
}
@Override
public void onThrowable(Throwable t) {
log.error("error", t);
}
});
// 同步调用
RocketResponse rocketResponse = rocketClient.execute(RocketRequest.get("www.baidu.com", 80, "")).get();
// 实际上execute方法返回的是CompletableFuture<RocketResponse>,如果不传入RocketDIYHandler可以自己处理future
CompletableFuture<RocketResponse> future = rocketClient.execute(RocketRequest.get("www.qq.com", 80, ""))
性能测试,分别同HttpAsyncClient和AsyncHttpClient测试
同HttpAsyncClient对比
// HttpAsyncClient测试如下
AtomicInteger completeCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
AtomicInteger cancelCount = new AtomicInteger();
HttpAsyncClientBuilder builder = HttpAsyncClientBuilder.create();
builder.setMaxConnPerRoute(100);
CloseableHttpAsyncClient httpclient = builder.build();
httpclient.start();
for (int i = 0; i < 2000; i++) {
HttpGet request = new HttpGet("http://httpbin.org/get");
httpclient.execute(request, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
if (completeCount.get() >= 0) {
log.info("complete count:{}", completeCount.incrementAndGet());
}
}
@Override
public void failed(Exception e) {
log.info("fail count:{}", failCount.incrementAndGet());
}
@Override
public void cancelled() {
log.info("cancel count:{}", cancelCount.incrementAndGet());
}
});
}
// RocketClient测试如下
RocketConfig rocketConfig = RocketConfig.defaultConfig();
rocketConfig.setBlockWhenExhausted(true);
RocketClient rocketClient = new RocketClient(rocketConfig);
AtomicInteger completeCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
AtomicInteger cancelCount = new AtomicInteger();
for (int i = 0; i < 2000; i++) {
rocketClient.execute(RocketRequest.get("httpbin.org", 80, "/get"), new RocketDIYHandler() {
@Override
public void onCompleted(RocketResponse response) throws Exception {
log.info("complete count:{}", completeCount.incrementAndGet());
}
@Override
public void onThrowable(Throwable t) {
log.info("fail count:{}", failCount.incrementAndGet());
}
});
}
同AsyncHttpClient对比
// AsyncHttpClient测试如下
AtomicInteger completeCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
AtomicInteger cancelCount = new AtomicInteger();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
AsyncHttpClientConfig ahcConfig = new DefaultAsyncHttpClientConfig.Builder()
.setMaxConnectionsPerHost(100)
.build();
DefaultAsyncHttpClient asyncHttpClient = new DefaultAsyncHttpClient(ahcConfig);
for (int i = 0; i < 2000; i++) {
poolExecutor.execute(()->{
try {
asyncHttpClient.executeRequest(Dsl.get("http://httpbin.org/get").setRequestTimeout(60000), new AsyncCompletionHandlerBase() {
@Override
public Response onCompleted(Response response) throws Exception {
log.info("complete count:{}", completeCount.incrementAndGet());
return response;
}
@Override
public void onThrowable(Throwable t) {
System.err.println(t);
}
}).get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
// RocketClient测试如下
AtomicInteger completeCount = new AtomicInteger();
AtomicInteger failCount = new AtomicInteger();
AtomicInteger cancelCount = new AtomicInteger();
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000));
RocketClient rocketClient = new RocketClient(RocketConfig.defaultConfig());
for (int i = 0; i < 2000; i++) {
poolExecutor.execute(()->{
try {
rocketClient.execute(RocketRequest.get("httpbin.org", 80, "/get"), new RocketDIYHandler() {
@Override
public void onCompleted(RocketResponse response) throws Exception {
log.info("complete count:{}", completeCount.incrementAndGet());
}
@Override
public void onThrowable(Throwable t) {
log.info("fail count:{}", failCount.incrementAndGet());
}
}).get();
} catch (Exception e) {
e.printStackTrace();
}
});
}
因为和HttpAsyncClient、AsyncHttpClient对比方式不同,所以结果不能对比HttpAsyncClient和AsyncHttpClient之间的性能
同HttpAsyncClient对比最终结果
RocketClient 2000次请求:9s
HttpAsyncClient 2000次请求:28s
同AsyncHttpClient对比最终结果
RocketClient 2000次请求:65s
AsyncHttpClient 2000次请求:65s
依赖清单(仅额外依赖三个包)
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>