#include "epoller.hh"
#include <arpa/inet.h>
#include <assert.h>
#include <cstring>
#include <iostream>
/// Creates the epoll instance and sizes the ready-event buffer to 10 slots.
/// Asserts (debug builds only) that the epoll descriptor was obtained.
Epoller::Epoller() : activeEvents_(10) {
  const int epollFd = ::epoll_create(10);
  efd_ = epollFd;
  assert(efd_ >= 0);
}
/// Registers `fd` with the epoll instance.
///
/// @param fd          descriptor to watch.
/// @param operartion  epoll event mask stored in `epoll_event::events`.
/// @param callback    object invoked by loop() when `fd` becomes ready;
///                    must not be null (asserted).
void Epoller::addEvent(int fd, int operartion, Object* callback) {
  assert(callback != nullptr);
  struct epoll_event ev;
  ::memset(&ev, 0, sizeof ev);
  ev.events = operartion;
  // epoll_data is a union, so only one member can be stored. loop() reads
  // data.ptr, therefore the callback pointer is the member we keep.
  ev.data.ptr = callback;
  // The syscall must live outside assert(): with NDEBUG defined the whole
  // assert expression is discarded and the fd would never be registered.
  const int rc = ::epoll_ctl(efd_, EPOLL_CTL_ADD, fd, &ev);
  assert(rc >= 0);
  (void)rc;
  std::cout << "add clt:" << fd << " to event_loop." << std::endl;
}
/// Unregisters `fd` from the epoll instance.
///
/// @param fd        descriptor to stop watching.
/// @param callback  unused; kept so the signature mirrors addEvent().
void Epoller::removeEvent(int fd, Object* callback) {
  (void)callback;
  // A zeroed event is passed to EPOLL_CTL_DEL, as in the original code.
  struct epoll_event event;
  ::memset(&event, 0, sizeof event);
  // The syscall must live outside assert(): with NDEBUG defined the whole
  // assert expression is discarded and the fd would stay registered.
  const int rc = ::epoll_ctl(efd_, EPOLL_CTL_DEL, fd, &event);
  assert(rc >= 0);
  (void)rc;
  std::cout << "delete clt:" << fd << " from event_loop." << std::endl;
}
void Epoller::loop() {
while (true) {
int ready = ::epoll_wait(efd_, &*activeEvents_.begin(),
activeEvents_.size(), kTimeoutMs_);
if (1 <= ready) {
for (size_t i = 0; i != ready; ++i) {
std::cout << ready << std::endl;
if (nullptr != activeEvents_[i].data.ptr) {
auto cb = static_cast<Object*>(activeEvents_[i].data.ptr);
(*cb)(activeEvents_[i].events);
}
}
if (ready == activeEvents_.size()) {
activeEvents_.resize(activeEvents_.size() * 2);
}
}
}
}
#ifndef __EPOLLER__
#define __EPOLLER__
#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>
#include <map>
#include <vector>
#include "copyability.hh"
#include "i_object.hh"
/// Thin wrapper around a Linux epoll instance that dispatches readiness
/// events to registered Object callbacks.
class Epoller : noncopyable {
 public:
  /// Creates the epoll descriptor.
  Epoller();

  /// Releases the epoll descriptor; previously it was never closed.
  ~Epoller() {
    if (efd_ >= 0) {
      ::close(efd_);
    }
  }

  /// Blocks forever, dispatching ready events to their callbacks.
  void loop();

  /// Registers `fd` for the events in `operartion`; `callback` is invoked
  /// when the descriptor becomes ready.
  void addEvent(int fd, int operartion, Object* callback);

  /// Unregisters `fd` from the epoll instance.
  void removeEvent(int fd, Object* callback);

 private:
  int efd_;                                       // epoll descriptor
  std::vector<struct epoll_event> activeEvents_;  // buffer filled by epoll_wait
  const int kTimeoutMs_ = 1000;                   // epoll_wait timeout, in ms
};
#endif
#ifndef __INTERFACE_OBJECT__
#define __INTERFACE_OBJECT__
/// Abstract callback interface: an Object is invoked with an event mask.
class Object {
 public:
  /// Handles the given event mask; implemented by concrete handlers.
  virtual void operator()(int events) = 0;

