0
点赞
收藏
分享

微信扫一扫

5.java实现socket编程之AIO方式


AIO理解

aio比nio更加进步和简化。多路复用器判断状态那堆麻烦的代码被线程组取代了。
这些线程组判断通道状态,建立连接,然后把对接好的请求推送到具体的handler,可以让你更加专注业务。(简单理解就是通过一堆线程组,把客户端进行接入,过程中的事情,aio给做了)

客户端代码实现

package aio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;

/**
* @Auther: 李泽
* @Date: 2019/3/4 17:26
* @Description:
*/
public class Client implements Runnable{
private AsynchronousSocketChannel asc;

public Client( ) throws IOException {
asc = AsynchronousSocketChannel.open();
}
public void connect(){
asc.connect(new InetSocketAddress("127.0.0.1",8000));
}
public void write(String request){
try {
asc.write(ByteBuffer.wrap(request.getBytes())).get();
read();
}catch (Exception e){
e.printStackTrace();
}
}

private void read() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
//把數據读入缓冲区中
asc.read(buffer).get();
//切换缓冲区的读取模式
buffer.flip();
//构造一个字节数组接受缓冲区中的数据
byte[] respBytes = new byte[buffer.remaining()];
buffer.get(respBytes);
System.out.println("客户端接收服务器端:"+new String(respBytes,"utf-8").trim());
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 功能描述: 简单让线程不停止。
*
* @auther: 李泽
* @date: 2019/3/4 18:09
*/
@Override
public void run() {
while (true){

}
}

public static void main(String[] args) throws IOException, InterruptedException {
Client c1 = new Client();
c1.connect();
Client c2 = new Client();
c2.connect();
Client c3 = new Client();
c3.connect();

new Thread(c1,"c1").start();
new Thread(c2,"c2").start();
new Thread(c3,"c3").start();

Thread.sleep(1000);

c1.write("c1 aaa");
c2.write("c2 bbhb");
c3.write("c3 cccc");
}
}

服务器端代码实现

package aio;

import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* @Auther: Administrator
* @Date: 2019/3/4 16:45
* @Description:
*/
public class Server {
//线程池
private ExecutorService executorService;
//AIO工作的线程组
private AsynchronousChannelGroup threadGroup;
//服务器通道
public AsynchronousServerSocketChannel assc;

public Server(int port) {
try {
//创建一个可伸缩的线程池
executorService = Executors.newCachedThreadPool();
//创建干活的线程组,负责连接上之前的所有的琐碎的工作。
threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService,1);
//创建服务器通道,并且设置为这个通道干活的线程组
assc = AsynchronousServerSocketChannel.open(threadGroup);
//绑定端口
assc.bind(new InetSocketAddress(port));
System.out.println("server start!port:"+port);
//进行阻塞,实际上并没有卡在这。
assc.accept(this,new ServerCompletionHandler());
//阻塞在这不让服务停止,因为accept不会阻塞。
Thread.sleep(Integer.MAX_VALUE);
}catch (Exception e){
e.printStackTrace();
}
}

public static void main(String[] args) {
Server server = new Server(8000);
}
}

服务端业务处理handler实现

package aio;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

/**
* @Auther: 李泽
* @Date: 2019/3/4 16:57
* @Description:
*/
public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,Server> {
/**
* 功能描述: 建立连接之后才会调用,主要用来读写数据给客户端。
*
* @auther: 李泽
* @date: 2019/3/4 17:09
*/
@Override
public void completed(AsynchronousSocketChannel result, Server attachment) {
//这个对象被使用了,accept方法被执行过了,如果不设置一个本类对象去执行任务的话,不重新监听的话,新的请求
// 绝对进不来,所以要调用一下 server中 这句话的等效语句assc.accept(this,new ServerCompletionHandler());
//类似递归,让这个类重新处于监听状态,处理下一个请求,没有new新对象。各个对象之间互不干扰。
attachment.assc.accept(attachment,this);
//读取数据
read(result);
}
/**
* 功能描述: 具体的读取逻辑
*
* @auther: 李泽
* @date: 2019/3/4 17:10
*/
private void read(AsynchronousSocketChannel asc) {
//创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//拿出通道来执行读取的业务
asc.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
//进行读取之前重新设置pos和limit
attachment.flip();
//打印获得的字节数
System.out.println("resultSize = " + result);
//获取读取的数据
String body = new String(attachment.array()).trim();
System.out.println("server accept body = " + body);
String resp = "服务器收到你发来的数据:"+ body;
write(asc,resp);
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {
exc.printStackTrace();
}
});
}
/**
* 功能描述: 写回客户端,读写方法都是不阻塞的。
*
* @auther: 李泽
* @date: 2019/3/4 17:23
*/
private void write(AsynchronousSocketChannel asc, String resp) {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put(resp.getBytes());
buffer.flip();
//get写不写都行
asc.write(buffer).get();
}catch (Exception e){
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, Server attachment) {
exc.printStackTrace();
}
}


举报

相关推荐

0 条评论