0
点赞
收藏
分享

微信扫一扫

NIO例子

seuleyang 2022-02-18 阅读 69


client向server发送questionId,server随机得到1个答案,然后把questionId和答案发送给client


AnswerServer 

/**
 * Single-threaded NIO server for the question/answer demo.
 *
 * <p>Protocol as implemented here: after accepting a connection the server
 * sends {@link #RES_CONNECTION_OK}; for every chunk of bytes it then reads
 * from a client (treated as a question id) it replies with
 * {@code questionId=<id>,answerId=<answer>} where the answer is picked
 * from {@link #answers}.
 *
 * <p>All channels are non-blocking and multiplexed on one {@link Selector};
 * {@link #start()} runs the event loop on the calling thread.
 */
public class AnswerServer {

    /** Counts how many read events have been handled (used only for logging). */
    private static AtomicLong atomicLong = new AtomicLong();

    /** Pool of possible answers; one is chosen per question. */
    public static final String[] answers = {"A","B","C","D"};

    /** Greeting sent to a client right after its connection is accepted. */
    public static final String RES_CONNECTION_OK = "OK";

    /** Fixed seed, so the sequence of chosen answers is the same on every run. */
    static Random random = new Random(3);

    private Selector selector;

    /**
     * Creates the server and binds the listening socket. A failure in
     * {@link #init()} is printed rather than thrown; {@link #start()} then
     * refuses to run.
     */
    public AnswerServer(){
        try{
            this.init();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Opens the selector, binds a non-blocking server socket to
     * {@code Constant.SERVER.IP:Constant.SERVER.PORT} and registers it for
     * accept events.
     *
     * @throws Exception if the selector or the server socket cannot be set up
     */
    public void init() throws Exception{
        // Open the I/O multiplexer.
        selector = Selector.open();

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // Bind the listening port.
        serverSocketChannel.socket().bind(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));

        System.out.println("answer server linten port="+Constant.SERVER.PORT);

        // A server must register OP_ACCEPT first so it is told about incoming connections.
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * Runs the event loop forever: accepts new clients and answers their
     * questions. A failure while serving one client closes only that client;
     * the listening socket stays registered.
     *
     * @throws IllegalStateException if {@link #init()} did not complete
     * @throws Exception if {@code select()} itself fails
     */
    public void start() throws Exception{
        if (this.selector == null) {
            throw new IllegalStateException("selector is not initialised; init() must succeed before start()");
        }

        while (true) {
            this.selector.select();
            System.out.println("the coming keys="+this.selector.selectedKeys());

            Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();

            while(iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();

                // A key may have been cancelled while handling an earlier key in this
                // batch; querying its ready set would throw CancelledKeyException.
                if (!key.isValid()) {
                    continue;
                }

                SocketChannel socketChannel = null;
                int type = 0;
                try {
                    if (key.isAcceptable()) {
                        // Here key.channel() is the ServerSocketChannel.
                        type = SelectionKey.OP_ACCEPT;
                        this.accept(key);
                    } else if (key.isReadable()) {
                        type = SelectionKey.OP_READ;
                        socketChannel = (SocketChannel) key.channel();
                        this.read(socketChannel, key);
                    }
                }catch (Exception e) {
                    // Only tear down the client connection that failed. The listening
                    // key must never be cancelled here, otherwise one bad client would
                    // stop the server from accepting anyone else.
                    if(socketChannel!=null){
                        key.cancel();
                        this.closeQuietly(socketChannel);
                    }
                    System.out.println("exception at type="+type);
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Accepts one pending connection, registers it for read events and sends
     * the greeting. If any step after the accept fails, the new channel is
     * closed before the exception is rethrown.
     *
     * @param key the selection key of the listening {@link ServerSocketChannel}
     * @throws Exception if accepting or setting up the client channel fails
     */
    private void accept(SelectionKey key) throws Exception {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel)key.channel();
        SocketChannel socketChannel = serverSocketChannel.accept();
        if (socketChannel == null) {
            // Non-blocking accept() returns null when no connection is actually pending.
            return;
        }

        try {
            socketChannel.configureBlocking(false);
            System.out.println("开始注册读时间");

            socketChannel.register(this.selector,SelectionKey.OP_READ);

            this.sayAcceptOk(socketChannel);
        } catch (Exception e) {
            // Closing the channel also cancels its selection key.
            this.closeQuietly(socketChannel);
            throw e;
        }
    }

    /**
     * Sends {@link #RES_CONNECTION_OK} to a freshly accepted client.
     *
     * @param socketChannel the client channel
     * @throws Exception if the write fails
     */
    private void sayAcceptOk(SocketChannel socketChannel) throws Exception{
        ByteBuffer byteBuffer = ByteBuffer.wrap(AnswerServer.RES_CONNECTION_OK.getBytes());
        System.out.println("AnswerServer发送连接成功!");
        socketChannel.write(byteBuffer);
    }

    /**
     * Handles one read event: reads a question id from the client and writes
     * back the id together with a randomly chosen answer.
     *
     * <p>If the client has closed the connection (read returns -1) the channel
     * is closed and its key cancelled, so the selector stops reporting it.
     * If no bytes are available yet, the method returns and waits for the next
     * read event instead of spinning.
     *
     * @param channel the client channel that became readable
     * @param key     the selection key belonging to {@code channel}
     * @throws Exception if reading or writing fails
     */
    private void read(SocketChannel channel,SelectionKey key) throws Exception {
        System.out.println(atomicLong.incrementAndGet()+",开始读数据 key="+key);

        // Read the question first.
        ByteBuffer buffer = ByteBuffer.allocate(50);
        int bytesRead = channel.read(buffer);

        if (bytesRead == -1) {
            // End of stream: the peer closed. Without closing here the key would stay
            // readable forever and the select loop would spin.
            System.out.println("没有数据了-----------------------");
            key.cancel();
            channel.close();
            return;
        }
        if (buffer.position() == 0) {
            System.out.println("没有数据了-----------------------");
            return; // nothing to process; wait for the next read event
        }
        System.out.println("数据读完了-----------------------position="+buffer.position());

        String questionId = ByteBufferUtil.toString(buffer);
        System.out.println("========read questionId :" + questionId);

        String answer = answers[random.nextInt(answers.length)];

        String response = "questionId="+questionId+",answerId="+answer;

        ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
        channel.write(responseBuffer);
    }

    /**
     * Closes a client channel, printing (not propagating) any failure so that
     * cleanup can never terminate the event loop.
     *
     * @param channel the channel to close
     */
    private void closeQuietly(SocketChannel channel) {
        try {
            channel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception{
        new AnswerServer().start();
    }

}

AnswerNIOClient

/**
 * Single-threaded NIO client for the question/answer demo.
 *
 * <p>The client connects to {@code Constant.SERVER.IP:Constant.SERVER.PORT},
 * then alternates between sending an incrementing question id and printing
 * the server's reply. The event loop in {@link #start()} ends once the
 * connection has been closed, either by the server or after an I/O failure.
 */
public class AnswerNIOClient implements MyRunnable {

    private Selector selector;
    SocketChannel socketChannel;
    /** Source of the question ids sent to the server. */
    private AtomicLong atomicLong = new AtomicLong();

    /**
     * Creates the client and opens its selector and non-blocking channel.
     *
     * @throws Exception if {@link #init()} fails
     */
    public AnswerNIOClient() throws Exception {
        this.init();
    }


    /**
     * Opens the selector and a non-blocking socket channel; does not connect yet.
     *
     * @throws Exception if either resource cannot be opened
     */
    @Override
    public void init() throws Exception {
        selector = Selector.open();
        socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
    }

    /**
     * Connects to the server and runs the event loop until the connection is
     * closed. The process exits if the connect call throws or if
     * {@code finishConnect()} reports failure.
     *
     * @throws Exception if {@code select()} or closing the selector fails
     */
    @Override
    public void start() throws Exception {
        try {
            this.doConnect();
        }catch (Exception e){
            System.out.println("连接失败!");
            e.printStackTrace();
            System.exit(-1);
        }

        // Stop looping once the channel is closed; otherwise select() would block
        // forever on a selector with no live keys.
        while (socketChannel.isOpen()) {
            selector.select();
            try {

                Iterator<SelectionKey> iterator = this.selector.selectedKeys().iterator();

                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();

                    // Querying the ready set of a cancelled key throws CancelledKeyException.
                    if (!key.isValid()) {
                        continue;
                    }

                    SocketChannel sc = (SocketChannel) key.channel();

                    System.out.println("isTrue=" + (sc==socketChannel));
                    System.out.println("socketChannel=" + socketChannel);

                    if(key.isConnectable()) {
                        if (sc.finishConnect()) {
                            System.out.println("==========finishConnect==================================");
                            sc.register(selector, SelectionKey.OP_READ);
                            this.doWrite(socketChannel);
                        } else {
                            // Connection could not be completed: terminate the process.
                            System.exit(1);
                        }

                    } else if (key.isReadable()) {
                        this.doRead(sc, key);
                    } else if (key.isWritable()) {
                        this.doWrite(sc);
                    }

                }

            } catch (Exception e) {
                e.printStackTrace();
                // After a failed I/O operation the connection is unusable. Close it so
                // the loop ends instead of retrying the same failure indefinitely.
                this.closeQuietly();
            }
        }

        selector.close();
    }

    /**
     * Starts the non-blocking connect and registers the channel with the
     * selector: for reads if the connection completed immediately, otherwise
     * for the connect-completion event.
     *
     * @throws Exception if the connect call or the registration fails
     */
    private void doConnect() throws Exception {
        // Connect to the server by IP and port.
        boolean connected = this.socketChannel.connect(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));
        System.out.println("doConnect="+connected);
        if(connected){
            // Already connected: wait for data from the server.
            socketChannel.register(this.selector,SelectionKey.OP_READ);
        } else {
            // A non-blocking connect returning false means "still in progress", not
            // "failed"; wait for OP_CONNECT and complete it with finishConnect().
            socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
        }

    }

    /**
     * Reads and prints one server response, then switches the channel's
     * interest to OP_WRITE so the next question gets sent.
     *
     * <p>If the server has closed the connection (read returns -1) the channel
     * is closed and nothing further is scheduled. If no bytes were available
     * the method returns and keeps waiting for a read event.
     *
     * @param socketChannel the channel that became readable
     * @param key           the selection key belonging to {@code socketChannel}
     * @throws Exception if reading or re-registering fails
     */
    private void doRead(SocketChannel socketChannel,SelectionKey key) throws Exception {
        ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        int bytesRead = socketChannel.read(readBuffer);

        if (bytesRead == -1) {
            System.out.println("server closed the connection");
            key.cancel();
            socketChannel.close();
            return;
        }
        if (bytesRead == 0) {
            return;
        }

        String content = ByteBufferUtil.toString(readBuffer);
        System.out.println("服务器响应! 问题及答案:"+content);

        // Registering an already-registered channel replaces its interest set.
        socketChannel.register(this.selector,SelectionKey.OP_WRITE);

    }


    /**
     * Sends the next question id and switches the channel's interest back to
     * OP_READ to wait for the answer.
     *
     * @param socketChannel the channel to write to
     * @throws Exception if the write or the re-registration fails, or the
     *                   thread is interrupted while pausing
     */
    private void doWrite(SocketChannel socketChannel) throws Exception {
        long questionId = getQuestionId();

        ByteBuffer byteBuffer = ByteBuffer.wrap(String.valueOf(questionId).getBytes());
        socketChannel.write(byteBuffer);
        System.out.println("AnswerNIOClient 发送查询答案! questionId="+questionId);

        socketChannel.register(this.selector,SelectionKey.OP_READ);

    }


    /**
     * Pauses three seconds (this throttles the demo and blocks the event loop
     * thread) and returns the next question id.
     *
     * @return the next question id, starting at 1
     * @throws Exception if the thread is interrupted while sleeping
     */
    private long getQuestionId() throws Exception{
        TimeUnit.SECONDS.sleep(3);
        return atomicLong.incrementAndGet();
    }

    /** Closes the channel, printing (not propagating) any failure. */
    private void closeQuietly() {
        try {
            socketChannel.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception{
        new AnswerNIOClient().start();
    }

}
AnswerBIOClient
/**
 * Blocking-I/O client for the question/answer demo.
 *
 * <p>Uses one thread to read server data and another to send an
 * incrementing question id every five seconds. The writer waits until the
 * reader has seen the server greeting
 * ({@code AnswerServer.RES_CONNECTION_OK}).
 */
public class AnswerBIOClient {

    Socket socket = new Socket();

    /** Source of the question ids sent to the server. */
    private AtomicLong atomicLong = new AtomicLong();

    /**
     * Set by the reader thread once the greeting has arrived and polled by the
     * writer thread; volatile so the writer is guaranteed to observe the update.
     */
    private volatile boolean isConnectionOk = false;

    /**
     * Creates the client and tries to connect. A connect failure is printed
     * rather than thrown; {@link #start()} then refuses to run.
     */
    public AnswerBIOClient(){

        try{
            init();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    /**
     * Connects the socket to {@code Constant.SERVER.IP:Constant.SERVER.PORT}.
     *
     * @throws Exception if the connection cannot be established
     */
    public void init() throws Exception{
        socket.connect(new InetSocketAddress(Constant.SERVER.IP,Constant.SERVER.PORT));
        if(socket.isConnected()) {
            System.out.println("AnswerBIOClient连接成功! 服务端端口="+socket.getPort()+" 客户端端口="+socket.getLocalPort());
        }
    }

    /**
     * Starts the reader and writer threads and returns immediately.
     *
     * @throws IllegalStateException if the socket is not connected
     * @throws Exception declared for callers; not thrown by the current body
     */
    public void start() throws Exception{
        if (!socket.isConnected()) {
            // Without a connection the reader fails at once and the writer would wait forever.
            throw new IllegalStateException("socket is not connected; init() must succeed before start()");
        }

        Runnable readRunnable = ()->{
            try {
                read();
            }catch (Exception e){
                System.out.println("readRunnable exception "+e.getMessage());
                e.printStackTrace();
            }
        };

        Runnable writeRunnable = ()->{
            try {
                write();
            }catch (Exception e){
                System.out.println("writeRunnable exception "+e.getMessage());
                e.printStackTrace();
            }
        };

        Thread readThread = new Thread(readRunnable,"readThread");
        Thread writeThread = new Thread(writeRunnable,"writeThread");

        readThread.start();
        writeThread.start();

    }

    /**
     * Line-based alternative to {@link #read()} (currently not called): prints
     * each line received until the server closes the stream.
     *
     * @throws Exception if reading from the socket fails
     */
    private void read1() throws Exception{
        // The server must terminate each response with \n, otherwise readLine() never returns.
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
        System.out.println("等待读数据");
        String line = null;
        while((line=bufferedReader.readLine())!=null) {
            System.out.println("AnswerBIOClient read server data =" + line);
        }
        // readLine() returned null: end of stream, nothing more will arrive.
    }

    /**
     * Reader loop: blocks for server data, records the greeting and prints
     * every later response. Returns (and closes the socket, which also stops
     * the writer) when the server closes the stream.
     *
     * @throws Exception if reading from the socket fails
     */
    private void read() throws Exception{
        byte[] bytes = new byte[512];

        while(true) {
            System.out.println("等待读数据");

            // Read again on every iteration so each server packet is consumed.
            int readLength = socket.getInputStream().read(bytes);
            System.out.println("readLength="+readLength);
            if (readLength == -1) {
                // End of stream: the server closed the connection.
                socket.close();
                return;
            }

            String result = new String(bytes,0,readLength);
            if(!isConnectionOk){
                isConnectionOk = result.equals(AnswerServer.RES_CONNECTION_OK);
                System.out.println("result="+result+" isConnectionOk="+isConnectionOk+" o:"+result.equals(AnswerServer.RES_CONNECTION_OK));
            } else {
                System.out.println("AnswerBIOClient read server data =" + result);
            }
        }

    }

    /**
     * Writer loop: once the greeting has been received, sends the next
     * question id every five seconds until the socket is closed.
     *
     * @throws Exception if writing fails or the thread is interrupted
     */
    private void write() throws Exception{

        while (!socket.isClosed()) {
            if (!isConnectionOk) {
                // Poll with a short pause instead of spinning at full CPU.
                TimeUnit.MILLISECONDS.sleep(100);
                continue;
            }
            TimeUnit.SECONDS.sleep(5);
            long questionId = atomicLong.incrementAndGet();
            System.out.println("客户端开始发送 questionId=" + questionId);
            socket.getOutputStream().write(String.valueOf(questionId).getBytes());
            socket.getOutputStream().flush();
        }

    }

    public static void main(String[] args) throws Exception{

        new AnswerBIOClient().start();

    }

}

 

举报

相关推荐

0 条评论