目录
bind的实际使用
日志如何实现
具体代码
//Main.cc
#include <iostream>
#include "Threadpool.hpp"
#include "Task.hpp"
#include <ctime>
#include <cstdlib>
/**
 * Demo driver for the thread pool.
 *
 * Routes log output to the log file, starts a pool of five workers, pushes
 * ten randomly generated addition tasks (roughly one per second) and then
 * shuts the pool down.
 *
 * @return 0 once every worker thread has been joined.
 */
int main()
{
    srand(static_cast<unsigned int>(time(nullptr)));
    // Enablescreen();
    EnableFile();

    ThreadPool<Task> tp(5);
    tp.InitThreadPool();
    tp.Start();

    int tasknum = 10;
    while (tasknum)
    {
        int a = rand() % 10 + 1;
        usleep(1234);
        int b = rand() % 5 + 1;
        Task t(a, b);
        LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());
        tp.Enqueue(t);
        sleep(1);
        tasknum--;
    }

    // Stop() must precede Wait(): a worker only leaves its loop once the pool
    // is no longer running and the queue is empty. Without this call the
    // workers stay blocked on the condition variable and Wait() never returns.
    tp.Stop();
    tp.Wait();
    return 0;
}
//ThreadPool.hpp
#pragma once
#include <iostream>
#include <vector>
#include <queue>
#include <string>
#include <memory>
#include <pthread.h>
#include <unistd.h>
#include "MyThread.hpp"
#include"Task.hpp"
#include"Log.hpp"
// Worker count used when ThreadPool is constructed without an explicit thread number.
const static int gdefaultthreadnum = 3;
template <class T>
class ThreadPool
{
public:
ThreadPool(int threadnum = gdefaultthreadnum)
: _threadnum(threadnum), _waitnum(0), _isrunning(false)
{
pthread_mutex_init(&_mutex, nullptr);
pthread_cond_init(&_cond, nullptr);
LOG(INFO, "ThreadPool Construct()");
}
void HandlerTask(std::string &name)
{
LOG(INFO, "%s is running...", name.c_str());
while (true)
{
pthread_mutex_lock(&_mutex);
while (_task_queue.empty() && _isrunning)
{
_waitnum++;
pthread_cond_wait(&_cond, &_mutex);
_waitnum--;
}
if (_task_queue.empty() && !_isrunning)
{
pthread_mutex_unlock(&_mutex);
break;
}
T t = _task_queue.front();
_task_queue.pop();
pthread_mutex_unlock(&_mutex);
LOG(DEBUG, "%s get a task", name.c_str());
t();
LOG(DEBUG, "%s handler a task, result is: %s", name.c_str(), t.ResultToString().c_str());
}
}
void InitThreadPool()
{
for (int i = 0; i < _threadnum; i++)
{
std::string name = "thread-" + std::to_string(i + 1);
_threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
LOG(INFO, "init thread %s done", name.c_str());
}
_isrunning = true;
}
void Start()
{
for (auto &thread : _threads)
{
thread.start();
}
}
void Stop()
{
pthread_mutex_lock(&_mutex);
_isrunning = false;
pthread_cond_broadcast(&_cond);
pthread_mutex_unlock(&_mutex);
}
bool Enqueue(const T &t)
{
bool ret = false;
pthread_mutex_lock(&_mutex);
if (_isrunning)
{
_task_queue.push(t);
if (_waitnum > 0)
{
pthread_cond_signal(&_cond);
}
LOG(DEBUG, "enqueue task success");
ret = true;
}
pthread_mutex_unlock(&_mutex);
return ret;
}
void Wait()
{
for (auto &thread : _threads)
{
thread.join();
LOG(INFO, "%s is quit...", thread.name().c_str());
}
}
~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_cond);
}
private:
int _threadnum;
std::vector<Thread<T>> _threads;
std::queue<T> _task_queue;
pthread_mutex_t _mutex;
pthread_cond_t _cond;
int _waitnum;
bool _isrunning;
};
//MyThread.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
// NOTE(review): a using-directive at header scope leaks every std name into
// all files that include this header. It is retained only because code outside
// this header may already rely on it; nothing below depends on it.
using namespace std;

// Entry routine executed by a Thread: it receives the thread's name by reference.
using fun_t = std::function<void(std::string &)>;

