IOCP 算是.Net里,相对来讲是属于AIO模型的,而常用的socket是BIO模型的,Netty技术是属于NIO模型的。
IOCP是windows平台最高效的通信模型,可以在一个应用程序同时管理为数众多的套接字,以达到最佳的系统性能!从本质上说,完成端口模型要求我们创建一个Win32完成端口对象,通过指定数量的线程,对重叠I/O请求进行管理,以便为已经完成的重叠I/O请求提供服务。
这个技术点算是通讯顶级的技术方案了。用的好,性能刚刚的。
IOCP在Windows下,算是性能发挥最强的方案。
话不多说,看代码。
IOCPSocket实现
实现主要是两方面,一方面是实现IOCP客户端,一方面是要实现IOCP服务端。
开搞。
新建项目,大概是这个样子
IOCP客户端 IOCPClient.cs
/// <summary>
/// 一个客户端
/// </summary>
public class IOCPClient : IDisposable
{
/// <summary>
/// 客户端socket
/// </summary>
private Socket _clientSock;
/// <summary>
/// 服务端 ip信息
/// </summary>
public IPEndPoint ServerEndPoint;
/// <summary>
/// 接收的委托
/// </summary>
/// <param name="e"></param>
public delegate void ReceiveHandler(byte[] data);
/// <summary>
/// 接收数据的时间
/// </summary>
public event ReceiveHandler OnReceive;
/// <summary>
/// 开始的委托
/// </summary>
public delegate void StartHandler();
/// <summary>
/// 开始的事件
/// </summary>
public event StartHandler OnStart;
/// <summary>
/// 关闭事件的委托
/// </summary>
public delegate void CloseHandler();
/// <summary>
/// 关闭事件
/// </summary>
public event CloseHandler OnClose;
/// <summary>
/// 发送事件的委托
/// </summary>
public delegate void SeededHandler();
/// <summary>
/// 发送完毕后的事件
/// </summary>
public event SeededHandler OnSeeded;
/// <summary>
/// 异常错误事件
/// </summary>
public delegate void ErrHandler(Exception e);
/// <summary>
/// 异常事件
/// </summary>
public event ErrHandler OnErr;
/// <summary>
/// 是否在运行
/// </summary>
public bool IsRuning = false;
private byte[] buffer;
private List<byte> bufferData;
/// <summary>
/// 客户端socket
/// </summary>
/// <param name="serverIp">服务端ip</param>
/// <param name="serverPort">服务端端口</param>
/// <param name="encoding">编码</param>
public IOCPClient(string serverIp, int serverPort, int bufferSize = 1024)
{
_clientSock = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
ServerEndPoint = new IPEndPoint(IPAddress.Parse(serverIp), serverPort);
buffer = new byte[bufferSize];
bufferData = new List<byte>();
}
/// <summary>
/// 开始连接服务器
/// </summary>
public void ConnServer()
{
_clientSock.BeginConnect(ServerEndPoint, new AsyncCallback(ConnectCallback), IsRuning);
}
/// <summary>
/// 异步连接的回调
/// </summary>
/// <param name="ar"></param>
private void ConnectCallback(IAsyncResult ar)
{
try
{
_clientSock.EndConnect(ar);
IsRuning = true;
if (OnStart != null)
{
OnStart();
}
}
catch (Exception e) { IsRuning = false; if (OnErr != null) { OnErr(e); } }
}
/// <summary>
/// 发送数据
/// </summary>
/// <param name="data"></param>
public void Seed(byte[] data)
{
_clientSock.BeginSend(data, 0, data.Length, SocketFlags.None, new AsyncCallback(SendCallback), null);
}
//发送数据,以防发送少了。。
private void SendCallback(IAsyncResult ar)
{
try
{
if (_clientSock.Connected)
{
_clientSock.EndSend(ar);
if (OnSeeded != null) { OnSeeded(); }
}
}
catch (Exception e) { if (OnErr != null) { OnErr(e); } }
}
/// <summary>
/// 接收
/// </summary>
public void Receive()
{
if (_clientSock.Connected)
{
_clientSock.BeginReceive(buffer, 0, buffer.Length, SocketFlags.None, new AsyncCallback(ReceiveCallback), null);
}
}
/// <summary>
/// 接收
/// </summary>
/// <param name="ar"></param>
private void ReceiveCallback(IAsyncResult ar)
{
try
{
if (_clientSock.Connected)
{
int count = _clientSock.EndReceive(ar);
for (int i = 0; i < count; i++)
{
bufferData.Add(buffer[i]);
}
if (this.OnReceive != null)
{
if (_clientSock.Available == 0)
{
this.OnReceive(bufferData.ToArray());
bufferData.Clear();
}
}
this.Receive();
}
}
catch (Exception e) { if (OnErr != null) { OnErr(e); } }
}
/// <summary>
/// 关闭服务
/// </summary>
public void Dispose()
{
IsRuning = false;
_clientSock.Shutdown(SocketShutdown.Send);
_clientSock.Close();
if (OnClose != null)
{
OnClose();
}
}
}
IOCP 服务端 IOCPServer.cs
服务端,其实第一是用了socket异步,第二是用了一个连接池化技术。
/// <summary>
/// 异步通讯类高级并发基类
/// 异步接收,同步发送
/// </summary>
public class IOCPServer : IDisposable
{
/// <summary>
/// 服务端负责监听的socket
/// </summary>
private Socket ListenerSocket { get; set; }
/// <summary>
/// 最大连接数
/// </summary>
private int MaxConnectNumber { get; set; }
/// <summary>
/// 最大接收字符数
/// </summary>
private int RevBufferSize { get; set; }
/// <summary>
/// 本地地址
/// </summary>
public IPEndPoint ServerlocaPoint { get; set; }
/// <summary>
/// 是否在运行
/// </summary>
public bool IsRunning = false;
/// <summary>
/// 是否释放了
/// </summary>
private bool disposed;
/// <summary>
/// 基于这个事件的委托
/// </summary>
/// <param name="UserToken"></param>
public delegate void ReceiveHandler(AsyncUserToken UserToken);
/// <summary>
/// 一个接收的事件
/// </summary>
public event ReceiveHandler OnReceive;
/// <summary>
/// 新用户的委托
/// </summary>
/// <param name="UserToken"></param>
public delegate void newAcceptHandler(AsyncUserToken UserToken);
/// <summary>
/// 新用户的事件
/// </summary>
public event newAcceptHandler OnNewAccept;
/// <summary>
/// 新用户的委托
/// </summary>
/// <param name="UserToken"></param>
public delegate void newQuitHandler(AsyncUserToken UserToken);
/// <summary>
/// 新用户的事件
/// </summary>
public event newQuitHandler OnQuit;
/// <summary>
/// 开始服务的委托
/// </summary>
public delegate void ServerStart();
/// <summary>
/// 开始服务的事件
/// </summary>
public event ServerStart OnStart;
/// <summary>
/// 发送信息完成后的委托
/// </summary>
/// <param name="successorfalse"></param>
public delegate void SendCompletedHandler(AsyncUserToken UserToken, int SeedLength);
/// <summary>
/// 发送信息完成后的事件
/// </summary>
public event SendCompletedHandler OnSended;
/// <summary>
/// 客户端列表
/// </summary>
public Dictionary<string, AsyncUserToken> clients;
/// <summary>
/// 对象池
/// </summary>
private AsyncUserTokenPool _userTokenPool;
/// <summary>
/// 异步socket TCP服务器
/// </summary>
/// <param name="listenPort">监听的端口</param>
/// <param name="maxClient">最大的客户端数量</param>
public IOCPServer(int listenPort, int maxClient) : this(IPAddress.Any, listenPort, maxClient)
{ }
/// <summary>
/// 异步socket TCP服务器
/// </summary>
/// <param name="localIpaddress">监听的ip地址</param>
/// <param name="listenPort">监听的端口</param>
/// <param name="maxClient">最大的客户端数量</param>
/// <param name="BufferSize">缓存的buffer</param>
public IOCPServer(IPAddress localIpaddress, int listenPort, int maxClient, int BufferSize = 1024)
{
IsRunning = true;//服务状态变成 已在运行
disposed = false;
clients = new Dictionary<string, AsyncUserToken>();
RevBufferSize = BufferSize;
MaxConnectNumber = maxClient;
ServerlocaPoint = new IPEndPoint(localIpaddress, listenPort);
ListenerSocket = new Socket(localIpaddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_userTokenPool = new AsyncUserTokenPool(MaxConnectNumber);
AsyncUserToken userToken;
for (int i = 0; i < MaxConnectNumber; i++)
{
userToken = new AsyncUserToken(RevBufferSize);
userToken.ReceiveEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
_userTokenPool.Push(userToken);
}
}
/// <summary>
/// 开始监听
/// </summary>
public void Start()
{
ListenerSocket.Bind(ServerlocaPoint);
//开始监听
ListenerSocket.Listen(MaxConnectNumber);
//投递第一个接受的请求
//一个异步socket事件
StartAccept();
if (OnStart != null)
{
OnStart();
}
}
/// <summary>
/// 开始接收新的异步请求
/// </summary>
/// <param name="Args"></param>
private void StartAccept(SocketAsyncEventArgs Args = null)
{
if (Args == null)
{
Args = new SocketAsyncEventArgs();
//接收的事件,要放在另外一个事件里
Args.Completed += new EventHandler<SocketAsyncEventArgs>(AcceptEvent_Completed);
}
else
{
Args.AcceptSocket = null;
}
//如果 挂起,则会触发 OnIOCompleted 事件方法
//否则,指定 接收的方法
if (!ListenerSocket.AcceptAsync(Args))
{
ProcessAccept(Args);
}
}
/// <summary>
/// 只处理收到的请求的事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void AcceptEvent_Completed(object sender, SocketAsyncEventArgs e)
{
ProcessAccept(e);
}
/// <summary>
/// 当socket 上的发送或接收被完成时,调用此函数
/// </summary>
/// <param name="sender">激发事件的对象</param>
/// <param name="e">与发送或接收完成操作相关联的socketAsyncEventArg对象</param>
private void OnIOCompleted(object sender, SocketAsyncEventArgs e)
{
AsyncUserToken userToken = e.UserToken as AsyncUserToken;
if (e.LastOperation == SocketAsyncOperation.Receive && e.SocketError == SocketError.Success)
{
ProcessReceive(e);
}
else if (e.SocketError == SocketError.ConnectionReset)
{
CloseClientSocket(userToken);
}
}
/// <summary>
/// 监听socket接收处理
/// </summary>
/// <param name="e"></param>
private void ProcessAccept(SocketAsyncEventArgs e)
{
//如果socket状态正常并且链接也正常
if (e.SocketError == SocketError.Success && e.AcceptSocket.Connected)
{
//获取当前客户端连接的socket
Socket socket = e.AcceptSocket;
//获取一个这个对象的 token
AsyncUserToken userToken = _userTokenPool.Pop();
userToken.ConnectSocket = socket;
userToken.ConnectTime = DateTime.Now;
userToken.RemoteAddress = e.AcceptSocket.RemoteEndPoint;
userToken.IPAddress = ((IPEndPoint)(e.AcceptSocket.RemoteEndPoint)).Address;
//更新客户列表
lock (clients) { clients.Add(e.AcceptSocket.RemoteEndPoint.ToString(), userToken); }
if (OnNewAccept != null)
{
OnNewAccept(userToken);
}
//开始投递 接收异步请求
if (!socket.ReceiveAsync(userToken.ReceiveEventArgs))
{
ProcessReceive(userToken.ReceiveEventArgs);
}
StartAccept(e);
}
}
/// <summary>
/// 已经收到消息
/// </summary>
/// <param name="e"></param>
private void ProcessReceive(SocketAsyncEventArgs e)
{
AsyncUserToken userToken = e.UserToken as AsyncUserToken;
if (userToken.ReceiveEventArgs.BytesTransferred > 0 && userToken.ReceiveEventArgs.SocketError == SocketError.Success)
{
Socket socket = userToken.ConnectSocket;
userToken.Receive();
if (socket.Available == 0)
{
if (OnReceive != null)
{
OnReceive(userToken);
}
userToken.ReceiveBuffer.Clear();
}
if (!socket.ReceiveAsync(e))
{
ProcessReceive(e);
}
}
else
{
CloseClientSocket(userToken);
}
}
/// <summary>
/// 关闭已经出问题的客户端
/// </summary>
/// <param name="e"></param>
private void CloseClientSocket(AsyncUserToken userToken)
{
if (OnQuit != null)
{
OnQuit(userToken);
}
//移除这个客户信息
lock (clients) { clients.Remove(userToken.RemoteAddress.ToString()); }
//通知客户端要关闭连接
try
{
userToken.ConnectSocket.Shutdown(SocketShutdown.Send);
}
catch (Exception) { }
userToken.ConnectSocket.Close();//关闭客户端的socket
//压入新的对象
userToken = new AsyncUserToken(RevBufferSize);
userToken.ReceiveEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(OnIOCompleted);
_userTokenPool.Push(userToken);
//开始接收新的请求
StartAccept();
}
/// <summary>
/// 直接发送数据
/// </summary>
/// <param name="token"></param>
/// <param name="message"></param>
/// <returns></returns>
public void Seed(AsyncUserToken userToken, byte[] message)
{
int num = Seed(userToken.ConnectSocket, message);
if (OnSended != null)
{
OnSended(userToken, num);
}
}
/// <summary>
/// 直接发送数据k
/// </summary>
/// <param name="socket"></param>
/// <param name="message"></param>
/// <returns></returns>
public int Seed(Socket socket, byte[] message)
{
try
{
if (socket != null && socket.Connected != false)
{
return socket.Send(message);
}
}
catch (Exception) { };
return -1;
}
/// <summary>
/// 资源的释放
/// </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary>
/// 释放资源
/// </summary>
/// <param name="disposing"></param>
private void Dispose(bool disposing)
{
if (!this.disposed && disposed)
{
_userTokenPool.Dispose();
//关闭其他客户端的信息
foreach (var item in clients)
{
try
{
item.Value.ConnectSocket.Shutdown(SocketShutdown.Both);
}
catch (Exception) { }
}
//关闭服务器的信息
try
{
ListenerSocket.Shutdown(SocketShutdown.Both);
}
catch (Exception) { }
//关闭监听的socket
ListenerSocket.Close();
//清空客户端列表
lock (clients) { clients.Clear(); }
this.disposed = true;
IsRunning = false;
}
}
}
用户token管理 AsyncUserToken.cs
/// <summary>
/// 用户对象
/// 一个socket的seed和receive 分别用一个SocketAsyncEventArgs
/// </summary>
public class AsyncUserToken
{
/// <summary>
/// 构造函数
/// </summary>
/// <param name="bufferSize">缓存的长度</param>
public AsyncUserToken(int bufferSize)
{
ConnectSocket = null;
ReceiveEventArgs = new SocketAsyncEventArgs() { UserToken = this };
AsyncReceiveBuffer = new byte[bufferSize];
ReceiveEventArgs.SetBuffer(AsyncReceiveBuffer, 0, AsyncReceiveBuffer.Length);//设置接收缓冲区
ReceiveBuffer = new List<byte>();
temp = new Dictionary<string, object>();
}
/// <summary>
/// 接收数据的SocketAsyncEventArgs
/// </summary>
public SocketAsyncEventArgs ReceiveEventArgs { get; set; }
/// <summary>
/// 连接的Socket对象
/// </summary>
public Socket ConnectSocket { get; set; }
/// <summary>
/// 连接的时间
/// </summary>
public DateTime ConnectTime { get; set; }
/// <summary>
/// 远程地址
/// </summary>
public EndPoint RemoteAddress { get; set; }
/// <summary>
/// 客户端IP地址
/// </summary>
public IPAddress IPAddress { get; set; }
/// <summary>
/// 接收数据的缓冲区
/// </summary>
public byte[] AsyncReceiveBuffer { get; set; }
/// <summary>
/// 动态接收数据
/// </summary>
public List<byte> ReceiveBuffer { get; set; }
/// <summary>
/// 是否接受完数据
/// </summary>
public bool IsReceiveOk { get; set; }
/// <summary>
/// 临时数据
/// </summary>
public Dictionary<string, Object> temp;
/// <summary>
/// 内部写一个接收的方法
/// </summary>
/// <returns></returns>
public void Receive()
{
//把数据清楚完后,再接收新的数据
//if (ConnectSocket.Available == 0 && IsReceiveOk == false)
//{
// IsReceiveOk = true;
//}
//else if (ConnectSocket.Available == 0 && IsReceiveOk == true)
//{
// ReceiveBuffer.Clear();
// IsReceiveOk = false;
//}
for (int i = 0; i < ReceiveEventArgs.BytesTransferred; i++)
{
ReceiveBuffer.Add(ReceiveEventArgs.Buffer[i]);
}
}
}
线程池管理 AsyncUserTokenPool.cs
/// <summary>
/// AsyncUserToken 对象池 (固定缓存设计)
/// </summary>
public class AsyncUserTokenPool : IDisposable
{
/// <summary>
/// 后进先出的集合
/// </summary>
Stack<AsyncUserToken> m_pool;
/// <summary>
/// 构造函数
/// </summary>
/// <param name="capacity">设置对象池的容量</param>
public AsyncUserTokenPool(int capacity)
{
m_pool = new Stack<AsyncUserToken>(capacity);
}
/// <summary>
/// 压入一个数据
/// </summary>
/// <param name="item"></param>
public void Push(AsyncUserToken item)
{
if (item != null)
{
lock (m_pool)
{
m_pool.Push(item);
}
}
}
/// <summary>
/// 弹出一个
/// </summary>
/// <returns></returns>
public AsyncUserToken Pop()
{
lock (m_pool)
{
return m_pool.Pop();
}
}
/// <summary>
/// 获取总和数
/// </summary>
public int Count
{
get { return m_pool.Count; }
}
/// <summary>
/// 释放资源
/// </summary>
public void Dispose()
{
do
{
this.Pop();
}
while (this.Count > 0);
}
}
客户端代码样例
只要连接上服务端,就发送一条信息,然后,接受信息,并显示出来。
class Program
{
static IOCPClient client;
static void Main(string[] args)
{
using (client = new IOCPClient("127.0.0.1", 9999))
{
client.OnReceive += new IOCPClient.ReceiveHandler(ReceiveHandler);
client.OnClose += new IOCPClient.CloseHandler(CloseHandler);
client.OnErr += new IOCPClient.ErrHandler(ErrHandler);
client.OnSeeded += new IOCPClient.SeededHandler(SeededHandler);
client.OnStart += new IOCPClient.StartHandler(StartHandler);
client.ConnServer();
client.Seed(Encoding.UTF8.GetBytes("客户端发送的信息"));
client.Receive();
Console.ReadLine();
}
Console.ReadLine();
}
public static void ReceiveHandler(byte[] data)
{
Console.WriteLine("服务端的信息:" + Encoding.UTF8.GetString(data));
}
public static void CloseHandler()
{
Console.WriteLine("服务已经关闭");
}
/// </summary>
public static void ErrHandler(Exception e)
{
Console.WriteLine("发生了异常!" + e.ToString());
}
public static void SeededHandler()
{
Console.WriteLine("刚才的消息发送成功!");
}
public static void StartHandler()
{
Console.WriteLine("连接服务器成功:" + client.ServerEndPoint.ToString());
}
} class Program
{
static IOCPClient client;
static void Main(string[] args)
{
Console.Title = "蓝创精英团队 IOCP Client Demo";
using (client = new IOCPClient("127.0.0.1", 9999))
{
client.OnReceive += new IOCPClient.ReceiveHandler(ReceiveHandler);
client.OnClose += new IOCPClient.CloseHandler(CloseHandler);
client.OnErr += new IOCPClient.ErrHandler(ErrHandler);
client.OnSeeded += new IOCPClient.SeededHandler(SeededHandler);
client.OnStart += new IOCPClient.StartHandler(StartHandler);
client.ConnServer();
client.Seed(Encoding.UTF8.GetBytes("客户端发送的信息"));
client.Receive();
Console.ReadLine();
}
Console.ReadLine();
}
public static void ReceiveHandler(byte[] data)
{
Console.WriteLine("服务端的信息:" + Encoding.UTF8.GetString(data));
}
public static void CloseHandler()
{
Console.WriteLine("服务已经关闭");
}
/// </summary>
public static void ErrHandler(Exception e)
{
Console.WriteLine("发生了异常!" + e.ToString());
}
public static void SeededHandler()
{
Console.WriteLine("刚才的消息发送成功!");
}
public static void StartHandler()
{
Console.WriteLine("连接服务器成功:" + client.ServerEndPoint.ToString());
}
}
服务端样例
启动服务,然后,获取到数据,就转发回客户端,说我收到了。
class Program
{
public static IOCPServer server;
static void Main(string[] args)
{
Console.Title = "蓝创精英团队 IOCP Server Demo";
server = new IOCPServer(9999, 1000);
server.OnReceive += new IOCPServer.ReceiveHandler(server_OnReceive);
server.OnNewAccept += new IOCPServer.newAcceptHandler(newAcceptHandler);
server.OnQuit += new IOCPServer.newQuitHandler(newQuitHandler);
server.OnStart += new IOCPServer.ServerStart(ServerStart);
server.Start();
Console.ReadLine();
}
/// <summary>
/// 收到信息
/// </summary>
/// <param name="args"></param>
public static void server_OnReceive(AsyncUserToken args)
{
string data = Encoding.UTF8.GetString(args.ReceiveBuffer.ToArray());
Console.WriteLine("获取到的数据:" + data + " " + DateTime.Now.ToString());
server.Seed(args, Encoding.UTF8.GetBytes(data));
}
/// <summary>
/// 新接入的用户
/// </summary>
/// <param name="UserToken"></param>
public static void newAcceptHandler(AsyncUserToken UserToken)
{
Console.WriteLine("一个新的用户:" + UserToken.RemoteAddress.ToString());
}
/// <summary>
/// 退出用户
/// </summary>
/// <param name="UserToken"></param>
public static void newQuitHandler(AsyncUserToken UserToken)
{
Console.WriteLine("用户:" + UserToken.RemoteAddress.ToString() + "退出连接");
}
/// <summary>
/// 信息发送成功!
/// </summary>
/// <param name="userToken"></param>
public static void SendCompletedHandler(string userToken)
{
Console.WriteLine("刚才那条消息发送成功!" + userToken);
}
/// <summary>
/// 服务启动
/// </summary>
public static void ServerStart()
{
Console.WriteLine("服务启动 端口:" + 9999);
}
}
效果展示
先启动服务端,效果如下:
再启动客户端,效果如下:
展示结束
总结
其实更多的IOCP,用了池化技术的缓存,以及用了异步的socket,来实现性能的加强。
代码地址
https://github.com/kesshei/IOCPDemo.git
https://gitee.com/kesshei/IOCPDemo.git