现在所有的基础模块已经差不多了,我们可以开始核心模块的开发 到了这里,我们将逐渐接触到业务层的逻辑了 前面的模块是比较通用的模块,所以我们几乎可以完全不谈业务逻辑。
但是后面的模块 ,后面都需要直接和业务层打交道,所以我们需要逐步考虑业务层的需求了。
要谈异步,首先我们就要谈同步。 一般情况下,我们的通信模块都是同步的
创建套接字->监听套接字->收到客户端->收发数据->关闭连接
在整个流程之中,任何一步都可能出现阻塞。 这个要求其实也合理,因为这些操作有前后顺序依赖 没有创建套接字,就无法监听;没有收到客户端,就无法收发数据 当时这里面却又隐藏着一个问题:
当服务器在等待客户端发送数据的时候,我们是否同时去等待其他客户端的接入?
其实任何一步,执行需要的时间都非常短暂 无论是创建套接字、监听套接字、将客户端接入、将数据收发、关闭链接 每一个都耗时非常短暂。 理论上我们可以用一个线程,完成多个客户端的服务。 所以现在我们需要一种机制,当客户端请求的时候,系统能通知我们。 然后我们去接入用户,从而节约大量时间。 当然这个可以用poll去实现,当时我们已经有了更高效的IOCP(如果是Linux,可以使用epoll)
创建套接字,绑定,监听之后,我们可以将套接字叫个IOCP来处理 当有客户端接入时,IOCP会通知我们该事件已经就绪。 这样一个IOCP线程,就可以处理多个客户端内容
加上我们可以申请多个IOCP线程,这样就可以在多个线程中处理更多的客户端请求了
需求说明
考虑到上面的分析,我们需要一个类来管理这些通信。 否则我们在收到客户端A的请求后,将结果发给客户端B。 或者A需求查数据库,而数据库结果出来后,却搞不清刚才是谁发来的 而且客户端有各种状态,也需要进行管理。 所以我们需要一个异步通信管理模块
- 能够记录客户端各种状态
- 能够收发客户端状态,交给业务层去处理
- 提供服务器的处理
- 异步处理通信过程
依赖实现:
class AsynClient
{
public:
AsynClient();
~AsynClient();
//禁止复制
AsynClient(const AsynClient&) = delete;
AsynClient& operator=(const AsynClient&) = delete;
void Init(PAsynServer& server, PAsynClient& self);
int Send(const PCPacket& packet);//异步写,立刻返回
int Recv();//异步读,立刻返回
void Close();
operator SOCKET() { return m_socket; }
operator sockaddr* () { return (sockaddr*)&m_addr; }
std::string Ip() { return m_addr.ip(); }
short Port() { return m_addr.sin_port; }
private:
PAsynServer m_pServer;
PAsynClient m_self;
std::mutex m_lockSend;
CPacketList m_lstSendPackets;
std::mutex m_lockRecv;
CPacketList m_lstRecvPackets;
SOCKET m_socket;
SockAddrIn m_addr;
};
为了方便客户端收发数据和管理,所以我们需要一个包类来管理
class CPacket
{
public:
CPacket() {}
CPacket(size_t size) { m_data.resize(size); }
CPacket(const CPacket& pack);
CPacket(const void* data, size_t size);
~CPacket();
CPacket& operator=(const CPacket& pack);
operator const char* ()const { return m_data.c_str(); }
operator char* ()const {
return const_cast<char*>(m_data.c_str());
}
size_t Size()const { return m_data.size(); }
void Resize(size_t size) { m_data.resize(size); }
private:
std::string m_data;
};
同时为了方便地址的处理,我们封装了一个结构体
struct SockAddrIn
:public sockaddr_in
{
public:
SockAddrIn(int family = AF_INET) {
memset(this, 0, sizeof(sockaddr_in));
sin_family = family;
}
SockAddrIn(const sockaddr_in& addr) {
sin_family = addr.sin_family;
sin_addr = addr.sin_addr;
sin_port = addr.sin_port;
memcpy(sin_zero, &addr.sin_zero, sizeof(sin_zero));
}
SockAddrIn(const SockAddrIn& addr) { memcpy(this, &addr, sizeof(SockAddrIn)); }
SockAddrIn& operator=(const short& port) { sin_port = htons(port); }
SockAddrIn& operator = (const std::string& ip) { inet_pton(AF_INET, ip.c_str(), &sin_addr); }
SockAddrIn& operator =(const SockAddrIn& addr) {
if (this != &addr) {
memcpy(this, &addr, sizeof(SockAddrIn));
}
return *this;
}
std::string ip() {
std::string result;
result.resize(32);
result = inet_ntop(sin_family, &sin_addr, const_cast<char*>(result.c_str()), result.size());
return result;
}
};
同时为了方便服务器记录客户端,所以实现了一个客户端映射表:
class ClientMap
{
public:
ClientMap();
~ClientMap();
int AddClient(PAsynClient& client);//添加一个客户端
PAsynClient GetClient(SOCKET s);//获取一个指定的客户端
int GetClients(MapClient& result);//获取所有客户端版本
private:
int ClientMapMain();
private:
ZXTool::CThread m_thread;
HANDLE m_iocp;
MapClient m_mapCilent;
enum {
CMP_NONE,//无操作
CMP_ADD,//添加
CMP_QUERY,//单个查询
CMP_ALL//获取全部
};
};
此外,要使用异步通信,所以我们还需要一个异步数据类来管理异步数据
struct OperatorOverlapped
{
public:
OperatorOverlapped(PAsynClient pclient, DWORD Type) {
memset(&overlapped, 0, sizeof(overlapped));
}
OVERLAPPED overlapped;
WSABUF wsabuffer;
DWORD recvbytes;
DWORD flags;
DWORD type;//1代表accept 2代表recv 3代表send
PAsynClient client;
PCPacket packet;
};
为了规范业务层的接口,我们还需要定义一个业务层:
class CBusinessLayer
{
public:
//客户端上来了,不要阻塞
virtual int ClientConnected(PAsynClient& client) = 0;
virtual int ClientRecvData(PAsynClient& client, PCPacket& packet) = 0;
virtual int ClientSendDone(PAsynClient& client, PCPacket& packet) = 0;
};
有了这些依赖模块,我们最后还需要定义一个套接字初始化模块
因为windows网络编程需要有全局的套接字初始化操作
class WSAIniter {
public:
WSAIniter() {
WSADATA wsadata;
if (WSAStartup(MAKEWORD(1, 1), &wsadata)) {
TRACEW(_T("WSAStartup 失败!error = %d"), WSAGetLastError());
}
}
~WSAIniter() {
if (WSACleanup()) {
TRACEW(_T("WSACleanup 失败!error = %d"), WSAGetLastError());
}
}
};
有了前面的铺垫,就可以来写服务器类了
class AsynServer
{
public:
friend class AsynClient;
AsynServer(const std::string& sIP, short port = 9666);
~AsynServer();
//禁止复制
AsynServer(const AsynServer&) = delete;
AsynServer& operator=(const AsynServer&) = delete;
int Start(PasynServer& self, PBLayer& business);
void Close();
private:
int Accept();
int ServerMain();
private:
CTaskActuator m_Task;
ClientMap m_mapClients;
SOCKET m_socket;
HANDLE m_iocp;
std::string m_ip;
short m_port;
PAsynServer m_self;
PBLayer m_business;
};
具体实现略