0
点赞
收藏
分享

微信扫一扫

高性能服务器 异步通信模块

code_balance 2022-03-30 阅读 131
c++服务器

现在所有的基础模块已经差不多了,我们可以开始核心模块的开发 到了这里,我们将逐渐接触到业务层的逻辑了 前面的模块是比较通用的模块,所以我们几乎可以完全不谈业务逻辑。

但是后面的模块 ,后面都需要直接和业务层打交道,所以我们需要逐步考虑业务层的需求了。

要谈异步,首先我们就要谈同步。 一般情况下,我们的通信模块都是同步的

创建套接字->监听套接字->收到客户端->收发数据->关闭连接

在整个流程之中,任何一步都可能出现阻塞。                                                                                这个要求其实也合理,因为这些操作有前后顺序依赖                                                                         没有创建套接字,就无法监听;没有收到客户端,就无法收发数据                                                   当时这里面却又隐藏着一个问题:
当服务器在等待客户端发送数据的时候,我们是否同时去等待其他客户端的接入?

其实任何一步,执行需要的时间都非常短暂                                                                                       无论是创建套接字、监听套接字、将客户端接入、将数据收发、关闭链接                                         每一个都耗时非常短暂。                                                                                                                   理论上我们可以用一个线程,完成多个客户端的服务。                                                                     所以现在我们需要一种机制,当客户端请求的时候,系统能通知我们。                                           然后我们去接入用户,从而节约大量时间。                                                                                      当然这个可以用poll去实现,当时我们已经有了更高效的IOCP(如果是Linux,可以使用epoll)

创建套接字,绑定,监听之后,我们可以将套接字叫个IOCP来处理                                                   当有客户端接入时,IOCP会通知我们该事件已经就绪。                                                                   这样一个IOCP线程,就可以处理多个客户端内容

加上我们可以申请多个IOCP线程,这样就可以在多个线程中处理更多的客户端请求了

需求说明

考虑到上面的分析,我们需要一个类来管理这些通信。                                                                     否则我们在收到客户端A的请求后,将结果发给客户端B。                                                                 或者A需求查数据库,而数据库结果出来后,却搞不清刚才是谁发来的                                              而且客户端有各种状态,也需要进行管理。                                                                                      所以我们需要一个异步通信管理模块

  1. 能够记录客户端各种状态
  2. 能够收发客户端状态,交给业务层去处理
  3. 提供服务器的处理
  4. 异步处理通信过程

依赖实现:
 

class AsynClient
{
public:
	AsynClient();
	~AsynClient();
	//禁止复制
	AsynClient(const AsynClient&) = delete;
	AsynClient& operator=(const AsynClient&) = delete;

	void Init(PAsynServer& server, PAsynClient& self);
	int Send(const PCPacket& packet);//异步写,立刻返回
	int Recv();//异步读,立刻返回

	void Close();

	operator SOCKET() { return m_socket; }
	operator sockaddr* () { return (sockaddr*)&m_addr; }
	std::string Ip() { return m_addr.ip(); }
	short Port() { return m_addr.sin_port; }
private:
	PAsynServer m_pServer;
	PAsynClient m_self;
	std::mutex m_lockSend;
	CPacketList m_lstSendPackets;
	std::mutex m_lockRecv;
	CPacketList m_lstRecvPackets;
	SOCKET m_socket;
	SockAddrIn m_addr;

};

为了方便客户端收发数据和管理,所以我们需要一个包类来管理

class CPacket
{
public:
	CPacket() {}
	CPacket(size_t size) { m_data.resize(size); }
	CPacket(const CPacket& pack);
	CPacket(const void* data, size_t size);
	~CPacket();
	CPacket& operator=(const CPacket& pack);
	operator const char* ()const { return m_data.c_str(); }
	operator char* ()const {
		return const_cast<char*>(m_data.c_str());
	}
	size_t Size()const { return m_data.size(); }
	void Resize(size_t size) { m_data.resize(size); }
private:
	std::string m_data;
};

同时为了方便地址的处理,我们封装了一个结构体

