0
点赞
收藏
分享

微信扫一扫

【代码备份】C语言线程池传参成功



代码目录

  • ​​main.c​​
  • ​​thread_pool.c​​
  • ​​thread_pool.h​​
  • ​​Makefile​​
  • ​​公众号【图控大叔】​​

main.c

#include "thread_pool.h"

void *mytask(void *arg1, void *arg2)
{
long n = (long)arg1;
printf("第二个参数是 is %s\n", (char *)arg2);
printf("线程id为[%ld]的线程准备工作 %ld 秒...\n",
pthread_self(), n);

sleep(n);

printf("线程id为[%ld]的线程工作 %ld 秒结束了******************\n",
pthread_self(), n);

return NULL;
}

void *count_time(void *arg)
{
int i = 0;
while(1)
{
sleep(1);
printf("当前是第%d秒\n", ++i);
}
}

int main(void)
{
pthread_t a;

//只是用来创建一天用于计时的线程,跟线程池一点关系都没有
pthread_create(&a, NULL, count_time, NULL);

thread_pool *pool = malloc(sizeof(thread_pool));

// 1, 初始化线程池,并且一开始就有两条线程
init_pool(pool, 2);

// 2, 任务投放
printf("投放3个任务\n");
add_task(pool, mytask, (void *)((rand()%10)*1L), (void *)("hhhhh"));
add_task(pool, mytask, (void *)((rand()%10)*1L), (void *)("xixixi"));
add_task(pool, mytask, (void *)((rand()%10)*1L), (void *)("heiheihei"));

// 3, 检查当前线程数量
printf("当前线程数量为:%d\n",
remove_thread(pool, 0));
sleep(9);

//增加线程数量
add_thread(pool, 2);

sleep(5);

// 6, 删除线程(随意,因为删除线程池的时候,会全部删除线程)
printf("删除3条线程,当前线程数还剩: %d\n",
remove_thread(pool, 3));

// 7, 删除线程池
destroy_pool(pool);

//8、释放线程池资源
free(pool);

return 0;
}

thread_pool.c

#include "thread_pool.h"

void handler(void *arg)
{

pthread_mutex_unlock((pthread_mutex_t *)arg); //防止死锁,所以在这里添加解锁操作
}

/* 每一个线程池中的线程所执行的内容, arg就是线程池的地址 */
void *routine(void *arg)
{
thread_pool *pool = (thread_pool *)arg; //将线程池的地址存放进去pool
struct task *p; //定义一个缓冲指针,后期任务队列遍历的时候使用

while(1)
{

pthread_cleanup_push(handler, (void *)&pool->lock);//提前登记线程被取消后需要处理的事情
pthread_mutex_lock(&pool->lock); //由于需要操作线程池中的共有资源,所以加锁


while(pool->waiting_tasks == 0 && !pool->shutdown) //判断是否没有需要运行的任务
{
pthread_cond_wait(&pool->cond, &pool->lock); //让线程睡眠
}


if(pool->waiting_tasks == 0 && pool->shutdown == true) //判断线程池是否没有任务并且需要关闭
{
pthread_mutex_unlock(&pool->lock); //解锁
pthread_exit(NULL); //退出线程
}

p = pool->task_list->next; //让p登记需要运行的任务节点

pool->task_list->next = p->next; //将此任务节点从链表中删除

pool->waiting_tasks--; //将等待运行的任务队列-1

pthread_mutex_unlock(&pool->lock); //解锁
pthread_cleanup_pop(0); //解除登记取消线程之后所做的函数

pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); //忽略线程的取消操作
(p->task)(p->myarg1, p->myarg2); //函数调用
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); //重启线程取消操作

free(p); //释放任务节点
}

pthread_exit(NULL);
}

/* pool:线程池结构体的地址,threads_number:一开始初始化的线程数量 */
bool init_pool(thread_pool *pool, unsigned int threads_number)
{

pthread_mutex_init(&pool->lock, NULL); //初始化互斥锁
pthread_cond_init(&pool->cond, NULL); //初始化条件变量

pool->shutdown = false; //开启线程池的标志false:开
pool->task_list = malloc(sizeof(struct task)); //申请任务链表头
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS);//申请最大线程数量的ID内存

if(pool->task_list == NULL || pool->tids == NULL) //判断两个申请的内存是否成功
{
perror("allocate memory error");
return false;
}

pool->task_list->next = NULL; //初始化链表头,将下一个节点指向NULL

pool->waiting_tasks = 0; //将等待运行的任务数量置0
pool->active_threads = threads_number; //登记当前的线程数量

int i;

for(i=0; i<pool->active_threads; i++) //创建线程池里面的线程
{
if(pthread_create(&((pool->tids)[i]), NULL,
routine, (void *)pool) != 0) //每一个线程都去跑routine这个函数的内容
{
perror("create threads error");
return false;
}
}

return true;
}