/**
 * Minimal wrapper around one POSIX thread.
 *
 * The object stores a callable and a name; start() launches a pthread that
 * invokes the callable with the name. `_stop == true` means "no joinable
 * thread is attached" (never started, already joined/detached, or stop() called).
 *
 * The running thread keeps a pointer to this object, so the object must not
 * be moved, copied-from-and-destroyed or destroyed while its thread runs.
 *
 * @tparam T unused by the wrapper itself; kept for interface compatibility.
 */
template <class T>
class Thread
{
private:
    // Runs the stored callable on the new thread.
    void excute(std::string &name)
    {
        _func(name);
    }

public:
    /**
     * @param func callable run by the thread.
     * @param name human-readable thread name, defaults to "noname".
     */
    Thread(fun_t func, const std::string &name = "noname")
        : _id(), _name(name), _stop(true), _func(func) {}

    // Must be static: a non-static member function carries an implicit `this`
    // parameter and would not match the signature pthread_create expects.
    static void *threadrun(void *args)
    {
        Thread<T> *ptr = static_cast<Thread<T> *>(args);
        ptr->excute(ptr->_name);
        return nullptr;
    }

    /**
     * Launches the thread.
     * @return true on success; false if pthread_create failed or a joinable
     *         thread is already attached (starting again would overwrite its
     *         id and make it impossible to join).
     */
    bool start()
    {
        if (!_stop)
        {
            return false;
        }
        // Pass `this` as the thread argument so threadrun can reach the object.
        int n = pthread_create(&_id, nullptr, threadrun, this);
        if (n == 0)
        {
            _stop = false;
            return true;
        }
        else
        {
            return false;
        }
    }

    /**
     * Waits for the thread to finish. A no-op when no joinable thread is
     * attached, so calling it repeatedly is safe.
     */
    void join()
    {
        if (!_stop)
        {
            pthread_join(_id, nullptr);
            // Joining the same pthread twice is undefined behaviour.
            _stop = true;
        }
    }

    /** Detaches the thread; afterwards join() becomes a no-op. */
    void detach()
    {
        if (!_stop)
        {
            pthread_detach(_id);
            // A detached thread must never be joined.
            _stop = true;
        }
    }

    /**
     * Marks the wrapper as having no joinable thread. This does not interrupt
     * a thread that is already running.
     */
    void stop()
    {
        _stop = true;
    }

    /** @return a copy of the thread's name. */
    std::string name() const
    {
        return _name;
    }

private:
    pthread_t _id;     // valid only while _stop is false
    std::string _name; // passed by reference to the callable
    bool _stop;        // true when there is nothing to join/detach
    fun_t _func;       // routine executed by the thread
};
//Log.hpp
#pragma once
#include <iostream>
#include <string>
#include<fstream>
#include <ctime>
#include <cstdarg>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
// Output selector passed to LogMessage by the LOG macro:
// false -> print to standard output, true -> append to the file named by logname.
// NOTE(review): these are non-inline definitions in a header; including this
// header from more than one translation unit would define them twice — confirm usage.
bool gIsSave = false;
// Path of the file that receives log lines when file output is selected.
const char *logname = "log.txt";
// Log severities, ordered from least to most severe.
// Unscoped on purpose: call sites write LOG(INFO, ...) and pass the value as int.
enum Level
{
    DEBUG = 0,   // diagnostic detail
    INFO = 1,    // normal progress messages
    WARNING = 2, // unexpected but tolerated
    ERROR = 3,   // an operation failed
    FATAL = 4    // unrecoverable failure
};
/**
 * Maps a Level value to the label printed in log lines.
 *
 * Declared inline because it is defined in a header: without it, including
 * the header from more than one translation unit yields duplicate definitions
 * of this function at link time.
 *
 * @param level a Level enumerator (passed as int).
 * @return "Debug", "Info", "Warning", "Error" or "Fatal"; "Unknown" for any
 *         other value.
 */
inline std::string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "Debug";
    case INFO:
        return "Info";
    case WARNING:
        return "Warning";
    case ERROR:
        return "Error";
    case FATAL:
        return "Fatal";
    default:
        return "Unknown";
    }
}
/**
 * Formats the current local time as "year-month-day hour:minute:second"
 * (fields are not zero-padded).
 *
 * Uses localtime_r with a caller-owned struct tm: localtime() returns a
 * pointer to shared static storage, which is a data race when several threads
 * log at the same time. Declared inline because it is defined in a header.
 *
 * @return the formatted timestamp, or "None_time" if the time cannot be
 *         converted.
 */
