Linux线程同步
1.互斥量
确保同一时间只有一个线程访问数据。
pthread_mutex_t mut;
//两种初始化方法
mut = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_init(&mut, NULL);
1.初始化
int pthread_mutex_init (pthread_mutex_t *__mutex,
const pthread_mutexattr_t *__mutexattr);
2.销毁
int pthread_mutex_destroy (pthread_mutex_t *__mutex)
3.加锁/解锁
extern int pthread_mutex_trylock (pthread_mutex_t *__mutex);
/* Lock a mutex. */
extern int pthread_mutex_lock (pthread_mutex_t *__mutex);
#ifdef __USE_XOPEN2K
/* Wait until lock becomes available, or specified time passes. */
extern int pthread_mutex_timedlock (pthread_mutex_t *__restrict __mutex,
const struct timespec *__restrict);
#endif
/* Unlock a mutex. */
extern int pthread_mutex_unlock (pthread_mutex_t *__mutex);
4.按顺序输出abcd
分别定义四个锁(a锁、b锁、c锁、d锁),初始时,将四个锁全部锁住,主线程创建完子线程后,解开a锁,然后a线程输出完后解开b锁,锁住a锁,后面同理,d输出后解开a锁,锁住d锁,这样就能循环有序输出了。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#define N 4
static pthread_mutex_t muts[N];
static int flag = 1;
static int get_next(int i){
if(i < N - 1)
return i + 1;
else
return 0;
}
static void *pth_fun(void *arg)
{
char ch = 'a';
int i = (int)arg;
char n = ch + i;
while (flag)
{
pthread_mutex_lock(muts + i);
putchar(n);
pthread_mutex_unlock(muts + get_next(i));
}
pthread_exit((void *)0);
}
int main(int argc, char const *argv[])
{
int err, i;
pthread_t tids[N];
for (i = 0; i < N; i++)
{
pthread_mutex_init(muts + i, NULL);
pthread_mutex_lock(muts + i);
err = pthread_create(tids + i, NULL, pth_fun, (void *)i);
if (err)
{
fprintf(stderr, "pthread_creare() %s\n", strerror(err));
exit(err);
}
}
pthread_mutex_unlock(muts);
sleep(3);
flag = 0;
for (i = 0; i < N; i++)
{
pthread_join(tids[i], NULL);
pthread_mutex_destroy(muts + i);
}
return 0;
}
5.带引用计数的结构体使用
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
//带引用次数的结构
struct foo
{
int count;
int id;
pthread_mutex_t mut;
};
#define N 4
static pthread_t tids[N];
static struct foo *foop;//全局指针,每个线程都能访问到
//为数据分配内存
static struct foo *foo_alloc(int id)
{
struct foo *fp;
fp = malloc(sizeof(*fp));
if (fp == NULL)
{
perror("malloc()");
exit(-1);
}
fp->count = 0;// 初始引用数为0
fp->id = id;
pthread_mutex_init(&fp->mut, NULL);
return fp;
}
//每引用一次引用数+1
static void foo_hold(struct foo *fp)
{
pthread_mutex_lock(&fp->mut);
fp->count++;
pthread_mutex_unlock(&fp->mut);
}
//释放时引用数-1
static void foo_rele(struct foo *fp)
{
pthread_mutex_lock(&fp->mut);//对count操作前,加锁,防止当前线程修改时,其他线程也在修改
if (--fp->count == 0)//引用数为0时,销毁锁,释放内存
{
printf("aaaa\n");
pthread_mutex_unlock(&fp->mut);
pthread_mutex_destroy(&fp->mut);
free(fp);
fp = NUll;
}
else
{
pthread_mutex_unlock(&fp->mut);
}
}
//线程启动函数
static void *thr_fun(void *arg)
{
int i = (int)arg;
if(foop == NUll)
pthread_exit((void *)0);
foo_hold(foop);//引用
printf("th%d %d,%d\n", i, foop->id, foop->count);
sleep(2);//线程睡2秒在释放
printf("th%d %d,%d\n", i, foop->id, foop->count);
foo_rele(foop);//释放
pthread_exit((void *)0);
}
int main(int argc, char const *argv[])
{
int i, err;
foop = foo_alloc(10);//数据设为10
for (i = 0; i < N; i++)
{
//构造线程
err = pthread_create(tids + i, NULL, thr_fun, (void *)i);
if (err)
{
fprintf(stderr, "pthread_create() %s\n", strerror(err));
exit(err);
}
}
for (i = 0; i < N; i++)
{
//回收线程资源
pthread_join(tids[i], NULL);
}
return 0;
}
执行结果:
th0 10,1
th1 10,2
th2 10,3
th3 10,4
th2 10,3
th0 10,2
th1 10,1
aaaa
th3 0,0
从执行结果可以看出,线程在引用数据时,引用数是有序递增的,释放数据时,虽然线程运行的次序和创建是不一致但是引用数也是有序递减的,
2.读写锁
读写锁有三种状态,读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程占有写模式下的读写锁,但是多个线程可以同时占有读模式下的读写锁。
/* Initialize read-write lock RWLOCK using attributes ATTR, or use
the default values if later is NULL. */
extern int pthread_rwlock_init (pthread_rwlock_t *__restrict __rwlock,
const pthread_rwlockattr_t *__restrict);
/* Destroy read-write lock RWLOCK. */
extern int pthread_rwlock_destroy (pthread_rwlock_t *__rwlock);
/* Acquire read lock for RWLOCK. */
extern int pthread_rwlock_rdlock (pthread_rwlock_t *__rwlock);
/* Acquire write lock for RWLOCK. */
extern int pthread_rwlock_wrlock (pthread_rwlock_t *__rwlock);
/* Try to acquire write lock for RWLOCK. */
extern int pthread_rwlock_trywrlock (pthread_rwlock_t *__rwlock);
/* Unlock RWLOCK. */
extern int pthread_rwlock_unlock (pthread_rwlock_t *__rwlock);
1.多线程下双向链表的使用。
同时只能一个线程修改链表中的数据时,但可以多个线程同时读链表中的数据。
下面的例子模拟的是一种生产者和消费者的情形。主线程作为生产者,根据不同的线程id,向不同的子线程分派任务(向链表中写入数据),多个子线程根据自己的线程id,从链表中找到自己的任务(从链表中查找数据),将任务消费掉(从链表中删除数据)。
注意:
-
当主线程分配完任务后,等待一定的时间,取消子线程。
调用pthread_cancel()时,系统不会立即关闭被取消的线程,而是在被取消线程下一次系统调用时结束线程。如果没有系统调用,可以在被取消线程中,执行pthread_testcancel()检查线程是否被取消,如果被取消,线程会立即结束。
-
线程取消返回的值为-1。
-
系统调用:一般是会访问到硬件资源的调用,执行在核心态。
如:文件读写。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
struct job_st
{
struct job_st *prev;
struct job_st *next;
pthread_t tid;
int id;
};
struct queue_st
{
struct job_st *head;
struct job_st *tail;
pthread_rwlock_t q_lock;
};
//初始化链表
static struct queue_st *queue_init()
{
struct queue_st *ptr;
int err;
if ((ptr = malloc(sizeof(*ptr))) == NULL)
{
return NULL;
}
ptr->head = NULL;
ptr->tail = NULL;
err = pthread_rwlock_init(&ptr->q_lock, NULL);
if (err)
{
perror("lock init()");
free(ptr);
return NULL;
}
return ptr;
}
//前插
static void queue_insert(struct queue_st *qp, struct job_st *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);//要修改链表,加写锁
jp->next = qp->head;//将当前节点的后继指向链表的头结点。
jp->prev = NULL;//当前节点作为头节点,没有前驱节点。
if (qp->head != NULL)//如果先前链表的头节点不为空,将其前驱节点设为当前节点,
{
qp->head->prev = jp;
}
else//否则,说明原先的链表中没有节点,将链表的尾节点也设为当前节点
{
qp->tail = jp;
//这种情况,链表中只有一个节点,jp,jp->next = NULL,jp->prev = NULL
}
qp->head = jp;//链表头节点指向当前节点
pthread_rwlock_unlock(&qp->q_lock);
}
//后插
static void queue_append(struct queue_st *qp, struct job_st *jp)
{
pthread_rwlock_wrlock(&qp->q_lock);//要修改链表,加写锁
jp->prev = qp->tail;//当前节点的前驱指向链表的尾节点。
jp->next = NULL;
if (qp->tail != NULL)
{
qp->tail->next = jp;//尾节点后继,指向当前节点
}
else
{
qp->head = jp;
}
qp->tail = jp;//尾节点指向当前节点。
pthread_rwlock_unlock(&qp->q_lock);
}
static void queue_remove(struct queue_st *qp, struct job_st *jp)
{
if(jp == NULL){
return;
}
pthread_rwlock_wrlock(&qp->q_lock);//要修改链表,加写锁
struct job_st *p;
if (jp == qp->head)//当前节点是头节点时
{
qp->head = jp->next;
if (qp->tail == jp)//当前节点也是尾节点
{
qp->tail = NULL;
}
else
{
qp->head->prev = jp->prev;
}
}
else if (jp == qp->tail)//当前节点是尾节点时
{
qp->tail = jp->prev;
qp->tail->next = jp->next;
}
else
{
jp->next->prev = jp->prev;
jp->prev->next = jp->next;
}
pthread_rwlock_unlock(&qp->q_lock);
free(jp);
}
static struct job_st *queue_find(struct queue_st *qp, pthread_t tid)
{
pthread_rwlock_rdlock(&qp->q_lock);//查找数据,不会修改链表,加读锁
struct job_st *jp = NULL;
for (jp = qp->head; jp != NULL; jp = jp->next)
{
if (pthread_equal(tid, jp->tid))
{
break;
}
}
pthread_rwlock_unlock(&qp->q_lock);
return jp;
}
static void queue_destory(struct queue_st *qp)
{
struct job_st *jp = qp->head, *tmp;
pthread_rwlock_wrlock(&qp->q_lock);
while (jp != NULL)
{
tmp = jp;
jp = jp->next;
free(jp);
}
pthread_rwlock_unlock(&qp->q_lock);
pthread_rwlock_destroy(&qp->q_lock);
free(qp);
}
static struct queue_st *main_qp;
static void clean_fun(void *arg){
printf("th%d: cleanup\n", (int)arg);
}
//消费者线程
static void *thr_fun(void *arg)
{
int th = (int)arg;
struct job_st *tmp, *jp;
pthread_cleanup_push(clean_fun, (void *)th);
while (1)
{
//检查线程是否被取消,不加上,子线程不会取消的,因为下面的代码没有系统调用
pthread_testcancel();
jp = queue_find(main_qp, pthread_self());//查找需要消费的任务
if (jp != NULL)
{
tmp = jp;
printf("th%d: %d\n", th,jp->id);
jp = tmp->next;
queue_remove(main_qp, tmp);//消费任务
}else{
sched_yield();//如果没有找到让出CPU
}
}
pthread_cleanup_pop(0);
pthread_exit((void *)0);
}
//消费者线程(这是函数(例程),只是消费者线程执行该函数,不要搞混)
int main(int argc, char const *argv[])
{
int err, i;
struct job_st *jp;
pthread_t tids[5];
main_qp = queue_init();
for (i = 0; i < 5; i++)
{
err = pthread_create(tids + i, NULL, thr_fun, (void *)i);//创建消费者线程
if (err)
{
fprintf(stderr, "thread created %s\n", strerror(err));
exit(err);
}
}
for (i = 0; i < 10; i++)
{
jp = malloc(sizeof(*jp));
jp->id = i;
jp->tid = tids[i%5];
queue_insert(main_qp, jp);//向链表中插入数据(生成任务)
}
sleep(5);//五秒后取消消费者线程
for (i = 0; i < 5; i++)
pthread_cancel(tids[i]);
queue_destory(main_qp);
main_qp = NULL;
for (i = 0; i < 5; i++)
{
err = pthread_join(tids[i], NULL);//回收子线程资源
if (err)
{
fprintf(stderr, "thread join %s\n", strerror(err));
exit(err);
}
}
return 0;
}
执行结果:
th3: 3
th3: 8
th1: 6
th1: 1
th2: 7
th2: 2
th0: 5
th0: 0
th4: 9
th4: 4
th3: cleanup
th1: cleanup
th2: cleanup
th4: cleanup
th0: cleanup
可以看到对应的线程消费了与其线程id匹配的任务。
3.条件变量
pthread_cond_t cond;
cond = PTHRAD_COND_INITIALIZER;
pthread_cond_init(&cond, NULL);
1.初始化
int pthread_cond_init (pthread_cond_t *__cond,
const pthread_condattr_t *__cond_attr);
2.销毁
/* Destroy condition variable COND. */
int pthread_cond_destroy (pthread_cond_t *__cond);
3.阻塞线程,等待条件变量
/* Wait for condition variable COND to be signaled or broadcast.
MUTEX is assumed to be locked before.
This function is a cancellation point and therefore not marked with
__THROW. */
int pthread_cond_wait (pthread_cond_t *__cond,
pthread_mutex_t *__mutex);
/* Wait for condition variable COND to be signaled or broadcast until
ABSTIME. MUTEX is assumed to be locked before. ABSTIME is an
absolute time specification; zero is the beginning of the epoch
(00:00:00 GMT, January 1, 1970).
This function is a cancellation point and therefore not marked with
__THROW. */
int pthread_cond_timedwait (pthread_cond_t *__cond,pthread_mutex_t *__mutex,
const struct timespec *__abstime);
4.唤醒被阻塞的线程
//给某一条别阻塞的线程发通知,那一条不确定,其余线程继续阻塞
/* Wake up one thread waiting for condition variable COND. */
int pthread_cond_signal (pthread_cond_t *__cond);
//给所有被阻塞的线程发通知,抢到通知的线程运行,其余线程继续阻塞
/* Wake up all threads waiting for condition variables COND. */
int pthread_cond_broadcast (pthread_cond_t *__cond);
在上面互斥量的两个例子中,当不满足线程处理的条件时,程序一直处于循环等待,这就使得CPU一直处于忙等状态,浪费CPU资源。我们可以使用条件变量改善这种情况,使线程在不满足条件时,处于阻塞态,不消耗CPU。下面是改进后的两个例子。
5.按顺序输出abcd
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#define N 4
static pthread_t tids[N];
static pthread_mutex_t lock = PTHREAD_COND_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//引入条件变量
static int pos = 0;
static int get_next(int i)
{
if (i < N - 1)
return i + 1;
else
return 0;
}
static void *pth_fun(void *arg)
{
int n = (int)arg;
char buf[] = {'a' + n};
while (1)
{
pthread_mutex_lock(&lock);
while (!pthread_equal(tids[pos], pthread_self()))//是否满足条件
{
pthread_cond_wait(&cond, &lock);//不满足,阻塞,等待通知
/*相当于
pthread_mutex_unlock(&lock);
while(&cond);//只不过,这儿不是死循环,忙等,而是阻塞线程
pthread_mutex_lock(&lock);
*/
}
//putchar(ch + n);
write(STDOUT_FILENO, buf, 1);
pos = get_next(pos);//修改条件
pthread_mutex_unlock(&lock);
pthread_cond_broadcast(&cond);//通知被阻塞的线程,持有cond的线程等会被通知到
}
pthread_exit((void *)0);
}
int main(int argc, char const *argv[])
{
int err, i;
pthread_mutex_lock(&lock);
for (i = 0; i < N; i++)
{
err = pthread_create(tids + i, NULL, pth_fun, (void *)i);
if (err)
{
fprintf(stderr, "pthread_creare() %s\n", strerror(err));
exit(err);
}
}
pthread_mutex_unlock(&lock);
pthread_cond_broadcast(&cond);
alarm(3);//三秒后,终止进程
for (i = 0; i < N; i++)
{
pthread_join(tids[i], NULL);
}
return 0;
}
6.多线程下双向链表的使用
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
struct job_st
{
struct job_st *prev;
struct job_st *next;
pthread_t tid;
int id;
};
struct queue_st
{
struct job_st *head;
struct job_st *tail;
pthread_rwlock_t q_lock;
};
//初始化链表
static struct queue_st *queue_init()
{
struct queue_st *ptr;
int err;
if ((ptr = malloc(sizeof(*ptr))) == NULL)
{
return NULL;
}
ptr->head = NULL;
ptr->tail = NULL;
err = pthread_rwlock_init(&ptr->q_lock, NULL);
if (err)
{
perror("lock init()");
free(ptr);
return NULL;
}
return ptr;
}
//前插
static void queue_insert(struct queue_st *qp, struct job_st *jp)
{
pthread_rwlock_wrlock(&qp->q_lock); //要修改链表,加写锁
jp->next = qp->head; //将当前节点的后继指向链表的头结点。
jp->prev = NULL; //当前节点作为头节点,没有前驱节点。
if (qp->head != NULL) //如果先前链表的头节点不为空,将其前驱节点设为当前节点,
{
qp->head->prev = jp;
}
else //否则,说明原先的链表中没有节点,将链表的尾节点也设为当前节点
{
qp->tail = jp;
//这种情况,链表中只有一个节点,jp,jp->next = NULL,jp->prev = NULL
}
qp->head = jp; //链表头节点指向当前节点
pthread_rwlock_unlock(&qp->q_lock);
}
//后插
static void queue_append(struct queue_st *qp, struct job_st *jp)
{
pthread_rwlock_wrlock(&qp->q_lock); //要修改链表,加写锁
jp->prev = qp->tail; //当前节点的前驱指向链表的尾节点。
jp->next = NULL;
if (qp->tail != NULL)
{
qp->tail->next = jp; //尾节点后继,指向当前节点
}
else
{
qp->head = jp;
}
qp->tail = jp; //尾节点指向当前节点。
pthread_rwlock_unlock(&qp->q_lock);
}
static void queue_remove(struct queue_st *qp, struct job_st *jp)
{
if (jp == NULL)
{
return;
}
pthread_rwlock_wrlock(&qp->q_lock); //要修改链表,加写锁
struct job_st *p;
if (jp == qp->head) //当前节点是头节点时
{
qp->head = jp->next;
if (qp->tail == jp) //当前节点也是尾节点
{
qp->tail = NULL;
}
else
{
qp->head->prev = jp->prev;
}
}
else if (jp == qp->tail) //当前节点是尾节点时,
{
qp->tail = jp->prev;
qp->tail->next = jp->next;
}
else
{
jp->next->prev = jp->prev;
jp->prev->next = jp->next;
}
pthread_rwlock_unlock(&qp->q_lock);
free(jp);
}
static struct job_st *queue_find(struct queue_st *qp, pthread_t tid)
{
pthread_rwlock_rdlock(&qp->q_lock); //查找数据,不会修改链表,加读锁
struct job_st *jp = NULL;
for (jp = qp->head; jp != NULL; jp = jp->next)
{
if (pthread_equal(tid, jp->tid))
{
break;
}
}
pthread_rwlock_unlock(&qp->q_lock);
return jp;
}
static void queue_destory(struct queue_st *qp)
{
struct job_st *jp = qp->head, *tmp;
pthread_rwlock_wrlock(&qp->q_lock);
while (jp != NULL)
{
tmp = jp;
jp = jp->next;
free(jp);
}
pthread_rwlock_unlock(&qp->q_lock);
pthread_rwlock_destroy(&qp->q_lock);
free(qp);
}
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
struct queue_st *main_qp = NULL;
static void clean_fun(void *arg)
{
printf("th%d: cleanup\n", (int)arg);
}
static void *thr_fun(void *arg)
{
int th = (int)arg;
struct job_st *jp;
pthread_cleanup_push(clean_fun, (void *)th);
pid_t pid;
while (main_qp)
{
pthread_mutex_lock(&mut);
while (main_qp && (jp = queue_find(main_qp, pthread_self())) == NULL)//判断
{
pthread_cond_wait(&cond, &mut);//不满足继续阻塞
}
pthread_mutex_unlock(&mut);//先解锁,在通知
pthread_cond_broadcast(&cond);//给所有线程发通知,抢到的线程运行,没抢到的继续阻塞
if (main_qp)
{
printf("th%d: %d\n", th, jp->id);
queue_remove(main_qp, jp);
}
}
pthread_cleanup_pop(1);
pthread_exit((void *)0);
}
int main(int argc, char const *argv[])
{
int err, i;
struct job_st *jp;
pthread_t tids[5];
main_qp = queue_init();
for (i = 0; i < 5; i++)
{
err = pthread_create(tids + i, NULL, thr_fun, (void *)i);
if (err)
{
fprintf(stderr, "thread created %s\n", strerror(err));
exit(err);
}
}
for (i = 0; i < 10; i++)
{
jp = malloc(sizeof(*jp));
jp->id = i;
jp->tid = tids[i % 5];
queue_insert(main_qp, jp);
}
sleep(2);
queue_destory(main_qp);
main_qp = NULL;
pthread_mutex_unlock(&mut);
pthread_cond_broadcast(&cond);
for (i = 0; i < 5; i++)
{
err = pthread_join(tids[i], NULL);
if (err)
{
fprintf(stderr, "thread join %s\n", strerror(err));
exit(err);
}
}
pthread_mutex_destroy(&mut);
pthread_cond_destroy(&cond);
return 0;
}
执行结果:
th4: 9
th4: 4
th3: 8
th3: 3
th2: 7
th2: 2
th1: 6
th0: 5
th0: 0
th1: 1
th2: cleanup
th3: cleanup
th4: cleanup
th1: cleanup
th0: cleanup