0
点赞
收藏
分享

微信扫一扫

epoll reactor反应堆服务器demo学习

梦幻之云 2022-02-09 阅读 36
//epoller.cc
#include "epoller.hh"

#include <arpa/inet.h>
#include <assert.h>

#include <cstring>
#include <iostream>

// Builds the poller: reserves ten slots for ready events and opens the
// underlying epoll instance. Creation failure trips the assertion.
Epoller::Epoller() : activeEvents_(10) {
  const int epollFd = ::epoll_create(10);
  efd_ = epollFd;
  assert(efd_ >= 0);
}

// Registers `fd` with the epoll instance.
//
// fd         descriptor to watch.
// operartion epoll event mask (e.g. EPOLLIN | EPOLLPRI).
// callback   handler stored in the event's user data; loop() invokes it when
//            the descriptor becomes ready. Must not be null.
void Epoller::addEvent(int fd, int operartion, Object* callback) {
  assert(callback != nullptr);

  struct epoll_event ev;
  ::memset(&ev, 0, sizeof ev);
  ev.events = operartion;
  // epoll_event::data is a union, so only the callback pointer is stored;
  // a write to data.fd would just be overwritten by data.ptr.
  ev.data.ptr = callback;

  // The syscall must live outside assert(): with NDEBUG the whole assert
  // expression is removed and the descriptor would never be registered.
  const int rc = ::epoll_ctl(efd_, EPOLL_CTL_ADD, fd, &ev);
  if (rc < 0) {
    std::cerr << "failed to add clt:" << fd << " to event_loop." << std::endl;
    assert(false && "epoll_ctl(EPOLL_CTL_ADD) failed");
    return;
  }

  std::cout << "add clt:" << fd << " to event_loop." << std::endl;
}

// Unregisters `fd` from the epoll instance.
//
// fd       descriptor to stop watching.
// callback unused; kept so the signature mirrors addEvent().
void Epoller::removeEvent(int fd, Object* callback) {
  (void)callback;

  // Detach the descriptor from the epoll interest set. A zeroed event is
  // passed as the (ignored) event argument.
  struct epoll_event event;
  ::memset(&event, 0, sizeof event);

  // The syscall must live outside assert(): with NDEBUG the whole assert
  // expression is removed and the descriptor would stay registered.
  const int rc = ::epoll_ctl(efd_, EPOLL_CTL_DEL, fd, &event);
  if (rc < 0) {
    std::cerr << "failed to delete clt:" << fd << " from event_loop."
              << std::endl;
    assert(false && "epoll_ctl(EPOLL_CTL_DEL) failed");
    return;
  }

  std::cout << "delete clt:" << fd << " from event_loop." << std::endl;
}

void Epoller::loop() {
  while (true) {
    int ready = ::epoll_wait(efd_, &*activeEvents_.begin(),
                             activeEvents_.size(), kTimeoutMs_);

    if (1 <= ready) {
      for (size_t i = 0; i != ready; ++i) {
        std::cout << ready << std::endl;
        // FIXME: 不能使用wait返回的epoll_event中的fd,是一个随机值
        if (nullptr != activeEvents_[i].data.ptr) {
          auto cb = static_cast<Object*>(activeEvents_[i].data.ptr);
          (*cb)(activeEvents_[i].events);
        }
      }

      if (ready == activeEvents_.size()) {
        activeEvents_.resize(activeEvents_.size() * 2);
      }
    }
  }
}
// epoller.hh
#ifndef __EPOLLER__
#define __EPOLLER__

#include <sys/epoll.h>
#include <sys/types.h>
#include <unistd.h>

#include <map>
#include <vector>

#include "copyability.hh"
#include "i_object.hh"

// Wrapper around one epoll instance. Descriptors are registered together with
// an Object callback; loop() waits for readiness and invokes those callbacks.
class Epoller : noncopyable {
 public:
  // Opens the epoll instance.
  Epoller();

  // Closes the epoll descriptor so it is not leaked when the poller is
  // destroyed. Skipped when creation failed and efd_ is negative.
  ~Epoller() {
    if (efd_ >= 0) {
      ::close(efd_);
    }
  }

  // Dispatches ready events to their callbacks; never returns.
  void loop();

  // Starts watching `fd` for the event mask `operartion`; `callback` is
  // invoked from loop() when the descriptor is ready.
  void addEvent(int fd, int operartion, Object* callback);

  // Stops watching `fd`.
  void removeEvent(int fd, Object* callback);

