在Netty当中所有的IO操作全部是异步的,也就是说调用任何方法都会立即返回,而不能保证所请求的I / O操作在调用结束时已完成,想下面的方法,像对端发送一个消息,方法会立即返回,但是消息是异步发送的,你不知道消息最后的具体发送是成功了还是失败了。
ctx.channel().writeAndFlusf();
该方法返回一个ChannelFuture实力,通过它设置回调函数来接收操作的执行情况。
像下面的方法,close也是异步的,它返回一个ChannelFuture对象, 我们可以加入listener来判断执行结果。
// GOOD
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ChannelFuture future = ctx.channel().close();
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) {
// Perform post-closure operation
// ...
}
});
}
package com.ht.web.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
public class TextFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger log = LoggerFactory.getLogger(TextFrameHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
//当前方法运行在Netty的EventLoop线程中
System.out.println("接收到客户端消息:" + msg.text() + " || 客户端地址: ==> "
+ ctx.channel().remoteAddress() + "|| 当前方法线程:" + Thread.currentThread());
//启动一个线程给客户端发送消息
new Thread(new Runnable() {
@Override
public void run() {
//给客户端发送消息,操作是异步的,所以返回Future对象
//writeAndFlush这个动作不会在当前线程执行,会以newRunnable的方式加入到EventLoop当中执行
ChannelFuture cf = ctx.channel().writeAndFlush(
new TextWebSocketFrame(ctx.alloc().buffer(1024)
.writeBytes("Hello".getBytes())));
//添加监听器
cf.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> f) throws Exception {
System.out.println("operationComplete:isCancelled->" + f.isCancelled() +
"| isDone-> " + f.isDone() +
"| isSuccess-> " + f.isSuccess() +
"| Thread->" + Thread.currentThread());
}
});
//判断是否可以取消,如果消息还没发送则返回true
System.out.println(cf.isCancellable());
//取消发送动作
System.out.println(cf.cancel(true));
}
}).start();
//这里暂停10秒为了测试,非常重要
//如果这里不暂停,当前方法马上返回,则EventLoop处于空闲状态
//上面线程内的发送事件会加入到EventLoop中,立即执行,所以cf.isCancellable()会返回false,因为消息已经发送完毕。
//反之,如果这个函数设置一个暂停,就会导致EventLoop处理工作状态,当前方法还没返回
//则上面加入到EventLoop当中的发送事件不会被立即执行,因为要等这个方法结束,eventloop才会执行下一个事件
//所以-调用cf.isCancellable()可以返回true
Thread.sleep(100000);
}
}