0
点赞
收藏
分享

微信扫一扫

C++11线程池的创建

#pragma once

#include <vector>
#include <queue>
#include <mutex>
#include <string>
#include <thread>
#include <future>
#include <memory>
#include <utility>
#include <stdexcept>
#include <functional>
#include <condition_variable>
#include "plm_downloader.hpp"
#include <windows.h>

// FIFO queue whose every operation is performed under an internal mutex,
// so individual calls may be made concurrently from several threads.
//
// Note that the lock only covers a single call: a sequence such as
// "IsEmpty() then Pop()" is not atomic as a whole, which is why Pop()
// reports emptiness through its return value.
template<class TaskType>
class SafeQueue
{
public:
    SafeQueue() = default;
    ~SafeQueue() = default;

    // Returns true when the queue currently holds no elements.
    bool IsEmpty() const
    {
        std::lock_guard<std::mutex> lock(mSQMutex);
        return mSQueue.empty();
    }

    // Returns the current number of queued elements.
    int Size() const
    {
        std::lock_guard<std::mutex> lock(mSQMutex);
        // The public return type is int; make the size_t -> int conversion explicit.
        return static_cast<int>(mSQueue.size());
    }

    // Appends a copy of iData to the back of the queue.
    void Push(const TaskType& iData)
    {
        std::lock_guard<std::mutex> lock(mSQMutex);
        mSQueue.push(iData);
    }

    // Appends iData to the back of the queue, moving from it.
    // Lets callers enqueue temporaries without an extra copy.
    void Push(TaskType&& iData)
    {
        std::lock_guard<std::mutex> lock(mSQMutex);
        mSQueue.push(std::move(iData));
    }

    // Removes the front element and move-assigns it into oData.
    // Returns false (leaving oData untouched) when the queue is empty.
    bool Pop(TaskType& oData)
    {
        std::lock_guard<std::mutex> lock(mSQMutex);

        if (mSQueue.empty())
        {
            return false;
        }

        oData = std::move(mSQueue.front());
        mSQueue.pop();
        return true;
    }

private:
    std::queue<TaskType> mSQueue;
    // mutable so the const observers (IsEmpty/Size) can still lock.
    mutable std::mutex mSQMutex;
};


// Fixed-size pool of worker threads that run queued std::function<void()> tasks.
//
// Typical use: construct, call Init() to start the workers, submit work with
// PushTask(), then call Close() (or simply destroy the pool) to stop and join
// the workers.
//
// Synchronization: mMutexPool guards mIsClosePool and is held whenever a task
// is published or a worker decides whether to sleep, so a notification can
// never be missed. Tasks themselves run with the mutex released, so several
// workers can execute tasks at the same time.
//
// Init() and Close() are meant to be called from the owning thread, not from
// inside a task (a worker cannot join itself).
class ThreadPool
{
public:
    ThreadPool()
        : mThreadNumbersPool(0),
          mIsClosePool(false)
    {}
    ThreadPool(const ThreadPool&) = delete;
    ThreadPool(ThreadPool&&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;
    ThreadPool& operator=(ThreadPool&&) = delete;

    // Stops and joins any running workers; destroying a joinable std::thread
    // would otherwise call std::terminate.
    ~ThreadPool()
    {
        Close();
    }

    // Starts the worker threads: one per reported hardware thread, plus one.
    // If workers from an earlier Init() are still running they are stopped and
    // joined first, so calling Init() again restarts the pool.
    void Init()
    {
        // Join old workers before the vector is cleared.
        Close();

        // hardware_concurrency() may return 0 when the value is not computable.
        unsigned int hardwareThreads = std::thread::hardware_concurrency();
        if (hardwareThreads == 0)
        {
            hardwareThreads = 1;
        }

        // Hold the pool mutex while spawning: new workers block on it until
        // the pool is fully set up.
        std::unique_lock<std::mutex> lock(mMutexPool);
        mIsClosePool = false;
        mThreadNumbersPool = static_cast<int>(hardwareThreads) + 1;
        mVectThreadsPool.clear();
        mVectThreadsPool.reserve(mThreadNumbersPool);

        for (int i = 0; i < mThreadNumbersPool; ++i)
        {
            mVectThreadsPool.emplace_back(Thread(this, i));
        }
    }

    // Queues the call f(args...) for execution on a worker thread.
    //
    // The callable and its arguments are bound (copied/moved) into the task.
    // Returns a future that yields the call's result, or rethrows the
    // exception it raised. Tasks pushed while no workers are running simply
    // stay in the queue.
    template<class F, class... Args>
    auto PushTask(F&& f, Args&& ...args) -> std::future<decltype(f(args...))>
    {
        using RetType = decltype(f(args...));
        auto task = std::make_shared<std::packaged_task<RetType()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...));