/* 投放任务:pool:线程池地址;task:任务需要运行的内容的函数指针; arg:传入给task函数的参数 */
bool add_task(thread_pool *pool,
void *(*fun_name)(void *myarg1, void *myarg2), void *arg1, void *arg2)
{
struct task *new_task = malloc(sizeof(struct task)); //新建一个任务节点
if(new_task == NULL)
{
perror("allocate memory error");
return false;
}
new_task->task = fun_name; //将任务需要做的函数存进task指针中
new_task->myarg1 = arg1; //将任务函数参数记录在arg里面
new_task->myarg2 = arg2; //将任务函数参数记录在arg里面

new_task->next = NULL; //将任务节点的下一个位置指向NULL


pthread_mutex_lock(&pool->lock); //上锁
if(pool->waiting_tasks >= MAX_WAITING_TASKS) //判断任务数量有没有超标
{
pthread_mutex_unlock(&pool->lock); //解锁

fprintf(stderr, "too many tasks.\n"); //反馈太多任务了
free(new_task); //释放掉刚才登记的任务节点

return false; //返回添加不了任务到任务链表中
}

struct task *tmp = pool->task_list; //将线程池中任务链表的头节点登记到tmp
while(tmp->next != NULL) //将tmp指向最后的节点的位置
tmp = tmp->next;

tmp->next = new_task; //将新建的任务节点插入到链表中
pool->waiting_tasks++; //将等待的任务数量+1

pthread_mutex_unlock(&pool->lock); //解锁
pthread_cond_signal(&pool->cond); //唤醒正在睡眠中的线程

return true; //返回添加成功
}

/* 添加线程到线程池中 pool:线程池结构体地址, additional_threads:添加的线程的数量 */
int add_thread(thread_pool *pool, unsigned additional_threads)
{
if(additional_threads == 0)
return 0;

unsigned total_threads =
pool->active_threads + additional_threads; //将总数记录在这个变量中

int i, actual_increment = 0;
for(i = pool->active_threads;
i < total_threads && i < MAX_ACTIVE_THREADS;
i++)
{
if(pthread_create(&((pool->tids)[i]),
NULL, routine, (void *)pool) != 0) //新建线程
{
perror("add threads error");

if(actual_increment == 0)
return -1;

break;
}
actual_increment++; //记录成功添加了多少条线程到线程池中
}

pool->active_threads += actual_increment; //将最后成功添加到线程池的线程总数记录线程池中
return actual_increment; //返回新建了多少条线程
}

int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
if(removing_threads == 0)
return pool->active_threads;

int remain_threads = pool->active_threads - removing_threads; //将移除线程之后的线程数量登记下来
remain_threads = remain_threads>0 ? remain_threads:1; //如果这个数量不大于0,则把它置1

int i;
for(i=pool->active_threads-1; i>remain_threads-1; i--) //从id的最后一位线程开始取消
{
errno = pthread_cancel(pool->tids[i]);
if(errno != 0)
break;
}

if(i == pool->active_threads-1) //判断发送取消信号是否成功
return -1;
else
pool->active_threads = i+1; //将新的线程数量登记active_threads

return pool->active_threads; //返回剩下多少条线程在线程池中
}

bool destroy_pool(thread_pool *pool)
{

pool->shutdown = true; //使能线程池的退出开关
pthread_cond_broadcast(&pool->cond); //将所有的线程全部唤醒

int i;
for(i=0; i<pool->active_threads; i++) //开始接合线程
{
errno = pthread_join(pool->tids[i], NULL);
if(errno != 0)
{
printf("join tids[%d] error: %s\n",
i, strerror(errno));
}
else
printf("[%u] is joined\n", (unsigned)pool->tids[i]);

}

free(pool->task_list); //释放掉任务头节点
free(pool->tids); //释放掉线程ID内存

return true;
}

thread_pool.h

#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_

#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#include <errno.h>
#include <pthread.h>

#define MAX_WAITING_TASKS 1000 //最大的等待任务数量
#define MAX_ACTIVE_THREADS 20 //最大的线程数量

struct task //任务链表结构体
{
void *(*task)(void *arg1,void *arg2); //任务做什么事情的函数指针
void *myarg1; //传入函数的参数
void *myarg2; //传入函数的参数

struct task *next; //链表的位置结构体指针
};


typedef struct thread_pool
{
pthread_mutex_t lock; //用于线程池同步互斥的互斥锁
pthread_cond_t cond; //用于让线程池里面的线程睡眠的条件变量
struct task *task_list; //线程池的执行任务链表

pthread_t *tids; //线程池里面线程的ID登记处

unsigned waiting_tasks; //等待的任务数量,也就是上面任务链表的长度
unsigned active_threads; //当前已经创建的线程数

bool shutdown; //线程池的开关
}thread_pool;


bool
init_pool(thread_pool *pool,
unsigned int threads_number); //初始化线程池

bool
add_task(thread_pool *pool,
void *(*fun_name)(void *myarg1, void *myarg2),
void *arg1, void *arg2); //往线程池里面添加任务节点
int
add_thread(thread_pool *pool,
unsigned int additional_threads_number); //添加线程池中线程的数量

int
remove_thread(thread_pool *pool,
unsigned int removing_threads_number); //移除线程

bool destroy_pool(thread_pool *pool); //销毁线程池
void *routine(void *arg); //线程池里面线程的执行函数

#endif

Makefile

CC = gcc
CFLAGS = -O0 -Wall -g -lpthread

test:main.c thread_pool.c
$(CC) $^ -o $@ $(CFLAGS)

debug:main.c thread_pool.c
$(CC) $^ -o $@ $(CFLAGS) -DDEBUG

clean:
$(RM) .*.sw? test debug *.o

.PHONY:all clean

公众号【图控大叔】



举报

相关推荐

0 条评论