 private:
  int efd_;                                       // epoll descriptor
  std::vector<struct epoll_event> activeEvents_;  // buffer for epoll_wait
  static constexpr int kTimeoutMs_ = 1000;        // epoll_wait timeout (ms)
};

#endif
//i_object.hh
#ifndef __INTERFACE_OBJECT__
#define __INTERFACE_OBJECT__

// Callback interface for the reactor: anything registered with the poller
// derives from Object and is invoked with the triggered event mask.
class Object {
 public:
  // Handles the events reported for the descriptor this object is bound to.
  virtual void operator()(int events) = 0;

  // Virtual so implementations can be destroyed through an Object pointer.
  virtual ~Object() = default;
};

#endif
//connector.hh
#ifndef __CONNECTOR__
#define __CONNECTOR__

#include <arpa/inet.h>  //sockaddr_in

#include <memory>
#include <string>

#include "copyability.hh"
#include "epoller.hh"
#include "i_object.hh"
#include "server.hh"

class Server;

// Event handler for one accepted client connection. The poller invokes
// operator() when the client descriptor is readable.
class Connector : noncopyable, public Object {
 public:
  // fd     accepted client descriptor.
  // poller event loop the descriptor is registered with; must not be null.
  // server server that created this connector; must not be null.
  explicit Connector(int fd, Epoller* poller, Server* server);
  ~Connector() override;

  // Reads from the client when `events` contains EPOLLIN or EPOLLPRI.
  void operator()(int events) override;

  // Accessors for the client descriptor.
  int cltFd() const;
  void set_cltFd(int fd);

  // Wire message; sizeof(Message) is the maximum number of bytes read at once.
  struct Message {
    unsigned char data[30];
  };

 private:
  // Size of buffer_ in bytes. This is a static constant on purpose: as a
  // non-static member declared after buffer_ it would still be uninitialized
  // when the constructor sizes the buffer, because members are initialized in
  // declaration order.
  static constexpr unsigned int bufferSz_ = 100;

  int cltFd_;
  std::unique_ptr<unsigned char[]> buffer_;  // receive buffer, bufferSz_ bytes
  Epoller* poller_;                          // not owned
  Server* server_;                           // not owned
};

#endif
//connector.cc
#include "connector.hh"

#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <cstring>
#include <iostream>

// Wraps the accepted descriptor `fd`, allocates the zero-filled receive buffer
// and switches the descriptor to non-blocking mode.
//
// poller and server must not be null; neither is owned by the connector.
Connector::Connector(int fd, Epoller* poller, Server* server)
    : cltFd_(fd),
      // NOTE(review): members are initialized in declaration order, so
      // bufferSz_ must already hold its value when this initializer runs.
      buffer_{new unsigned char[bufferSz_]{}},
      poller_(poller),
      server_(server) {
  assert(poller != nullptr);
  assert(server_ != nullptr);

  // Only touch the flags when they could be read; OR-ing O_NONBLOCK into a
  // failed (-1) result would hand an all-bits-set value to F_SETFL.
  const int flags = ::fcntl(cltFd_, F_GETFL, 0);
  if (flags == -1 || ::fcntl(cltFd_, F_SETFL, flags | O_NONBLOCK) == -1) {
    std::cerr << "failed to make clt:" << cltFd_ << " non-blocking."
              << std::endl;
  }
}

// Nothing to release explicitly: the buffer is freed by its unique_ptr and the
// descriptor is not closed here.
Connector::~Connector() = default;

// Returns the client descriptor this connector wraps.
int Connector::cltFd() const {
  return cltFd_;
}

// Replaces the stored client descriptor with `fd`.
void Connector::set_cltFd(int fd) {
  cltFd_ = fd;
}