        // Take the future before the task becomes visible to the workers, so
        // get_future() never runs concurrently with the task's invocation.
        std::future<RetType> result = task->get_future();

        // packaged_task is move-only; wrap the shared_ptr in a copyable lambda
        // so it can be stored as std::function<void()>.
        std::function<void()> TaskFunc = [task]()
        {
            (*task)();
        };

        {
            // Publish under the pool mutex so a worker that has just seen an
            // empty queue cannot miss the notification below.
            std::lock_guard<std::mutex> lock(mMutexPool);
            mSQueuePool.Push(TaskFunc);
        }
        mConditionPool.notify_one();

        return result;
    }

    // Asks every worker to stop and waits for them to finish.
    // A worker finishes the task it is currently running and then exits;
    // tasks still waiting in the queue are not executed by this call.
    // Safe to call more than once and before Init().
    void Close()
    {
        {
            std::lock_guard<std::mutex> lock(mMutexPool);
            mIsClosePool = true;
        }
        mConditionPool.notify_all();

        for (std::thread& worker : mVectThreadsPool)
        {
            if (worker.joinable())
            {
                worker.join();
            }
        }
    }

private:
    SafeQueue<std::function<void()>> mSQueuePool;   // pending tasks
    std::vector<std::thread> mVectThreadsPool;      // worker threads
    int mThreadNumbersPool;                         // worker count chosen by Init()
    bool mIsClosePool;                              // stop flag, guarded by mMutexPool

private:
    std::mutex mMutexPool;
    std::condition_variable mConditionPool;

private:
    // Callable executed by each worker thread: repeatedly takes one task from
    // the owning pool's queue and runs it until the pool is closed.
    class Thread
    {
    public:
        Thread(ThreadPool* pool, int id)
            : mId(id),
              mCTPool(pool)
        {}

        ~Thread()
        {}

        void operator()()
        {
            for (;;)
            {
                std::function<void()> func;

                {
                    std::unique_lock<std::mutex> lock(mCTPool->mMutexPool);

                    // Sleep until there is work or the pool is closing; the
                    // loop also absorbs spurious wakeups.
                    while (!mCTPool->mIsClosePool && mCTPool->mSQueuePool.IsEmpty())
                    {
                        mCTPool->mConditionPool.wait(lock);
                    }

                    if (mCTPool->mIsClosePool)
                    {
                        return;
                    }

                    if (!mCTPool->mSQueuePool.Pop(func))
                    {
                        continue;
                    }
                }

                // Run the task with the pool mutex released so other workers
                // can pick up and execute tasks concurrently.
                func();
            }
        }

    private:
        int mId;              // index of this worker within the pool
        ThreadPool* mCTPool;  // owning pool (not owned by the worker)
    };
};

C++11 线程池涉及互斥锁、条件变量、线程安全队列(STL 容器本身不保证线程安全)、线程库新特性(std::thread、std::future、std::packaged_task)以及 std::bind 等机制。


举报

相关推荐

0 条评论