#前言
muduo库也封装了线程Thread类,线程池ThreadPool。以及线程辅助类ThreadData,在这一部分我们可以学到如何创建线程,如何回收线程,加深对进程和线程的理解,以及代码规范方面的提升。
Thread类
头文件代码(去除了和命名相关的成员变量和函数)
class Thread : noncopyable
{
public:
typedef std::function<void ()> ThreadFunc;
explicit Thread(ThreadFunc);
~Thread();
void start();
int join();
bool started() const { return started_; }
pid_t tid() const { return tid_; }
const string& name() const { return name_; }
static int numCreated() { return numCreated_.get(); }
private:
bool started_;
bool joined_;
pthread_t pthreadId_;
pid_t tid_;
ThreadFunc func_;
CountDownLatch latch_;
};
这里我们归纳一下成员变量作用
到这里我们可能会疑惑:什么是tid?tid和threadid的区别是什么?
tid和threadid区别
在这里我们可以顺便复习一下操作系统相关知识
linux进程创建详细过程
进程和线程以及线程组
pid,tid,tgId区别
我们知道linux创建进程是通过(fork)分裂+替换(execv)生成新的进程。但是使用fork时,不知道你有没有考虑过下面这种情况:
这时我们是否会重新思考一下进程到底是什么?或许进程和线程不是一个包含关系?
这里做一下总结
typedef 和访问控制的关系
源文件代码
核心代码(删减非必要逻辑)
struct ThreadData
{
typedef muduo::Thread::ThreadFunc ThreadFunc;
ThreadFunc func_;
string name_;
pid_t *tid_;
CountDownLatch *latch_;
ThreadData(ThreadFunc func,
const string &name,
pid_t *tid,
CountDownLatch *latch)
: func_(std::move(func)),
name_(name),
tid_(tid),
latch_(latch)
{
}
void runInThread()
{
*tid_ = muduo::CurrentThread::tid();
tid_ = NULL;
latch_->countDown();
latch_ = NULL;
try
{
func_();
}
catch (...)
{
throw; // rethrow
}
}
};
void *startThread(void *obj)
{
ThreadData *data = static_cast<ThreadData *>(obj); //向下转型
data->runInThread();
delete data;
return NULL;
}
Thread::Thread(ThreadFunc func)
: started_(false),
joined_(false),
func_(std::move(func)),
latch_(1)
{
}
Thread::~Thread()
{
if (started_ && !joined_)
{
pthread_detach(pthreadId_);
}
}
void Thread::start()
{
assert(!started_);
started_ = true;
// FIXME: move(func_)
detail::ThreadData *data = new detail::ThreadData(func_, name_, &tid_, &latch_); // data这里是用来做传出参数的,相当于一个代理,更新thread的tid以及,取消start的堵塞
if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)) //第三个参数为startThread()线程任务,传入参数void*
{
//失败
started_ = false;
delete data; // or no delete?
LOG_SYSFATAL << "Failed in pthread_create";
}
else
{
// 成功
latch_.wait();
assert(tid_ > 0); //线程运行后,这里阻塞解除
}
}
int Thread::join()
{
assert(started_);
assert(!joined_);
joined_ = true;
return pthread_join(pthreadId_, NULL);
}
ThreadData类
std::move
代码规范
Thread类
一个执行的过程
如果忘记调用了Join怎么办?
Thread::~Thread()
{
if (started_ && !joined_)
{
pthread_detach(pthreadId_);
}
}
分析
ThreadPool类
设计思路(简单的生产者消费者模型),共享资源:任务队列,使用run向任务队列中添加任务,线程从任务队列中取任务,执行。
class ThreadPool : noncopyable
{
public:
typedef std::function<void ()> Task;
explicit ThreadPool();
~ThreadPool();
// Must be called before start().
void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }
void setThreadInitCallback(const Task& cb)
{ threadInitCallback_ = cb; }
void start(int numThreads);
void stop();
size_t queueSize() const;
void run(Task f);
private:
bool isFull() const REQUIRES(mutex_);
void runInThread();
Task take();
mutable MutexLock mutex_;
Condition notEmpty_ GUARDED_BY(mutex_);
Condition notFull_ GUARDED_BY(mutex_);
Task threadInitCallback_;
std::vector<std::unique_ptr<muduo::Thread>> threads_;
std::deque<Task> queue_ GUARDED_BY(mutex_);
size_t maxQueueSize_;
bool running_;
};
简单分析下类
源文件
start
void ThreadPool::start(int numThreads)
{
assert(threads_.empty());
running_ = true;
threads_.reserve(numThreads);
for (int i = 0; i < numThreads; ++i)
{
threads_.emplace_back(new muduo::Thread(
std::bind(&ThreadPool::runInThread, this), name_ + id)); //线程运行的时候回调runInThread
threads_[i]->start();
}
//线程池没有额外设置线程
if (numThreads == 0 && threadInitCallback_)
{
threadInitCallback_();
}
}
分析
代码规范
线程消费者任务
// 作为每个线程的任务
void ThreadPool::runInThread()
{
try
{
if (threadInitCallback_)
{
threadInitCallback_();//执行线程运行起来的回调函数
}
while (running_)
{
Task task(take());
if (task)
{
task();
}
}
}
catch (...)
{
}
}
// 从任务队列获取一个任务
// 如果任务队列为空take将会堵塞,等待直到到任务队列不为空
ThreadPool::Task ThreadPool::take()
{
MutexLockGuard lock(mutex_);
// always use a while-loop, due to spurious wakeup
while (queue_.empty() && running_)
{
notEmpty_.wait();//如果任务队列空,挂起该线程
}
Task task;
if (!queue_.empty())
{
task = queue_.front();
queue_.pop_front();
if (maxQueueSize_ > 0)
{
notFull_.notify();
}
}
return task;
}
生产者
//添加任务的,如果queue_满了,则堵塞等到queue被消费后继续添加
void ThreadPool::run(Task task)
{
if (threads_.empty())
{
task();
}
else
{
MutexLockGuard lock(mutex_);
while (isFull() && running_)
{
notFull_.wait();
}
if (!running_)
return;
assert(!isFull());
queue_.push_back(std::move(task));
notEmpty_.notify();
}
}
while (queue_.empty() && running_)
{
notEmpty_.wait();//如果任务队列空,挂起该线程
}
分析下上面这块代码,while的目的是将该函数变为阻塞式函数,notEmpty_.wait(),挂起该线程,是为了避免对cpu的消耗。当线程([生产者线程)调用notEmpty_.notify(),唤醒调用notEmpty_.wait()堵塞的线程(消费者线程),消费者线程执行while,这时候任务队列不为空了,解除堵塞,执行下面逻辑。