  /// Virtual so that handlers can be destroyed through an Object pointer.
  virtual ~Object() = default;
};
#endif
#ifndef __CONNECTOR__
#define __CONNECTOR__
#include <arpa/inet.h>
#include <memory>
#include <string>
#include "copyability.hh"
#include "epoller.hh"
#include "i_object.hh"
#include "server.hh"
class Server;
/// Event handler for one accepted client socket. Invoked by the Epoller
/// when the client descriptor becomes ready.
class Connector : noncopyable, public Object {
 public:
  /// @param fd      client socket descriptor.
  /// @param poller  event loop the descriptor is registered with (non-null).
  /// @param server  server that accepted the client (non-null).
  explicit Connector(int fd, Epoller* poller, Server* server);
  ~Connector() override;

  /// Handles the readiness events reported for the client descriptor.
  void operator()(int events) override;

  int cltFd() const;
  void set_cltFd(int fd);

  /// Fixed-size wire message; its size bounds each read from the socket.
  struct Message {
    unsigned char data[30];
  };

 private:
  int cltFd_;
  // A compile-time constant rather than a per-object member. As a
  // non-static member declared after buffer_, it was still uninitialised
  // when the constructor's initializer list used it to size buffer_.
  static constexpr uint8_t bufferSz_ = 100;
  std::unique_ptr<unsigned char[]> buffer_;
  Epoller* poller_;
  Server* server_;
};
#endif
#include "connector.hh"
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
#include <cstring>
#include <iostream>
/// Wraps an accepted client descriptor and switches it to non-blocking mode.
///
/// @param fd      client socket descriptor.
/// @param poller  event loop used later to unregister the descriptor;
///                must not be null (asserted).
/// @param server  owning server; must not be null (asserted).
Connector::Connector(int fd, Epoller* poller, Server* server)
    : cltFd_(fd), buffer_{}, poller_(poller), server_(server) {
  assert(poller != nullptr);
  assert(server_ != nullptr);

  // Allocate in the body, not the initializer list: members are initialised
  // in declaration order, so sizing buffer_ from bufferSz_ in the list could
  // read bufferSz_ before it has a value.
  buffer_ = std::make_unique<unsigned char[]>(bufferSz_);

  const int flags = ::fcntl(cltFd_, F_GETFL, 0);
  if (flags != -1) {
    // Only modify the flags when they were read successfully; OR-ing into
    // the -1 error value would set every flag bit.
    ::fcntl(cltFd_, F_SETFL, flags | O_NONBLOCK);
  }
}
/// Nothing to release explicitly: the read buffer is freed by its
/// unique_ptr, and this destructor does not close the descriptor.
Connector::~Connector() = default;
/// Returns the client socket descriptor held by this connector.
int Connector::cltFd() const {
  return cltFd_;
}
/// Replaces the stored client descriptor; the previous one is not closed.
void Connector::set_cltFd(int fd) {
  cltFd_ = fd;
}
/// Handles readiness on the client socket: reads at most one Message worth
/// of bytes and prints it, or tears the connection down on EOF or error.
///
/// @param events  epoll event mask reported for the client descriptor.
void Connector::operator()(int events) {
  // HUP/ERR are reported by epoll even when not requested. Routing them
  // through read() lets the EOF/error path below close the connection
  // instead of leaving the event permanently pending.
  if (!(events & (EPOLLIN | EPOLLPRI | EPOLLHUP | EPOLLERR))) {
    return;
  }

  ::memset(buffer_.get(), 0, bufferSz_);

  ssize_t received = 0;
  while (true) {
    received = ::read(cltFd_, buffer_.get(), sizeof(Message));
    if (received < 0 && errno == EINTR) {
      // Interrupted by a signal before any data arrived: retry the read.
      std::cout << "read again plz.\n";
      continue;
    }
    break;
  }

  if (received < 0 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
    // Non-blocking socket with nothing to read. Return to the event loop
    // rather than spinning; epoll reports the fd again when data arrives.
    return;
  }

  if (received <= 0) {
    // Peer closed the connection (0) or a hard error occurred (<0).
    // Unregister, then close so the descriptor is not leaked. -1 marks the
    // connector as closed, which Server::~Server() checks before closing.
    assert(poller_ != nullptr);
    assert(server_ != nullptr);
    poller_->removeEvent(cltFd_, this);
    ::close(cltFd_);
    cltFd_ = -1;
    return;
  }

  // The buffer was zeroed and at most sizeof(Message) bytes were read, so
  // it is NUL-terminated and safe to print as a C string.
  static int loop = 0;
  std::cout << "loop" << ++loop << " recv msg:["
            << std::string(reinterpret_cast<char*>(buffer_.get()))
            << "] from:" << cltFd_ << std::endl;
}
#ifndef __COPYABILITY__
#define __COPYABILITY__
/// Tag base class marking a type as copyable. Construction and destruction
/// are protected, so it can only be used as a base.
class copyable {
 protected:
  ~copyable() = default;
  copyable() = default;
};
/// Base class that disables copy construction and copy assignment for
/// every type deriving from it. Construction and destruction are
/// protected, so it can only be used as a base.
class noncopyable {
 protected:
  noncopyable() = default;
  ~noncopyable() = default;

