0
点赞
收藏
分享

微信扫一扫

【muduo/base】线程

若如初梘 2022-01-16 阅读 46
c++

#前言
muduo库也封装了线程Thread类,线程池ThreadPool。以及线程辅助类ThreadData,在这一部分我们可以学到如何创建线程,如何回收线程,加深对进程和线程的理解,以及代码规范方面的提升。

Thread类

头文件代码(去除了和命名相关的成员变量和函数)

class Thread : noncopyable
{
 public:
  typedef std::function<void ()> ThreadFunc;

  explicit Thread(ThreadFunc);
  ~Thread();
  void start();
  int join();
  bool started() const { return started_; }
  pid_t tid() const { return tid_; }
  const string& name() const { return name_; }
  static int numCreated() { return numCreated_.get(); }
  
 private:
 
  bool       started_;
  bool       joined_;
  pthread_t  pthreadId_;
  pid_t      tid_;
  ThreadFunc func_;
  CountDownLatch latch_;

};

这里我们归纳一下成员变量作用

到这里我们可能会疑惑:什么是tid?tid和threadid的区别是什么?

tid和threadid区别

在这里我们可以顺便复习一下操作系统相关知识
linux进程创建详细过程
进程和线程以及线程组
pid,tid,tgId区别

我们知道linux创建进程是通过(fork)分裂+替换(execv)生成新的进程。但是使用fork时,不知道你有没有考虑过下面这种情况:

这时我们是否会重新思考一下进程到底是什么?或许进程和线程不是一个包含关系?

这里做一下总结

typedef 和访问控制的关系

源文件代码

核心代码(删减非必要逻辑)

	struct ThreadData
    {
      typedef muduo::Thread::ThreadFunc ThreadFunc;
      ThreadFunc func_;
      string name_;
      pid_t *tid_;
      CountDownLatch *latch_;

      ThreadData(ThreadFunc func,
                 const string &name,
                 pid_t *tid,
                 CountDownLatch *latch)
          : func_(std::move(func)),
            name_(name),
            tid_(tid),
            latch_(latch)
      {
      }

      void runInThread()
      {
        *tid_ = muduo::CurrentThread::tid();
        tid_ = NULL;
        latch_->countDown();
        latch_ = NULL;
        
        try
        {
          func_();
        }
        catch (...)
        {
          throw; // rethrow
        }
      }
    };
    void *startThread(void *obj)
    {
      ThreadData *data = static_cast<ThreadData *>(obj); //向下转型
      data->runInThread();
      delete data;
      return NULL;
    }
  Thread::Thread(ThreadFunc func)
      : started_(false),
        joined_(false),
        func_(std::move(func)),
        latch_(1) 
  {

  }

  Thread::~Thread()
  {
    if (started_ && !joined_)
    {
      pthread_detach(pthreadId_);
    }
  }


  void Thread::start()
  {
    assert(!started_);
    started_ = true;
    // FIXME: move(func_)
    detail::ThreadData *data = new detail::ThreadData(func_, name_, &tid_, &latch_); // data这里是用来做传出参数的,相当于一个代理,更新thread的tid以及,取消start的堵塞
    if (pthread_create(&pthreadId_, NULL, &detail::startThread, data))               //第三个参数为startThread()线程任务,传入参数void*
    {
      //失败
      started_ = false;
      delete data; // or no delete?
      LOG_SYSFATAL << "Failed in pthread_create";
    }
    else
    {
      // 成功
      latch_.wait();
      assert(tid_ > 0); //线程运行后,这里阻塞解除
    }
  }

  int Thread::join()
  {
    assert(started_);
    assert(!joined_);
    joined_ = true;
    return pthread_join(pthreadId_, NULL);
  }

ThreadData类

std::move

代码规范

Thread类

一个执行的过程

如果忘记调用了Join怎么办?

  Thread::~Thread()
  {
    if (started_ && !joined_)
    {
      pthread_detach(pthreadId_);
    }
  }

分析

ThreadPool类

设计思路(简单的生产者消费者模型),共享资源:任务队列,使用run向任务队列中添加任务,线程从任务队列中取任务,执行。

class ThreadPool : noncopyable
{
 public:
  typedef std::function<void ()> Task;

  explicit ThreadPool();
  ~ThreadPool();

  // Must be called before start().
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
  void setThreadInitCallback(const Task& cb)
  { threadInitCallback_ = cb; }

  void start(int numThreads);
  void stop();

  size_t queueSize() const;
  
  void run(Task f);

 private:
  bool isFull() const REQUIRES(mutex_);
  void runInThread();
  Task take();

  mutable MutexLock mutex_;
  Condition notEmpty_ GUARDED_BY(mutex_);
  Condition notFull_ GUARDED_BY(mutex_);
  Task threadInitCallback_;
  std::vector<std::unique_ptr<muduo::Thread>> threads_;
  std::deque<Task> queue_ GUARDED_BY(mutex_);
  size_t maxQueueSize_;
  bool running_;
};

简单分析下类

源文件

start

void ThreadPool::start(int numThreads)
{
  assert(threads_.empty());
  running_ = true;
  threads_.reserve(numThreads);
  for (int i = 0; i < numThreads; ++i)
  {
    threads_.emplace_back(new muduo::Thread(
        std::bind(&ThreadPool::runInThread, this), name_ + id)); //线程运行的时候回调runInThread
    threads_[i]->start();
  }
  //线程池没有额外设置线程
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();
  }
}
分析

代码规范

线程消费者任务

// 作为每个线程的任务
void ThreadPool::runInThread()
{
  try
  {
    if (threadInitCallback_)
    {
      threadInitCallback_();//执行线程运行起来的回调函数
    } 
    while (running_)
    {
      Task task(take());
      if (task)
      {
        task();
      }
    }
  }
  catch (...)
  {

  }
}

// 从任务队列获取一个任务
// 如果任务队列为空take将会堵塞,等待直到到任务队列不为空
ThreadPool::Task ThreadPool::take()
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    notEmpty_.wait();//如果任务队列空,挂起该线程
  }
  Task task;
  if (!queue_.empty())
  {
    task = queue_.front();
    queue_.pop_front();
    if (maxQueueSize_ > 0)
    {
      notFull_.notify();
    }
  }
  return task;
}

生产者

//添加任务的,如果queue_满了,则堵塞等到queue被消费后继续添加
void ThreadPool::run(Task task)
{
  if (threads_.empty())
  {
    task();
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull() && running_)
    {
      notFull_.wait();
    }
    if (!running_)
      return;
    assert(!isFull());

    queue_.push_back(std::move(task));
    notEmpty_.notify();
  }
}
  while (queue_.empty() && running_)
  {
    notEmpty_.wait();//如果任务队列空,挂起该线程
  }

分析下上面这块代码,while的目的是将该函数变为阻塞式函数,notEmpty_.wait(),挂起该线程,是为了避免对cpu的消耗。当线程([生产者线程)调用notEmpty_.notify(),唤醒调用notEmpty_.wait()堵塞的线程(消费者线程),消费者线程执行while,这时候任务队列不为空了,解除堵塞,执行下面逻辑。

举报

相关推荐

0 条评论