Socket的系统调用
[图片上传失败...(image-796c7f-1587557689563)]
1. 用户态和内核态
Linux操作系统的体系架构分为用户态和内核态(或者用户空间和内核)。内核从本质上看是一种软件——控制计算机的硬件资源,并提供上层应用程序运行的环境。用户态即上层应用程序的活动空间,应用程序的执行必须依托于内核提供的资源,包括CPU资源、存储资源、I/O资源等。为了使上层应用能够访问到这些资源,内核必须为上层应用提供访问的接口:即系统调用。
2. 访问内核态方式
- 系统调用
- 库函数
- Shell脚本
3. Socket的系统调用
socket、connect、accept、listen、bind、recvfrom、read、write
三次握手
- 服务端和客户端通过socket函数指向一个文件描述符,然后通过这个文件描述符建立连接。服务器通过bind()绑定端口,这样客户端可以IP+端口唯一标识到服务器的进程,调用listen()监听客户端,客户端调用connect请求连接,服务器的accept函数一直阻塞,直到有连接。三次握手建立连接之后,就可以调用write和read进行读写。
BIO
服务端
public class Server {
public static void main(String[] args) {
try {
byte[] bt = new byte[1024];
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8080));
//等待连接:阻塞状态
System.out.println("server start...");
Socket accept = serverSocket.accept();
System.out.println("connect success");
//等待返回通知:阻塞状态
int read = accept.getInputStream().read(bt);
if(read != -1){
System.out.println("result:"+new String(bt));
}
} catch (Exception e){
System.out.println(e);
}
}
}
客户端实现
public class Clients {
public static void main(String[] args) {
try {
Socket socket = new Socket("127.0.0.1",8080);
socket.getOutputStream().write("返回结果".getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}
}
这是BIO单线程实现,其中accept和read都会被阻塞。如果被阻塞在accpet和read中,那么不能接受其他连接。
多线程实现
public class Server {
public static void main(String[] args) {
try {
byte[] bt = new byte[1024];
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(8080));
while (true) {
//等待连接:阻塞状态
System.out.println("server start...");
Socket accept = serverSocket.accept();
System.out.println("connect success");
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true) {
System.out.println("--------------------------");
int read = accept.getInputStream().read(bt);
if(read != -1){
System.out.println("result:" + new String(bt));
}
}
} catch (Exception e) {
System.out.println(e);
}
}
}).start();
}
} catch (Exception e){
System.out.println(e);
}
}
}
每个连接都会开一个新的线程来处理连接,但是read依然是阻塞的,那么当客户端阻塞在read时,会造成资源的浪费。导致其他连接无法处理。也可以用线程池处理,但是依然会有阻塞的问题。
NIO
- Buffer缓冲区
- 多路复用的Selctor,也就是用一个系统调用来管理多个连接上的IO读写,减少了系统调用的次数,减少用户态到内核态的切换,降低开销,因为切换时需要用额外的寄存器传递参数
- 通过Channel
服务端
public static void main(String[] args) throws IOException{
int port = 8080;
if(args != null &&args.length >0){
try{
port = Integer.valueOf(args[0]);
}catch (NumberFormatException ex){
//采用默认值
}
}
MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
}
public class MultiplexerTimeServer implements Runnable {
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean stop;
/**
* 初始化多路复用器,绑定监听端口
* @param port
*/
public MultiplexerTimeServer(int port){
try{
selector = Selector.open();//创建多路复用器
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);//设置为异步非阻塞模式
serverChannel.socket().bind(new InetSocketAddress(port),1024);//绑定端口
serverChannel.register(selector,SelectionKey.OP_ACCEPT);//注册到Selector
System.out.println("The time server is start in port:" + port);
}catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
this.stop = true;
}
public void run(){
while(!stop){
try{
selector.select(1000);//selector每隔1s都被唤醒一次
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch (Exception e){
if(key !=null){
key.cancel();
if(key.channel() !=null)
key.channel().close();
}
}
}
}catch (Throwable t){
t.printStackTrace();
}
}
//多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if(selector != null){
try{
selector.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//处理新接入的请求消息
if(key.isAcceptable()){
//Accept the new connection
ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
SocketChannel sc = ssc.accept();//接收客户端的连接请求,完成TCP三次握手
sc.configureBlocking(false);//设置为异步非阻塞
//Add the new connection to the selector
sc.register(selector,SelectionKey.OP_READ);
}
if(key.isReadable()){
//Read the data
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes > 0 ){
readBuffer.flip();//将缓冲区当前的limit设置为position,position设置为0
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("The time server receive order :" + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new java.util.Date(System.currentTimeMillis()).toString():"BAD ORDER";
doWrite(sc,currentTime);
}else if(readBytes <0){
//对端链路关闭
key.cancel();
sc.close();
}else{
//读到0字节,忽略
}
}
}
}
/**
* 将应答消息异步发送给客户端
* @param channel
* @param response
* @throws IOException
*/
private void doWrite(SocketChannel channel,String response) throws IOException{
if(response !=null && response.trim().length() >0){
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
客户端代码
public static void main(String[] args) throws IOException{
int port = 8080;
if(args != null &&args.length >0){
try{
port = Integer.valueOf(args[0]);
}catch (NumberFormatException ex){
//采用默认值
}
}
new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
}
public class TimeClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public TimeClientHandle(String host,int port){
this.host = host == null?"127.0.0.1":host;
this.port = port;
try{
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
}catch (IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void run(){
try{
doConnect();
}catch (IOException e){
e.printStackTrace();
System.exit(1);
}
while(!stop){
try{
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch (Exception e){
if(key != null){
key.cancel();
if(key.channel() !=null)
key.channel().close();
}
}
}
}catch (Exception e){
e.printStackTrace();
System.exit(1);
}
}
if(selector !=null){
try{
selector.close();
}catch (IOException e){
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//判断是否连接成功
SocketChannel sc = (SocketChannel)key.channel();
if(key.isConnectable()){
if(sc.finishConnect()){
sc.register(selector,SelectionKey.OP_READ);
doWrite(sc);
}else{
System.exit(1);//连接失败,进程退出
}
}
if(key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();//将缓冲区当前的limit设置为position,position设置为0
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("The time server receive order :" + body);
this.stop = true;
} else if (readBytes < 0) {
//对端链路关闭
key.cancel();
sc.close();
} else {
//读到0字节,忽略
}
}
}
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port))){
socketChannel.register(selector,SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc) throws IOException {
byte[] bytes = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining())
System.out.println("Send order 2 server succeed.");
}
}
由于NIO比较难用,所以Socket编程更多地使用Netty框架
Reactor模型
code
Reactor模型中定义三种角色:Reactor将IO事件分发给Handler;Acceptor处理客户端连接,请求派发到处理器;Handler执行读写任务。
1. 单Reactor单线程模型
Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder 处理
2. 单Reactor多线程模型
在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销
3. 多Reactor多线程模型
这种模型下,将处理连接和处理业务分成分成主Reactor和从Reactor。此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别,Netty中NIO就是基于这种模型。