 public:
  void operator=(const noncopyable&) = delete;
  noncopyable(const noncopyable&) = delete;
};
#endif
#ifndef __SERVER__
#define __SERVER__
#include <arpa/inet.h>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include "connector.hh"
#include "copyability.hh"
#include "epoller.hh"
#include "i_object.hh"
class Connector;
/// Listening TCP server. Registered with an Epoller as an Object so that
/// readiness on the listening socket triggers accept().
class Server : noncopyable, public Object {
 public:
  /// @param listenPort  TCP port to listen on (host byte order).
  /// @param ip          dotted-quad IPv4 address to bind to.
  /// @param poller      event loop used to register descriptors.
  explicit Server(uint16_t listenPort, const char* ip, Epoller* poller);
  ~Server() override;

  /// Registers the listening socket with the poller.
  void start();

  /// Invoked by the poller with the events reported on the listening socket.
  void operator()(int events) override;

  /// Accepts one pending client and registers it with the poller.
  void accept();

  int listenfd() const;

  Epoller* poller() {
    return poller_;
  }

  /// Appends `clt` to the client list.
  void addClt(Connector* clt);
  void addClt(int cltfd) = delete;

  /// Closes `cltfd` and deletes the matching connector from the client list.
  void removeClt(int cltfd);

 private:
  using ClientList = std::vector<Connector*>;

