/**
 * Single-threaded, selector-based demo server that pushes a large payload to
 * every client that connects, without ever blocking on a slow receiver.
 *
 * <p>A non-blocking {@code write} may accept only part of the payload. The
 * unsent remainder is attached to the client's {@link SelectionKey} and the
 * key is subscribed to {@code OP_WRITE}; the rest is flushed whenever the
 * socket becomes writable again. After every write attempt the number of
 * bytes actually written is printed to standard output.
 */
public class WriteServer {
    /** TCP port the server listens on. */
    private static final int PORT = 8080;

    /** Number of {@code 'a'} characters sent to each client. */
    private static final int PAYLOAD_CHARS = 5000000;

    /** Size of the scratch buffer used to throw away bytes sent by clients. */
    private static final int DISCARD_BUFFER_SIZE = 4096;

    /**
     * Runs the accept/write event loop until the process is killed or the
     * selector / listening socket fails.
     *
     * @param args ignored
     * @throws IOException if the selector or the listening socket cannot be
     *                     set up, or if selecting/accepting fails; failures on
     *                     an individual client connection only drop that client
     */
    public static void main(String[] args) throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            ssc.bind(new InetSocketAddress(PORT));

            // Encode the payload once; each client gets its own cursor over it.
            ByteBuffer payload = buildPayload();

            while (true) {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // Selected keys are never removed automatically.
                    iter.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        acceptClient(selector, ssc, payload);
                    } else {
                        serviceClient(key);
                    }
                }
            }
        }
    }

    /** Encodes {@code PAYLOAD_CHARS} copies of 'a' with the platform default charset. */
    private static ByteBuffer buildPayload() {
        StringBuilder sb = new StringBuilder(PAYLOAD_CHARS);
        for (int i = 0; i < PAYLOAD_CHARS; i++) {
            sb.append('a');
        }
        return Charset.defaultCharset().encode(sb.toString());
    }

    /** Accepts one pending connection and starts sending the payload to it. */
    private static void acceptClient(Selector selector, ServerSocketChannel ssc, ByteBuffer payload)
            throws IOException {
        SocketChannel sc = ssc.accept();
        if (sc == null) {
            // Non-blocking accept: the pending connection vanished before we got to it.
            return;
        }
        try {
            sc.configureBlocking(false);
            SelectionKey scKey = sc.register(selector, SelectionKey.OP_READ);

            // 1. Send a large amount of data to the client.
            //    duplicate() shares the bytes but has an independent position.
            ByteBuffer buffer = payload.duplicate();
            // 2. The return value is the number of bytes actually written.
            int writeNums = sc.write(buffer);
            System.out.println(writeNums);
            // 3. Check whether anything is left to send.
            if (buffer.hasRemaining()) {
                // 4. Additionally watch for writability (bitmask OR, not arithmetic).
                scKey.interestOps(scKey.interestOps() | SelectionKey.OP_WRITE);
                // 5. Park the unsent data on the key until the socket is writable.
                scKey.attach(buffer);
            }
        } catch (IOException e) {
            // One broken connection must not take the whole server down.
            System.err.println("Dropping client after I/O error: " + e);
            closeQuietly(sc);
        }
    }

    /** Handles read and/or write readiness on an already accepted client. */
    private static void serviceClient(SelectionKey key) {
        SocketChannel sc = (SocketChannel) key.channel();
        try {
            if (key.isReadable()) {
                discardInput(key, sc);
            }
            // discardInput may have closed the channel, which cancels the key.
            if (key.isValid() && key.isWritable()) {
                flushPending(key, sc);
            }
        } catch (IOException e) {
            System.err.println("Dropping client after I/O error: " + e);
            closeQuietly(sc);
        }
    }

    /**
     * Consumes and throws away whatever the client sent so the key does not
     * stay readable forever, and reacts to the client closing its side.
     */
    private static void discardInput(SelectionKey key, SocketChannel sc) throws IOException {
        ByteBuffer scratch = ByteBuffer.allocate(DISCARD_BUFFER_SIZE);
        int read;
        while ((read = sc.read(scratch)) > 0) {
            scratch.clear();
        }
        if (read == -1) {
            if (key.attachment() == null) {
                // Peer is done and nothing is pending: release the connection.
                sc.close();
            } else {
                // Peer stopped sending but may still be receiving; finish the
                // transfer and stop polling the exhausted read side.
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
            }
        }
    }

    /** Writes as much of the parked payload as the socket accepts right now. */
    private static void flushPending(SelectionKey key, SocketChannel sc) throws IOException {
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        if (buffer == null) {
            // Nothing parked; make sure we stop being woken up for writability.
            key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
            return;
        }
        int write = sc.write(buffer);
        System.out.println(write);
        // 6. Clean up once everything has been sent.
        if (!buffer.hasRemaining()) {
            // Drop the reference to the finished buffer.
            key.attach(null);
            // Stop watching for writability (bitmask clear, not arithmetic).
            int remainingOps = key.interestOps() & ~SelectionKey.OP_WRITE;
            if (remainingOps == 0) {
                // The read side already hit end-of-stream: nothing left to do.
                sc.close();
            } else {
                key.interestOps(remainingOps);
            }
        }
    }

    /** Closes a client channel (which also cancels its key), ignoring close failures. */
    private static void closeQuietly(SocketChannel sc) {
        try {
            sc.close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway; nothing useful to do.
        }
    }
}
/**
 * Blocking demo client that connects to the server on {@code localhost:8080},
 * reads everything the server sends, and prints the running total of bytes
 * received after each read.
 */
public class WriteClient {
    /** Size of the receive buffer reused for every read. */
    private static final int BUFFER_SIZE = 1024 * 1024;

    /**
     * Connects, receives until the server closes the connection, then returns.
     *
     * @param args ignored
     * @throws IOException if connecting or reading fails
     */
    public static void main(String[] args) throws IOException {
        try (SocketChannel sc = SocketChannel.open()) {
            sc.connect(new InetSocketAddress("localhost", 8080));
            // The client receives data; one buffer is reused for all reads.
            ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
            long count = 0;
            int read;
            // read() returns -1 at end-of-stream; that sentinel must end the
            // loop and must not be added to the byte count.
            while ((read = sc.read(buffer)) != -1) {
                count += read;
                System.out.println(count);
                // The bytes are only counted, so reset the buffer for the next read.
                buffer.clear();
            }
        }
    }
}