inline std::string GetTimeString()
{
    const time_t curr_time = time(nullptr); // current timestamp
    struct tm format_time;
    if (localtime_r(&curr_time, &format_time) == nullptr)
        return "None_time";

    char time_buffer[128];
    snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",
             format_time.tm_year + 1900, // tm_year counts from 1900
             format_time.tm_mon + 1,     // tm_mon is zero-based
             format_time.tm_mday,
             format_time.tm_hour,
             format_time.tm_min,
             format_time.tm_sec);
    return time_buffer;
}
// Statically initialized mutex that LogMessage holds while it writes a line to
// standard output or to the log file.
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;
/**
 * Appends a message to a file, creating the file if necessary.
 *
 * The message is taken by const reference because it is only read; existing
 * callers that pass a non-const std::string still compile. Declared inline
 * because it is defined in a header.
 *
 * @param filename path of the file to append to.
 * @param message  text written verbatim (no newline is added).
 *
 * If the file cannot be opened the message is dropped silently: logging is
 * best-effort and must not take the program down.
 */
inline void SaveMessage(const char *filename, const std::string &message)
{
    std::ofstream out(filename, std::ios::app);
    if (!out.is_open())
    {
        return;
    }
    out << message;
    out.close();
}
/**
 * Builds one log line and writes it to standard output or to the log file.
 *
 * Line layout: "[time][level][pid][file][line] text\n".
 * Declared inline because it is defined in a header; normally invoked through
 * the LOG macro, which supplies filename, line and issave.
 *
 * @param filename source file that issued the message.
 * @param line     source line that issued the message.
 * @param issave   false: print to standard output; true: append to the file
 *                 named by logname.
 * @param level    a Level enumerator.
 * @param format   printf-style format string, followed by its arguments.
 *                 The formatted text is truncated to the 1024-byte buffer.
 */
inline void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
    const std::string levelstr = LevelToString(level);
    const std::string timestr = GetTimeString();
    const pid_t selfid = getpid();

    char buffer[1024];
    va_list arg;
    va_start(arg, format); // variadic arguments begin after `format`
    const int written = vsnprintf(buffer, sizeof(buffer), format, arg);
    va_end(arg);
    if (written < 0)
    {
        // Formatting failed: make sure the buffer holds a valid (empty) string.
        buffer[0] = '\0';
    }

    std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
                          "[" + std::to_string(selfid) + "]" +
                          "[" + filename + "]" + "[" + std::to_string(line) + "] " + buffer + "\n";

    // Only the actual write is serialized; formatting above happens unlocked.
    pthread_mutex_lock(&glock);
    if (!issave) // no need to save: print to the screen
    {
        std::cout << message;
    }
    else
    {
        SaveMessage(logname, message);
    }
    pthread_mutex_unlock(&glock);
}
// LOG(level, format, ...): emits one log line through LogMessage, tagging it
// with the call site's file and line and the current gIsSave setting.
// `##__VA_ARGS__` lets the variadic part be empty.
#define LOG(level, format, ...)                                                \
    do                                                                         \
    {                                                                          \
        LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
    } while (0)

// EnableFile(): subsequent LOG calls append to the log file.
#define EnableFile()    \
    do                  \
    {                   \
        gIsSave = true; \
    } while (0)

// Enablescreen(): subsequent LOG calls print to standard output.
#define Enablescreen()   \
    do                   \
    {                    \
        gIsSave = false; \
    } while (0)
//Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
/**
 * A unit of work for the thread pool: adds two integers.
 *
 * Construct it with the operands, invoke it (operator() or Excute()) to
 * compute the sum, then read the outcome with ResultToString().
 */
class Task
{
public:
    // Default-constructed tasks hold 0 + 0; the in-class initializers below
    // guarantee the members are never read uninitialized.
    Task() {}

    /**
     * @param a left operand.
     * @param b right operand.
     */
    Task(int a, int b) : _a(a), _b(b), _result(0)
    {
    }

    /** Computes the sum of the operands and stores it as the result. */
    void Excute()
    {
        _result = _a + _b;
    }

    /** @return "a+b=result"; result is 0 until the task has been executed. */
    std::string ResultToString() const
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
    }

    /** @return "a+b=?", describing the task without its result. */
    std::string DebugToString() const
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
    }

    /** Makes the task callable; equivalent to Excute(). */
    void operator()()
    {
        Excute();
    }

private:
    int _a = 0;      // left operand
    int _b = 0;      // right operand
    int _result = 0; // sum, valid after Excute()
};