  int listenfd_;
  Epoller* poller_;
  sockaddr_in sock_;
  ClientList cltList_;
};
#endif
#include "server.hh"
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <algorithm>
/// Creates a non-blocking IPv4 TCP socket, binds it to `ip`:`listenPort`
/// and starts listening.
///
/// @param listenPort  port to bind, in host byte order.
/// @param ip          dotted-quad IPv4 address; must not be null (asserted).
/// @param poller      event loop used by start()/accept(); must not be null.
Server::Server(uint16_t listenPort, const char* ip, Epoller* poller)
    : listenfd_{}, poller_(poller), sock_{}, cltList_{} {
  assert(ip != nullptr);
  assert(poller_ != nullptr);

  sock_.sin_family = AF_INET;
  sock_.sin_port = htons(listenPort);
  sock_.sin_addr.s_addr = ::inet_addr(ip);

  listenfd_ = ::socket(AF_INET, SOCK_STREAM, 0);
  assert(listenfd_ != -1);

  int optval = 1;
  ::setsockopt(listenfd_, SOL_SOCKET, SO_REUSEADDR, &optval,
               static_cast<socklen_t>(sizeof optval));

  const int flags = ::fcntl(listenfd_, F_GETFL, 0);
  ::fcntl(listenfd_, F_SETFL, flags | O_NONBLOCK);

  // bind()/listen() must run outside assert(): with NDEBUG defined the
  // asserted expression is discarded and the socket would never be bound
  // or put into listening state.
  const int bindRc =
      ::bind(listenfd_, reinterpret_cast<sockaddr*>(std::addressof(sock_)),
             sizeof(sockaddr_in));
  assert(bindRc >= 0);
  (void)bindRc;

  // The second argument of listen() is the pending-connection backlog, not
  // the port, so use the system maximum instead of listenPort.
  const int listenRc = ::listen(listenfd_, SOMAXCONN);
  assert(listenRc >= 0);
  (void)listenRc;
}
/// Closes every client descriptor that is still open, destroys the
/// connectors held in the client list, and closes the listening socket.
Server::~Server() {
  for (Connector* clt : cltList_) {
    if (clt == nullptr) {
      continue;
    }
    if (clt->cltFd() != -1) {
      ::close(clt->cltFd());
    }
    // Connectors are created with `new` in accept() and deleted in
    // removeClt(); the ones still listed here were previously leaked.
    delete clt;
  }
  cltList_.clear();
  ::close(listenfd_);
}
/// Registers the listening socket with the poller, with this server as the
/// callback for readable / priority events.
void Server::start() {
  constexpr int kInterest = EPOLLIN | EPOLLPRI;
  poller_->addEvent(listenfd_, kInterest, this);
}
/// Accepts one pending connection on the listening socket, wraps it in a
/// heap-allocated Connector, records it in the client list and registers it
/// with the poller for readable / priority events.
void Server::accept() {
  struct sockaddr_in clientInfo {};
  // accept() expects a socklen_t; casting an int* to socklen_t* is not
  // portable.
  socklen_t size = sizeof clientInfo;
  std::cout << "111" << std::endl;
  const int cltFd = ::accept(
      listenfd_, reinterpret_cast<sockaddr*>(&clientInfo), &size);
  if (cltFd == -1) {
    // The listening socket is non-blocking, so accept() can fail without a
    // connection being available. Report and return rather than aborting
    // (or, under NDEBUG, building a Connector around fd -1).
    std::cerr << "accept failed\n";
    return;
  }
  std::cout << "222" << std::endl;
  Connector* newConnect = new Connector(cltFd, poller_, this);
  // Was `addClt(newConnector)`, an undeclared identifier.
  addClt(newConnect);
  std::cout << "333" << std::endl;
  poller_->addEvent(cltFd, EPOLLIN | EPOLLPRI, newConnect);
}
/// Returns the listening socket descriptor.
int Server::listenfd() const {
  return listenfd_;
}
/// Appends `clt` to the list of connected clients.
void Server::addClt(Connector* clt) {
  cltList_.emplace_back(clt);
}
void Server::removeClt(int cltfd) {
for (auto it = cltList_.begin(); it != cltList_.end(); ++it) {
::close(cltfd);
if (cltfd == (*it)->cltFd()) {
auto clt = *it;
delete clt;
cltList_.erase(it);
break;
}
}
}
/// Poller callback for the listening socket: accepts a client when the
/// reported events include readable or priority data.
void Server::operator()(int events) {
  const bool hasIncoming = (events & (EPOLLIN | EPOLLPRI)) != 0;
  if (!hasIncoming) {
    return;
  }
  std::cout << "register client\n";
  accept();
}
cmake_minimum_required(VERSION 3.5)
project(SERVER)
# Collect every source file in the project root into DIR_MAIN_SRCS.
aux_source_directory(${PROJECT_SOURCE_DIR} DIR_MAIN_SRCS)
# aux_source_directory(${PROJECT_SOURCE_DIR}/utils DIR_OTHER_SRCS)
# -I
include_directories(${PROJECT_SOURCE_DIR}/utils)
# parameters
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# The variable CMake reads is CMAKE_CXX_FLAGS (plural). The value is quoted
# as one string so the flags are appended rather than turned into a
# ';'-separated list.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall")
# macro
# -L
# -o
add_executable(reactor_server
${DIR_MAIN_SRCS}
)
# -l
set(LIB_NAME
pthread)
target_link_libraries(reactor_server ${LIB_NAME})
#include "server.hh"
// Port the demo server listens on.
constexpr uint16_t g_port = 6666;
// Address the demo server binds to. String literals are arrays of const
// char, so the pointee must be const; `constexpr char*` only makes the
// pointer itself const and is ill-formed in C++11 and later.
constexpr const char* g_ip = "127.0.0.1";
auto main() -> int {
Epoller eventloop{};
Server server{g_port, g_ip, std::addressof(eventloop)};
server.start();
eventloop.loop();
return 0;
}