一:前言
为了应对找工作的需求,不得不在简历上写出一个亮眼的C/C++项目,于是我准备开始将我学习图床共享云存储的学习笔记分享出来,可以让大家共同学习,而且还可以防止我忘记一些细节。
因为我们C/C++主要注重的并不是前端,因此前端的代码我是不会讲解的,只会讲解后端代码,其中包括线程池,mysql连接池,reactor,nginx,fastdfs,spdlog,redis等等技术栈,以及一些相关的业务代码,我会尽量展现出来。我会分好几期来讲,大家可以指出我写的不对的地方。但是由于代码非常多,我不会列的很细,我会把大概,核心的给拿出来。
二:Base_socket
在这个函数中,我们将关于socket的所有信息全部封装到一个 CBaseSocket 的类中,这样封装会使我们拿去任意信息都会非常方便。
#ifndef __SOCKET_H__
#define __SOCKET_H__
#include "ostype.h"
#include "util.h"
enum { //这个是表示我们这个socket的状态
SOCKET_STATE_IDLE,
SOCKET_STATE_LISTENING,
SOCKET_STATE_CONNECTING,
SOCKET_STATE_CONNECTED,
SOCKET_STATE_CLOSING
};
class CBaseSocket : public CRefObject {
public:
CBaseSocket(); //构造函数
virtual ~CBaseSocket(); //析构函数
//下面这一些全都是顾名思义,要什么给什么
SOCKET GetSocket() { return socket_; }
void SetSocket(SOCKET fd) { socket_ = fd; }
void SetState(uint8_t state) { state_ = state; }
void SetCallback(callback_t callback) { callback_ = callback; }
void SetCallbackData(void *data) { callback_data_ = data; }
void SetRemoteIP(char *ip) { remote_ip_ = ip; }
void SetRemotePort(uint16_t port) { remote_port_ = port; }
void SetSendBufSize(uint32_t send_size);
void SetRecvBufSize(uint32_t recv_size);
const char *GetRemoteIP() { return remote_ip_.c_str(); }
uint16_t GetRemotePort() { return remote_port_; }
const char *GetLocalIP() { return local_ip_.c_str(); }
uint16_t GetLocalPort() { return local_port_; }
public://在这里的就是服务端和客户端具体的代码了:监听和连接,发送和接收
int Listen(const char *server_ip, uint16_t port, callback_t callback,void *callback_data);
net_handle_t Connect(const char *server_ip, uint16_t port,callback_t callback, void *callback_data);
int Send(void *buf, int len);
int Recv(void *buf, int len);
int Close();
public: //下面是一些触发的事件,读写状态和断开连接的状态。
void OnRead();
void OnWrite();
void OnClose();
private: // 私有函数以_ 开头//这里还是设置一些socket所需要的函数
int _GetErrorCode();
bool _IsBlock(int error_code); //查看是否为阻塞
void _SetNonblock(SOCKET fd); //设置成非阻塞
void _SetReuseAddr(SOCKET fd);
void _SetNoDelay(SOCKET fd); //关闭 TCP 连接的 Nagle 算法,减少网络延迟
void _SetAddr(const char *ip, const uint16_t port, sockaddr_in *addr);
void _AcceptNewSocket(); //用于服务端进行接收客户端链接上来的函数。
private: //下面也是顾名思义,地址端口等,其中重要的就是回调函数了
string remote_ip_;
uint16_t remote_port_;
string local_ip_;
uint16_t local_port_;
callback_t callback_;
void *callback_data_;
uint8_t state_;
SOCKET socket_;
};
CBaseSocket *FindBaseSocket(net_handle_t fd); //这里还创建了个函数,通过指定的fd就可以找到这个fd所对应的全部信息。
//typedef void (*callback_t)(void *callback_data, uint8_t msg, uint32_t handle, void *pParam);
#endif
//创建哈希表,存放所有的fd net_handle_t :int 下面这三个函数并不在这个类中,但是也是很重要的
typedef hash_map<net_handle_t, CBaseSocket *> SocketMap;
SocketMap g_socket_map;
void AddBaseSocket(CBaseSocket *pSocket) {//将创建好的socket添加到哈希表中。
g_socket_map.insert(make_pair((net_handle_t)pSocket->GetSocket(), pSocket)); //使用这个socket类中的函数
}
void RemoveBaseSocket(CBaseSocket *pSocket) { //删除socket
g_socket_map.erase((net_handle_t)pSocket->GetSocket());
}
CBaseSocket *FindBaseSocket(net_handle_t fd) { //找到fd的socket的全部信息
CBaseSocket *pSocket = NULL;
SocketMap::iterator iter = g_socket_map.find(fd);
if (iter != g_socket_map.end()) {
pSocket = iter->second; //拿到这个socket的全部信息
pSocket->AddRef(); //进行加锁操作,加一操作。 //这里的加锁操作不用管白。
}
return pSocket;
}
//下面就是最基础的网络相关的一些代码
int CBaseSocket::Listen(const char *server_ip, uint16_t port,
callback_t callback, void *callback_data) { //服务器用于监听的函数
local_ip_ = server_ip;
local_port_ = port;
callback_ = callback;
callback_data_ = callback_data;
socket_ = socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == INVALID_SOCKET) {
printf("socket failed, err_code=%d, server_ip=%s, port=%u",
_GetErrorCode(), server_ip, port);
return NETLIB_ERROR;
}
_SetReuseAddr(socket_); //设置重用地址
_SetNonblock(socket_); //设置非阻塞
sockaddr_in serv_addr;
_SetAddr(server_ip, port, &serv_addr); //设置地址
int ret = ::bind(socket_, (sockaddr *)&serv_addr, sizeof(serv_addr)); //绑定
if (ret == SOCKET_ERROR) { //绑定失败直接返回
printf("bind failed, err_code=%d, server_ip=%s, port=%u",
_GetErrorCode(), server_ip, port);
closesocket(socket_);
return NETLIB_ERROR;
}
//开始监听地址,监听64个连接。
ret = listen(socket_, 64);
if (ret == SOCKET_ERROR) {
printf("listen failed, err_code=%d, server_ip=%s, port=%u",
_GetErrorCode(), server_ip, port);
closesocket(socket_);
return NETLIB_ERROR;
}
state_ = SOCKET_STATE_LISTENING; //设置监听状态
printf("CBaseSocket::Listen on %s:%d", server_ip, port);
AddBaseSocket(this); //将服务端的自身监听socket也放入到map中。
CEventDispatch::Instance()->AddEvent(socket_, SOCKET_READ | SOCKET_EXCEP); //此处是进行分发事件,这个等会会讲到,不用急。
return NETLIB_OK;
}
//下面也就是accep的一些代码,写过网络的应该都清楚
void CBaseSocket::_AcceptNewSocket() { //用于接收客户端的消息
SOCKET fd = 0;
sockaddr_in peer_addr;
socklen_t addr_len = sizeof(sockaddr_in);
char ip_str[64];
while ((fd = accept(socket_, (sockaddr *)&peer_addr, &addr_len)) !=
INVALID_SOCKET) { //循环等待客户端的连接。
CBaseSocket *pSocket = new CBaseSocket(); //每当连接进来一个客户端就要创建一个
uint32_t ip = ntohl(peer_addr.sin_addr.s_addr);
uint16_t port = ntohs(peer_addr.sin_port);
snprintf(ip_str, sizeof(ip_str), "%d.%d.%d.%d", ip >> 24,
(ip >> 16) & 0xFF, (ip >> 8) & 0xFF, ip & 0xFF);
// printf("AcceptNewSocket, socket=%d from %s:%d\n", fd, ip_str, port);
//设置套接字的属性,包括套接字的回调函数、回调函数的参数、套接字的状态、远程IP和端口等。
pSocket->SetSocket(fd);
pSocket->SetCallback(callback_);
pSocket->SetCallbackData(callback_data_);
pSocket->SetState(SOCKET_STATE_CONNECTED);
pSocket->SetRemoteIP(ip_str);
pSocket->SetRemotePort(port);
_SetNoDelay(fd);
_SetNonblock(fd);
AddBaseSocket(pSocket);
CEventDispatch::Instance()->AddEvent(fd, SOCKET_READ | SOCKET_EXCEP); //和上面的一样
callback_(callback_data_, NETLIB_MSG_CONNECT, (net_handle_t)fd, NULL);
}
}
//上面两个都是服务端使用的,这个是客户端使用的。
net_handle_t CBaseSocket::Connect(const char *server_ip, uint16_t port,
callback_t callback, void *callback_data) { //客户端用于连接的函数
printf("CBaseSocket::Connect, server_ip=%s, port=%d", server_ip, port);
remote_ip_ = server_ip;
remote_port_ = port;
callback_ = callback;
callback_data_ = callback_data;
socket_ = socket(AF_INET, SOCK_STREAM, 0);
if (socket_ == INVALID_SOCKET) {
printf("socket failed, err_code=%d, server_ip=%s, port=%u",
_GetErrorCode(), server_ip, port);
return NETLIB_INVALID_HANDLE;
}
_SetNonblock(socket_);
_SetNoDelay(socket_);
sockaddr_in serv_addr;
_SetAddr(server_ip, port, &serv_addr);
int ret = connect(socket_, (sockaddr *)&serv_addr, sizeof(serv_addr));
if ((ret == SOCKET_ERROR) && (!_IsBlock(_GetErrorCode()))) {
printf("connect failed, err_code=%d, server_ip=%s, port=%u",
_GetErrorCode(), server_ip, port);
closesocket(socket_);
return NETLIB_INVALID_HANDLE;
}
state_ = SOCKET_STATE_CONNECTING; //状态为连接中
AddBaseSocket(this);
CEventDispatch::Instance()->AddEvent(socket_, SOCKET_ALL);
return (net_handle_t)socket_;
}
void CBaseSocket::OnRead() { //只读状态,因为客户端刚连接上来的时候,我们不知道是否已经连接上了,当客户端发消息 \
我们服务端收到消息之后,连接的客户端触发服务端的读事件,因此才分配一个新的socket,分配完知之后分配写事件
if (state_ == SOCKET_STATE_LISTENING) {
_AcceptNewSocket();
} else {
u_long avail = 0;
int ret = ioctlsocket(socket_, FIONREAD, &avail);
if ((SOCKET_ERROR == ret) || (avail == 0)) {
callback_(callback_data_, NETLIB_MSG_CLOSE, (net_handle_t)socket_,
NULL);
} else {
callback_(callback_data_, NETLIB_MSG_READ, (net_handle_t)socket_,
NULL);
}
}
}
对于其他的函数:recv,send等等都不太重要,并且比较简单,在这里就不说了。
三:事件循环:event_loop
我们在这个类中封装了event_loop的一些函数,以及添加了定时器任务函数。
//下面是关于epoll的一些代码了
#include "ostype.h"
#include "util.h"
#include "lock.h"
enum {
SOCKET_READ = 0x1,
SOCKET_WRITE = 0x2,
SOCKET_EXCEP = 0x4,
SOCKET_ALL = 0x7
};
//用于分发事件的类
class CEventDispatch {
public:
virtual ~CEventDispatch();
void AddEvent(SOCKET fd, uint8_t socket_event);
void RemoveEvent(SOCKET fd, uint8_t socket_event);
void AddTimer(callback_t callback, void *user_data, uint64_t interval);
void RemoveTimer(callback_t callback, void *user_data);
void AddLoop(callback_t callback, void *user_data);
void StartDispatch(uint32_t wait_timeout = 100);
void StopDispatch();
bool IsRunning() { return running_; }
static CEventDispatch *Instance(); //使用单例设计模式,保证只有一个实例,返回我们创建的实例,每次都通过这个来调用实例。
protected:
CEventDispatch();
private:
void _CheckTimer();
void _CheckLoop();
typedef struct {
callback_t callback; // 回调函数
void *user_data;
uint64_t interval; //定时器触发的间隔,也就是我多少秒之后触发
uint64_t next_tick;
} TimerItem;
private:
int epfd_;
CLock lock_;
list<TimerItem *> timer_list_; // 定时器
list<TimerItem *> loop_list_; // 自定义loop,为什么这里还是使用定时器的结构体呢,因为里面含有回调函数,我们直接复用。
static CEventDispatch *event_dispatch_;
bool running_;
};
CEventDispatch *CEventDispatch::Instance() { //使用单例模式,只有一个实例。
if (event_dispatch_ == NULL) {
event_dispatch_ = new CEventDispatch();
}
return event_dispatch_;
}
void CEventDispatch::AddEvent(SOCKET fd, uint8_t socket_event) {
struct epoll_event ev;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLPRI | EPOLLERR | EPOLLHUP; //将所有的事件全部添加进去。ET模式
ev.data.fd = fd;
if (epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) != 0) { //将事件添加到epoll中。
printf("epoll_ctl() failed, errno=%d", errno);
}
}
void CEventDispatch::AddTimer(callback_t callback, void *user_data,
uint64_t interval) { //添加定时器
list<TimerItem *>::iterator it;
for (it = timer_list_.begin(); it != timer_list_.end(); it++) {
TimerItem *pItem = *it;
if (pItem->callback == callback && pItem->user_data == user_data) { //如果还是同样的回调函数的话,我们就更新一下定时器的时间间隔
pItem->interval = interval;
pItem->next_tick = GetTickCount() + interval; //比如心跳包文
return;
}
}
//如果找不到同样的回调函数,那就重新添加。
TimerItem *pItem = new TimerItem;
pItem->callback = callback;
pItem->user_data = user_data;
pItem->interval = interval;
pItem->next_tick = GetTickCount() + interval;
timer_list_.push_back(pItem);
}
void CEventDispatch::AddLoop(callback_t callback, void *user_data) { //添加到事件循环
TimerItem *pItem = new TimerItem;
pItem->callback = callback;
pItem->user_data = user_data;
loop_list_.push_back(pItem);
}
void CEventDispatch::_CheckTimer() { //检查定时器
uint64_t curr_tick = GetTickCount();
list<TimerItem *>::iterator it;
for (it = timer_list_.begin(); it != timer_list_.end();) {
TimerItem *pItem = *it;
it++; // iterator maybe deleted in the callback, so we should increment
// it before callback
if (curr_tick >= pItem->next_tick) { //当前时间大于这个任务的触发时间了,那么就更新并执行任务。
pItem->next_tick += pItem->interval;
pItem->callback(pItem->user_data, NETLIB_MSG_TIMER, 0, NULL);
}
}
}
void CEventDispatch::_CheckLoop() { //执行全部返回的事件。
for (list<TimerItem *>::iterator it = loop_list_.begin();
it != loop_list_.end(); it++) {
TimerItem *pItem = *it;
pItem->callback(pItem->user_data, NETLIB_MSG_LOOP, 0, NULL);
}
}
//分发事件
void CEventDispatch::StartDispatch(uint32_t wait_timeout) { //超时时间默认为100.
struct epoll_event events[1024];
int nfds = 0;
if (running_)
return;
running_ = true;
while (running_) {
nfds = epoll_wait(epfd_, events, 1024, wait_timeout);
for (int i = 0; i < nfds; i++) { //触发事件,开始查找
int ev_fd = events[i].data.fd;
CBaseSocket *pSocket = FindBaseSocket(ev_fd); //我们返回这个事件的全部信息。
if (!pSocket)
continue;
// Commit by zhfu @2015-02-28
#ifdef EPOLLRDHUP
if (events[i].events & EPOLLRDHUP) {
// printf("On Peer Close, socket=%d, ev_fd);
pSocket->OnClose();
}
#endif
// Commit End
if (events[i].events & EPOLLIN) {
// printf("OnRead, socket=%d\n", ev_fd);
pSocket->OnRead(); //用于连接客户端:accept
}
if (events[i].events & EPOLLOUT) {
// printf("OnWrite, socket=%d\n", ev_fd);
pSocket->OnWrite(); //执行写的回调函数。
}
if (events[i].events & (EPOLLPRI | EPOLLERR | EPOLLHUP)) {
// printf("OnClose, socket=%d\n", ev_fd);
pSocket->OnClose(); //执行关闭的回调函数。
}
pSocket->ReleaseRef();
}
_CheckTimer();
_CheckLoop();
}
}
四:将上面两个类再次进行封装:net_lib
这个类中的代码基本没有添加什么,就是对上面两个类的再一次封装,让我们调用更加的方便。但是在这里面我们服务端有listen和accept两个函数啊,在这里只进行了listen没有使用accept啊?首先我们这个网络模块的封装是按着Reactor的模式进行封装的,他是事件驱动的,我们前面是将socket设置成了非阻塞,因此他一调用直接返回,直接添加到epoll中,并不会被阻塞住。当客户端发送来具体的消息之后,我们epoll会返回只读事件,这个时候我们才知道客户端已经成功连接上了,然后根据返回的只读事件再去进行处理操作。
那么这个时候应该有个疑惑:客户端发送来消息,我们只根据这个消息判断到了客户端连接上来了,并且通过accept给客户端分配了一个socket,那么数据我们怎么读取呢?当我们知道客户端连接上来,那么肯定是带着数据来的,我们直接将他的事件改为写事件,然后跳到写事件里面去读取就好了。
//此个封装就是在socket和eventloop中再封装成同一层。在这一层中可以直接调用他俩的函数。
#ifdef __cplusplus
extern "C"
{
#endif
int netlib_init();
int netlib_destroy();
int netlib_listen(const char *server_ip,uint16_t port,callback_t callback,void *callback_data);
net_handle_t netlib_connect(const char *server_ip,uint16_t port,callback_t callback,void *callback_data);
int netlib_send(net_handle_t handle, void *buf, int len);
int netlib_recv(net_handle_t handle, void *buf, int len);
int netlib_close(net_handle_t handle);
int netlib_option(net_handle_t handle, int opt, void *optval);
int netlib_register_timer(callback_t callback, void *user_data, uint64_t interval);
int netlib_delete_timer(callback_t callback, void *user_data);
int netlib_add_loop(callback_t callback, void *user_data);
void netlib_eventloop(uint32_t wait_timeout = 100);
void netlib_stop_event();
bool netlib_is_running();
#ifdef __cplusplus
}
int netlib_listen(const char *server_ip, uint16_t port, callback_t callback,
void *callback_data) { //在我们CBaseSocket的上面封装了一层,将服务端和客户端分开,可以直接调用。
CBaseSocket *pSocket = new CBaseSocket();
if (!pSocket)
return NETLIB_ERROR;
int ret = pSocket->Listen(server_ip, port, callback, callback_data);
if (ret == NETLIB_ERROR)
delete pSocket;
return ret;
}
net_handle_t netlib_connect(const char *server_ip, uint16_t port,
callback_t callback, void *callback_data) { //给客户端使用的。直接封装好。
CBaseSocket *pSocket = new CBaseSocket();
if (!pSocket)
return NETLIB_INVALID_HANDLE;
net_handle_t handle =
pSocket->Connect(server_ip, port, callback, callback_data);
if (handle == NETLIB_INVALID_HANDLE)
delete pSocket;
return handle;
}
int netlib_send(net_handle_t handle, void *buf, int len) {
CBaseSocket *pSocket = FindBaseSocket(handle); //找到要发送的socket
if (!pSocket) {
return NETLIB_ERROR;
}
int ret = pSocket->Send(buf, len); //直接发送。
pSocket->ReleaseRef();
return ret;
}
int netlib_recv(net_handle_t handle, void *buf, int len) {
CBaseSocket *pSocket = FindBaseSocket(handle); //和发送是一个性质,先找到,然后直接发送。
if (!pSocket)
return NETLIB_ERROR;
int ret = pSocket->Recv(buf, len);
pSocket->ReleaseRef();
return ret;
}
int netlib_close(net_handle_t handle) {
CBaseSocket *pSocket = FindBaseSocket(handle); //关闭某个具体的socket。
if (!pSocket)
return NETLIB_ERROR;
int ret = pSocket->Close();
pSocket->ReleaseRef();
return ret;
}
int netlib_register_timer(callback_t callback, void *user_data,
uint64_t interval) { //添加定时器。
CEventDispatch::Instance()->AddTimer(callback, user_data, interval);
return 0;
}
int netlib_delete_timer(callback_t callback, void *user_data) {
CEventDispatch::Instance()->RemoveTimer(callback, user_data); //删除定时器。
return 0;
}
int netlib_add_loop(callback_t callback, void *user_data) {
CEventDispatch::Instance()->AddLoop(callback, user_data); //将任务添加到loop中。
return 0;
}
void netlib_eventloop(uint32_t wait_timeout) { //启动事件循环。
CEventDispatch::Instance()->StartDispatch(wait_timeout);
}
void netlib_stop_event() { CEventDispatch::Instance()->StopDispatch(); } //停止事件循环。
bool netlib_is_running() { return CEventDispatch::Instance()->IsRunning(); } //判断是否在运行。
五:开源高速日志:spdlog
我之前的文章讲过spdlog,我写过的文章我也会忘记,因此我才写了博客,就是怕自己忘记。开源组件——异步日志方案 spdlog 的讲解_spdlog 异步日志-CSDN博客
在这里也大致讲一下:在最高层有个register_logger注册中心,他管理所有的logger(写日志的东西),而在logger的下层有sink,这个sink是管理输出到什么地方,以什么样的方式进行输出,以及日志级别等。我们通过先设置几个不同的sink,指定我们想要输出到的位置,然后将这几个sink添加到logger中,以后我们使用logger进行写日志的时候就可以使用我们定义的sink了。然后我们在register_logger中将我们的logger进行注册,让全局都知道这个logger,这样我们就可以随便使用了。
class DLog
{
public:
static DLog* GetInstance()
{
static DLog dlogger;
return &dlogger;
}
std::shared_ptr<spdlog::logger> getLogger()
{
return log_;
}
static void SetLevel(char *log_level); //设置日志级别,默认为info级别。
private:
DLog(/* args */) {
std::vector<spdlog::sink_ptr> sinkList; //一个log中可以存放多个输出的目标,我们用容器存放起来。
#if 1 //输出日志到控制台
auto consoleSink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>(); //创建一个输出到控制台的目标。
consoleSink->set_level(level_); //设置日志级别。
consoleSink->set_pattern("[%Y-%m-%d %H:%M:%S.%e][thread %t][%@,%!][%l] : %v"); //设置输出的格式
sinkList.push_back(consoleSink);
#endif
// 输出日志到文件
auto dailySink = std::make_shared<spdlog::sinks::daily_file_sink_mt>("logs/daily.log", 23, 59);
dailySink->set_level(level_);
dailySink->set_pattern("[%Y-%m-%d %H:%M:%S.%e][thread %t][%@,%!][%l] : %v");
sinkList.push_back(dailySink);
log_ = std::make_shared<spdlog::logger>("both", begin(sinkList), end(sinkList)); //我们通过创建log,并使用sinklist。
spdlog::register_logger(log_); //将这个log进行注册。
"logs/daily.txt", 0, 1);
spdlog::flush_every(std::chrono::seconds(1)); //设置日志的刷新频率,一秒。
}
~DLog() { }
private:
std::shared_ptr<spdlog::logger> log_;
// static spdlog::level::level_enum level_ = spdlog::level::info;
static spdlog::level::level_enum level_; //日志的级别。
};
void DLog::SetLevel(char *log_level) {
printf("SetLevel log_level:%s\n", log_level);
fflush(stdout); //立即将数据输出出来。
if(strcmp(log_level, "trace") == 0) {
level_ = spdlog::level::trace;
}else if(strcmp(log_level, "debug") == 0) {
level_ = spdlog::level::debug;
}else if(strcmp(log_level, "info") == 0) {
level_ = spdlog::level::info;
}else if(strcmp(log_level, "warn") == 0) {
level_ = spdlog::level::warn;
}else if(strcmp(log_level, "err") == 0) {
level_ = spdlog::level::err;
}else if(strcmp(log_level, "critical") == 0) {
level_ = spdlog::level::critical;
}else if(strcmp(log_level, "off") == 0) {
level_ = spdlog::level::off;
} else {
printf("level: %s is invalid\n", log_level);
}
}
六:线程池
对于线程池的讲解,我之前也写过两篇文章:
Linux下c语言的线程池(分模块讲解)_linux c线程池的作用-CSDN博客
Linux下C++线程池的实现(分模块讲解)_linux c 线程池-CSDN博客
在这个项目中我就使用了之前写过的C++线程池。这两篇文章我个人认为讲解的还是很细的(狗头保命)。
七:结尾
本篇主要讲解了这个项目中一些最基本的技术栈,仔细的走一走流程还是比较容易看懂的,以后我会更新其他的技术栈,大家一起努力!https://github.com/0voice