// Handles readiness on the client descriptor.
//
// When `events` contains EPOLLIN or EPOLLPRI, reads at most sizeof(Message)
// bytes into the receive buffer and logs them. On end-of-file or a hard read
// error the descriptor is removed from the poller. Other event bits are
// ignored.
void Connector::operator()(int events) {
  if (!(events & (EPOLLIN | EPOLLPRI))) {
    return;
  }

  ::memset(buffer_.get(), 0, bufferSz_);

  ssize_t ret = 0;
  while (true) {
    ret = ::read(cltFd_, buffer_.get(), sizeof(Message));
    if (ret < 0 && errno == EINTR) {
      // Interrupted by a signal before any data arrived: retry immediately.
      std::cout << "read again plz.\n";
      continue;
    }
    break;
  }

  if (ret < 0) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // The socket is non-blocking and has nothing to read right now.
      // Spinning here would stall every other connection, so return and let
      // the poller report the descriptor again when data arrives.
      return;
    }

    // Hard error: stop watching the descriptor instead of logging the empty
    // buffer as if it were a received message.
    std::cerr << "read from:" << cltFd_ << " failed: " << std::strerror(errno)
              << std::endl;
    assert(poller_ != nullptr);
    poller_->removeEvent(cltFd_, this);
    return;
  }

  if (0 == ret) {
    // Peer closed the connection.
    assert(poller_ != nullptr);
    assert(server_ != nullptr);

    poller_->removeEvent(cltFd_, this);
    // A connector must not ask the server to delete it, so neither
    // server_->removeClt(cltFd_) nor ::close(cltFd_) is called here.
    return;
  }

  // The buffer was zeroed and is larger than sizeof(Message), so the payload
  // is always NUL-terminated.
  static int loop = 0;
  std::cout << "loop" << ++loop << " recv msg:["
            << std::string(reinterpret_cast<char*>(buffer_.get()))
            << "] from:" << cltFd_ << std::endl;
}
//copyability.hh
#ifndef __COPYABILITY__
#define __COPYABILITY__

// Tag base class marking a type as copyable. Construction and destruction are
// protected, so it can only be used as a base.
class copyable {
 protected:
  ~copyable() = default;
  copyable() = default;
};

// Base class that disables copying for every type deriving from it.
// Construction and destruction are protected, so it can only be used as a
// base.
class noncopyable {
 protected:
  ~noncopyable() = default;
  noncopyable() = default;

 public:
  // Copy construction and copy assignment are both deleted.
  void operator=(const noncopyable&) = delete;
  noncopyable(const noncopyable&) = delete;
};

#endif
//server.hh
#ifndef __SERVER__
#define __SERVER__

#include <arpa/inet.h>  //sockaddr_in

#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

#include "connector.hh"
#include "copyability.hh"
#include "epoller.hh"
#include "i_object.hh"

class Connector;

// Listening side of the reactor. The server owns the listening socket, is
// itself the poller callback for that socket, and keeps the list of
// connectors created for accepted clients.
class Server : noncopyable, public Object {
 public:
  // listenPort port to listen on (host byte order).
  // ip         dotted-quad IPv4 address to bind to; must not be null.
  // poller     event loop to register with; must not be null.
  explicit Server(uint16_t listenPort, const char* ip, Epoller* poller);
  ~Server() override;

  // Registers the listening socket with the poller.
  void start();

  // Poller callback: accepts a client when the listening socket is readable.
  void operator()(int events) override;

  // Accepts one pending connection and registers it with the poller.
  void accept();

  // Listening descriptor.
  int listenfd() const;

  // Event loop this server is attached to.
  Epoller* poller() { return poller_; }

  // Client bookkeeping. Clients are tracked through their Connector, so the
  // descriptor-only overload of addClt is deliberately deleted.
  void addClt(Connector* clt);
  void addClt(int cltfd) = delete;
  void removeClt(int cltfd);

 private:
  using ClientList = std::vector<Connector*>;

  int listenfd_;
  Epoller* poller_;
  sockaddr_in sock_;
  ClientList cltList_;
};

#endif
// server.cc
#include "server.hh"

#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>

// Creates a non-blocking IPv4 TCP listening socket bound to ip:listenPort.
//
// listenPort port in host byte order.
// ip         dotted-quad IPv4 address; must not be null.
// poller     event loop used later by start()/accept(); must not be null.
Server::Server(uint16_t listenPort, const char* ip, Epoller* poller)
    : listenfd_{}, poller_(poller), sock_{}, cltList_{} {
  assert(ip != nullptr);
  assert(poller_ != nullptr);

  sock_.sin_family = AF_INET;
  sock_.sin_port = htons(listenPort);
  sock_.sin_addr.s_addr = ::inet_addr(ip);

  listenfd_ = ::socket(AF_INET, SOCK_STREAM, 0);
  assert(listenfd_ != -1);

  int optval = 1;
  ::setsockopt(listenfd_, SOL_SOCKET, SO_REUSEADDR, &optval,
               static_cast<socklen_t>(sizeof optval));

  int opt = ::fcntl(listenfd_, F_GETFL, 0);
  ::fcntl(listenfd_, F_SETFL, opt | O_NONBLOCK);

  // bind()/listen() must run outside assert(): with NDEBUG the whole assert
  // expression is removed and the socket would never be bound or listening.
  const int bindRc =
      ::bind(listenfd_, reinterpret_cast<sockaddr*>(std::addressof(sock_)),
             sizeof(sockaddr_in));
  if (bindRc < 0) {
    std::cerr << "bind failed on listenfd:" << listenfd_ << std::endl;
  }
  assert(bindRc >= 0);

  // The second argument of listen() is the backlog length, not the port.
  const int listenRc = ::listen(listenfd_, SOMAXCONN);
  if (listenRc < 0) {
    std::cerr << "listen failed on listenfd:" << listenfd_ << std::endl;
  }
  assert(listenRc >= 0);
}