struct SockAddrIn
	:public sockaddr_in
{
public:
	SockAddrIn(int family = AF_INET) {
		memset(this, 0, sizeof(sockaddr_in));
		sin_family = family;
	}
	SockAddrIn(const sockaddr_in& addr) {
		sin_family = addr.sin_family;
		sin_addr = addr.sin_addr;
		sin_port = addr.sin_port;
		memcpy(sin_zero, &addr.sin_zero, sizeof(sin_zero));
	}
	SockAddrIn(const SockAddrIn& addr) { memcpy(this, &addr, sizeof(SockAddrIn)); }
	SockAddrIn& operator=(const short& port) { sin_port = htons(port); }
	SockAddrIn& operator = (const std::string& ip) { inet_pton(AF_INET, ip.c_str(), &sin_addr); }
	SockAddrIn& operator =(const SockAddrIn& addr) {
		if (this != &addr) {
			memcpy(this, &addr, sizeof(SockAddrIn));
		}
		return *this;
	}
	std::string ip() {
		std::string result;
		result.resize(32);
		result = inet_ntop(sin_family, &sin_addr, const_cast<char*>(result.c_str()), result.size());
		return result;
	}
};

同时为了方便服务器记录客户端,所以实现了一个客户端映射表:

class ClientMap
{
public:
	ClientMap();
	~ClientMap();
	int AddClient(PAsynClient& client);//添加一个客户端
	PAsynClient GetClient(SOCKET s);//获取一个指定的客户端
	int GetClients(MapClient& result);//获取所有客户端版本
private:
	int ClientMapMain();
private:
	ZXTool::CThread m_thread;
	HANDLE m_iocp;
	MapClient m_mapCilent;
	enum {
		CMP_NONE,//无操作
		CMP_ADD,//添加
		CMP_QUERY,//单个查询
		CMP_ALL//获取全部
	};
};

此外,要使用异步通信,所以我们还需要一个异步数据类来管理异步数据

struct OperatorOverlapped
{
public:
	OperatorOverlapped(PAsynClient pclient, DWORD Type) {
		memset(&overlapped, 0, sizeof(overlapped));
	}
	OVERLAPPED overlapped;
	WSABUF wsabuffer;
	DWORD recvbytes;
	DWORD flags;
	DWORD type;//1代表accept 2代表recv 3代表send
	PAsynClient client;
	PCPacket packet;
};

为了规范业务层的接口,我们还需要定义一个业务层:

class CBusinessLayer
{
public:
	//客户端上来了,不要阻塞
	virtual int ClientConnected(PAsynClient& client) = 0;
	virtual int ClientRecvData(PAsynClient& client, PCPacket& packet) = 0;
	virtual int ClientSendDone(PAsynClient& client, PCPacket& packet) = 0;
};

有了这些依赖模块,我们最后还需要定义一个套接字初始化模块

因为windows网络编程需要有全局的套接字初始化操作

class WSAIniter {
public:
	WSAIniter() {
		WSADATA wsadata;
		if (WSAStartup(MAKEWORD(1, 1), &wsadata)) {
			TRACEW(_T("WSAStartup 失败!error = %d"), WSAGetLastError());
		}
		
	}
	~WSAIniter() {
		if (WSACleanup()) {
			TRACEW(_T("WSACleanup 失败!error = %d"), WSAGetLastError());
		}
	}
};

有了前面的铺垫,就可以来写服务器类了

class AsynServer
{
public:
	friend class AsynClient;
	AsynServer(const std::string& sIP, short port = 9666);
	~AsynServer();
	//禁止复制
	AsynServer(const AsynServer&) = delete;
	AsynServer& operator=(const AsynServer&) = delete;

	int Start(PasynServer& self, PBLayer& business);
	void Close();
private:
	int Accept();
	int ServerMain();
private:
	CTaskActuator m_Task;
	ClientMap m_mapClients;
	SOCKET m_socket;
	HANDLE m_iocp;
	std::string m_ip;
	short m_port;
	PAsynServer m_self;
	PBLayer m_business;

};

具体实现略

举报

相关推荐

0 条评论