// Closes every tracked client descriptor, frees the Connector objects that
// accept() allocated, and closes the listening socket.
Server::~Server() {
  for (Connector* clt : cltList_) {
    if (clt == nullptr) {
      continue;
    }
    if (clt->cltFd() != -1) {
      ::close(clt->cltFd());
    }
    // Connectors are created with `new` in accept() and owned by this list;
    // without this delete they leak.
    delete clt;
  }
  cltList_.clear();

  ::close(listenfd_);
}

// Registers the listening socket with the poller; this server is the callback
// invoked when a connection is pending.
void Server::start() {
  const int watched = EPOLLIN | EPOLLPRI;
  poller_->addEvent(listenfd_, watched, this);
}

// Accepts one pending connection, wraps it in a Connector, records it in the
// client list and registers it with the poller. Returns without side effects
// when no connection could be accepted.
void Server::accept() {
  struct sockaddr_in clientInfo {};
  socklen_t size = sizeof clientInfo;

  // Pop one established connection from the accept queue.
  std::cout << "111" << std::endl;
  const int cltFd = ::accept(
      listenfd_, reinterpret_cast<sockaddr*>(&clientInfo), &size);
  if (cltFd == -1) {
    // Do not build a Connector around an invalid descriptor.
    std::cerr << "accept failed on listenfd:" << listenfd_ << std::endl;
    return;
  }

  std::cout << "222" << std::endl;
  Connector* newConnect = new Connector(cltFd, poller_, this);
  addClt(newConnect);

  std::cout << "333" << std::endl;
  poller_->addEvent(cltFd, EPOLLIN | EPOLLPRI, newConnect);
}

// Returns the listening socket descriptor.
int Server::listenfd() const {
  return listenfd_;
}

// Appends `clt` to the list of tracked client connectors.
void Server::addClt(Connector* clt) {
  cltList_.emplace_back(clt);
}

void Server::removeClt(int cltfd) {
  for (auto it = cltList_.begin(); it != cltList_.end(); ++it) {
    ::close(cltfd);

    if (cltfd == (*it)->cltFd()) {
      auto clt = *it;
      delete clt;

      cltList_.erase(it);
      break;
    }
  }
}

// Poller callback for the listening socket: accepts a client when the socket
// reports EPOLLIN or EPOLLPRI and ignores every other event.
void Server::operator()(int events) {
  const bool readable = (events & (EPOLLIN | EPOLLPRI)) != 0;
  if (!readable) {
    return;
  }

  std::cout << "register client\n";
  accept();
}
cmake_minimum_required(VERSION 3.5)

project(SERVER)

# Collect every source file in the project root into DIR_MAIN_SRCS.
aux_source_directory(${PROJECT_SOURCE_DIR} DIR_MAIN_SRCS)
# aux_source_directory(${PROJECT_SOURCE_DIR}/utils DIR_OTHER_SRCS)

# -I
include_directories(${PROJECT_SOURCE_DIR}/utils)

# parameters
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# The variable CMake reads is CMAKE_CXX_FLAGS (plural). The value is quoted so
# the flags are appended as one space-separated string, not a ';' list.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -Wall")

# macro

# -L

# -o
add_executable(reactor_server 
    ${DIR_MAIN_SRCS} 
)

# -l
set(LIB_NAME 
    pthread)
target_link_libraries(reactor_server ${LIB_NAME})

#include "server.hh"

// Port the demo server listens on.
constexpr uint16_t g_port = 6666;
// Address the demo server binds to. A string literal is an array of const
// char, so the pointee must be const as well.
constexpr const char* g_ip = "127.0.0.1";

auto main() -> int {
  Epoller eventloop{};
  Server server{g_port, g_ip, std::addressof(eventloop)};
  server.start();
  eventloop.loop();
  return 0;
}
举报

相关推荐

0 条评论