0
点赞
收藏
分享

微信扫一扫

Linux线程同步

清冷的蓝天天 2022-01-08 阅读 82

Linux线程同步

1.互斥量

确保同一时间只有一个线程访问数据。

pthread_mutex_t mut;
//两种初始化方法
mut = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_init(&mut, NULL);
1.初始化
int pthread_mutex_init (pthread_mutex_t *__mutex,
                              const pthread_mutexattr_t *__mutexattr);
2.销毁
 int pthread_mutex_destroy (pthread_mutex_t *__mutex)
3.加锁/解锁
extern int pthread_mutex_trylock (pthread_mutex_t *__mutex);

/* Lock a mutex.  */
extern int pthread_mutex_lock (pthread_mutex_t *__mutex);

#ifdef __USE_XOPEN2K
/* Wait until lock becomes available, or specified time passes. */
extern int pthread_mutex_timedlock (pthread_mutex_t *__restrict __mutex,
                                    const struct timespec *__restrict);
#endif

/* Unlock a mutex.  */
extern int pthread_mutex_unlock (pthread_mutex_t *__mutex);
4.按顺序输出abcd

分别定义四个锁(a锁、b锁、c锁、d锁),初始时,将四个锁全部锁住,主线程创建完子线程后,解开a锁,然后a线程输出完后解开b锁,锁住a锁,后面同理,d输出后解开a锁,锁住d锁,这样就能循环有序输出了。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

#define N 4
static pthread_mutex_t muts[N];
static int flag = 1;

static int get_next(int i){
    if(i < N - 1)
        return i + 1;
    else
        return 0;
}

static void *pth_fun(void *arg)
{
    char ch = 'a';
    int i = (int)arg;
    char n = ch + i;
    while (flag)
    {
        pthread_mutex_lock(muts + i);
        putchar(n);
        pthread_mutex_unlock(muts + get_next(i));
    }
    pthread_exit((void *)0);
}

int main(int argc, char const *argv[])
{
    int err, i;
    pthread_t tids[N];
    for (i = 0; i < N; i++)
    {
        pthread_mutex_init(muts + i, NULL);
        pthread_mutex_lock(muts + i);
        err = pthread_create(tids + i, NULL, pth_fun, (void *)i);
        if (err)
        {
            fprintf(stderr, "pthread_creare() %s\n", strerror(err));
            exit(err);
        }
    }
    pthread_mutex_unlock(muts);
    sleep(3);
    flag = 0;
    for (i = 0; i < N; i++)
    {
        pthread_join(tids[i], NULL);
        pthread_mutex_destroy(muts + i);
    }
    return 0;
}
5.带引用计数的结构体使用
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

//带引用次数的结构
struct foo
{
    int count;
    int id;
    pthread_mutex_t mut;
};

#define N 4
static pthread_t tids[N];
static struct foo *foop;//全局指针,每个线程都能访问到

//为数据分配内存
static struct foo *foo_alloc(int id)
{
    struct foo *fp;
    fp = malloc(sizeof(*fp));
    if (fp == NULL)
    {
        perror("malloc()");
        exit(-1);
    }
    fp->count = 0;// 初始引用数为0
    fp->id = id;
    pthread_mutex_init(&fp->mut, NULL);
    return fp;
}

//每引用一次引用数+1
static void foo_hold(struct foo *fp)
{
    pthread_mutex_lock(&fp->mut);
    fp->count++;
    pthread_mutex_unlock(&fp->mut);
}

//释放时引用数-1
static void foo_rele(struct foo *fp)
{
    pthread_mutex_lock(&fp->mut);//对count操作前,加锁,防止当前线程修改时,其他线程也在修改
    if (--fp->count == 0)//引用数为0时,销毁锁,释放内存
    {
        printf("aaaa\n");
        pthread_mutex_unlock(&fp->mut);
        pthread_mutex_destroy(&fp->mut);
        free(fp);
        fp = NUll;
    }
    else
    {
        pthread_mutex_unlock(&fp->mut);
    }
}

//线程启动函数
static void *thr_fun(void *arg)
{
    int i = (int)arg;
    if(foop == NUll)
        pthread_exit((void *)0);
    foo_hold(foop);//引用
    printf("th%d %d,%d\n", i, foop->id, foop->count);
    sleep(2);//线程睡2秒在释放
    printf("th%d %d,%d\n", i, foop->id, foop->count);
    foo_rele(foop);//释放
    pthread_exit((void *)0);
}

int main(int argc, char const *argv[])
{
    int i, err;
    foop = foo_alloc(10);//数据设为10
    for (i = 0; i < N; i++)
    {
        //构造线程
        err = pthread_create(tids + i, NULL, thr_fun, (void *)i);
        if (err)
        {
            fprintf(stderr, "pthread_create() %s\n", strerror(err));
            exit(err);
        }
    }
    for (i = 0; i < N; i++)
    {
        //回收线程资源
        pthread_join(tids[i], NULL);
    }
    return 0;
}
执行结果:
th0 10,1
th1 10,2
th2 10,3
th3 10,4
th2 10,3
th0 10,2
th1 10,1
aaaa
th3 0,0
    从执行结果可以看出,线程在引用数据时,引用数是有序递增的,释放数据时,虽然线程运行的次序和创建是不一致但是引用数也是有序递减的,

2.读写锁

读写锁有三种状态,读模式下加锁状态,写模式下加锁状态,不加锁状态。一次只有一个线程占有写模式下的读写锁,但是多个线程可以同时占有读模式下的读写锁。

/* Initialize read-write lock RWLOCK using attributes ATTR, or use
   the default values if later is NULL.  */
extern int pthread_rwlock_init (pthread_rwlock_t *__restrict __rwlock,
                                const pthread_rwlockattr_t *__restrict);
/* Destroy read-write lock RWLOCK.  */
extern int pthread_rwlock_destroy (pthread_rwlock_t *__rwlock);
/* Acquire read lock for RWLOCK.  */
extern int pthread_rwlock_rdlock (pthread_rwlock_t *__rwlock);
/* Acquire write lock for RWLOCK.  */
extern int pthread_rwlock_wrlock (pthread_rwlock_t *__rwlock);
/* Try to acquire write lock for RWLOCK.  */
extern int pthread_rwlock_trywrlock (pthread_rwlock_t *__rwlock);
/* Unlock RWLOCK.  */
extern int pthread_rwlock_unlock (pthread_rwlock_t *__rwlock);
1.多线程下双向链表的使用。

同时只能一个线程修改链表中的数据时,但可以多个线程同时读链表中的数据。

下面的例子模拟的是一种生产者和消费者的情形。主线程作为生产者,根据不同的线程id,向不同的子线程分派任务(向链表中写入数据),多个子线程根据自己的线程id,从链表中找到自己的任务(从链表中查找数据),将任务消费掉(从链表中删除数据)。

注意:

  1. 当主线程分配完任务后,等待一定的时间,取消子线程。

    调用pthread_cancel()时,系统不会立即关闭被取消的线程,而是在被取消线程下一次系统调用时结束线程。如果没有系统调用,可以在被取消线程中,执行pthread_testcancel()检查线程是否被取消,如果被取消,线程会立即结束。

  2. 线程取消返回的值为-1。

  3. 系统调用:一般是会访问到硬件资源的调用,执行在核心态。

    如:文件读写。

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>

struct job_st
{
    struct job_st *prev;
    struct job_st *next;
    pthread_t tid;
    int id;
};

struct queue_st
{
    struct job_st *head;
    struct job_st *tail;
    pthread_rwlock_t q_lock;
};

//初始化链表
static struct queue_st *queue_init()
{
    struct queue_st *ptr;
    int err;
    if ((ptr = malloc(sizeof(*ptr))) == NULL)
    {
        return NULL;
    }
    ptr->head = NULL;
    ptr->tail = NULL;
    err = pthread_rwlock_init(&ptr->q_lock, NULL);
    if (err)
    {
        perror("lock init()");
        free(ptr);
        return NULL;
    }
    return ptr;
}
//前插
static void queue_insert(struct queue_st *qp, struct job_st *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);//要修改链表,加写锁
    jp->next = qp->head;//将当前节点的后继指向链表的头结点。
    jp->prev = NULL;//当前节点作为头节点,没有前驱节点。
    if (qp->head != NULL)//如果先前链表的头节点不为空,将其前驱节点设为当前节点,
    {
        qp->head->prev = jp;
    }
    else//否则,说明原先的链表中没有节点,将链表的尾节点也设为当前节点
    {
        qp->tail = jp;
        //这种情况,链表中只有一个节点,jp,jp->next = NULL,jp->prev = NULL
    }
    qp->head = jp;//链表头节点指向当前节点
    pthread_rwlock_unlock(&qp->q_lock);
}

//后插
static void queue_append(struct queue_st *qp, struct job_st *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);//要修改链表,加写锁
    jp->prev = qp->tail;//当前节点的前驱指向链表的尾节点。
    jp->next = NULL;
    if (qp->tail != NULL)
    {
        qp->tail->next = jp;//尾节点后继,指向当前节点
    }
    else
    {
        qp->head = jp;
    }
    qp->tail = jp;//尾节点指向当前节点。
    pthread_rwlock_unlock(&qp->q_lock);
}

static void queue_remove(struct queue_st *qp, struct job_st *jp)
{
    if(jp == NULL){
        return;
    }
    pthread_rwlock_wrlock(&qp->q_lock);//要修改链表,加写锁
    struct job_st *p;
    if (jp == qp->head)//当前节点是头节点时
    {
        qp->head = jp->next;
        if (qp->tail == jp)//当前节点也是尾节点
        {
            qp->tail = NULL;
        }
        else
        {
            qp->head->prev = jp->prev;
        }
    }
    else if (jp == qp->tail)//当前节点是尾节点时
    {
        qp->tail = jp->prev;
        qp->tail->next = jp->next;
    }
    else
    {
        jp->next->prev = jp->prev;
        jp->prev->next = jp->next;
    }
    pthread_rwlock_unlock(&qp->q_lock);
    free(jp);
}

static struct job_st *queue_find(struct queue_st *qp, pthread_t tid)
{
    pthread_rwlock_rdlock(&qp->q_lock);//查找数据,不会修改链表,加读锁
    struct job_st *jp = NULL;
    for (jp = qp->head; jp != NULL; jp = jp->next)
    {
        if (pthread_equal(tid, jp->tid))
        {
            break;
        }
    }
    pthread_rwlock_unlock(&qp->q_lock);
    return jp;
}

static void queue_destory(struct queue_st *qp)
{
    struct job_st *jp = qp->head, *tmp;
    pthread_rwlock_wrlock(&qp->q_lock);
    while (jp != NULL)
    {
        tmp = jp;
        jp = jp->next;
        free(jp);
    }
    pthread_rwlock_unlock(&qp->q_lock);
    pthread_rwlock_destroy(&qp->q_lock);
    free(qp);
}

static struct queue_st *main_qp;
static void clean_fun(void *arg){
    printf("th%d: cleanup\n", (int)arg);
}

//消费者线程
static void *thr_fun(void *arg)
{
    int th = (int)arg;
    struct job_st *tmp, *jp;
    pthread_cleanup_push(clean_fun, (void *)th);
    while (1)
    {
        //检查线程是否被取消,不加上,子线程不会取消的,因为下面的代码没有系统调用
        pthread_testcancel();
        jp = queue_find(main_qp, pthread_self());//查找需要消费的任务
        if (jp != NULL)
        {
            tmp = jp;
            printf("th%d: %d\n", th,jp->id);
            jp = tmp->next;
            queue_remove(main_qp, tmp);//消费任务
        }else{
            sched_yield();//如果没有找到让出CPU
        }
    }
    pthread_cleanup_pop(0);
    pthread_exit((void *)0);
}

//消费者线程(这是函数(例程),只是消费者线程执行该函数,不要搞混)
int main(int argc, char const *argv[])
{
    int err, i;
    struct job_st *jp;
    pthread_t tids[5];
    main_qp = queue_init();
    for (i = 0; i < 5; i++)
    {
        err = pthread_create(tids + i, NULL, thr_fun, (void *)i);//创建消费者线程
        if (err)
        {
            fprintf(stderr, "thread created %s\n", strerror(err));
            exit(err);
        }
    }
    for (i = 0; i < 10; i++)
    {
        jp = malloc(sizeof(*jp));
        jp->id = i;
        jp->tid = tids[i%5];
        queue_insert(main_qp, jp);//向链表中插入数据(生成任务)
    }
    sleep(5);//五秒后取消消费者线程
    for (i = 0; i < 5; i++)
        pthread_cancel(tids[i]);
    queue_destory(main_qp);
    main_qp = NULL;
    for (i = 0; i < 5; i++)
    {
        err = pthread_join(tids[i], NULL);//回收子线程资源
        if (err)
        {
            fprintf(stderr, "thread join %s\n", strerror(err));
            exit(err);
        }
    }
    return 0;
}
执行结果: 
th3: 3
th3: 8
th1: 6
th1: 1
th2: 7
th2: 2
th0: 5
th0: 0
th4: 9
th4: 4
th3: cleanup
th1: cleanup
th2: cleanup
th4: cleanup
th0: cleanup
    可以看到对应的线程消费了与其线程id匹配的任务。

3.条件变量

pthread_cond_t cond;
cond = PTHRAD_COND_INITIALIZER;
pthread_cond_init(&cond, NULL);
1.初始化
int pthread_cond_init (pthread_cond_t *__cond,
                              const pthread_condattr_t *__cond_attr);
2.销毁
/* Destroy condition variable COND.  */
int pthread_cond_destroy (pthread_cond_t *__cond);
3.阻塞线程,等待条件变量
/* Wait for condition variable COND to be signaled or broadcast.
   MUTEX is assumed to be locked before.

   This function is a cancellation point and therefore not marked with
   __THROW.  */
int pthread_cond_wait (pthread_cond_t *__cond,
                              pthread_mutex_t *__mutex);

/* Wait for condition variable COND to be signaled or broadcast until
   ABSTIME.  MUTEX is assumed to be locked before.  ABSTIME is an
   absolute time specification; zero is the beginning of the epoch
   (00:00:00 GMT, January 1, 1970).

   This function is a cancellation point and therefore not marked with
   __THROW.  */
int pthread_cond_timedwait (pthread_cond_t *__cond,pthread_mutex_t *__mutex,
                                   const struct timespec *__abstime);
4.唤醒被阻塞的线程
//给某一条别阻塞的线程发通知,那一条不确定,其余线程继续阻塞
/* Wake up one thread waiting for condition variable COND.  */
int pthread_cond_signal (pthread_cond_t *__cond);

//给所有被阻塞的线程发通知,抢到通知的线程运行,其余线程继续阻塞
/* Wake up all threads waiting for condition variables COND.  */
int pthread_cond_broadcast (pthread_cond_t *__cond);

在上面互斥量的两个例子中,当不满足线程处理的条件时,程序一直处于循环等待,这就使得CPU一直处于忙等状态,浪费CPU资源。我们可以使用条件变量改善这种情况,使线程在不满足条件时,处于阻塞态,不消耗CPU。下面是改进后的两个例子。

5.按顺序输出abcd
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

#define N 4
static pthread_t tids[N];

static pthread_mutex_t lock = PTHREAD_COND_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;//引入条件变量
static int pos = 0;

static int get_next(int i)
{
    if (i < N - 1)
        return i + 1;
    else
        return 0;
}

static void *pth_fun(void *arg)
{
    int n = (int)arg;
    char buf[] = {'a' + n};
    while (1)
    {
        pthread_mutex_lock(&lock);
        while (!pthread_equal(tids[pos], pthread_self()))//是否满足条件
        {
            pthread_cond_wait(&cond, &lock);//不满足,阻塞,等待通知
            /*相当于
            pthread_mutex_unlock(&lock);
            while(&cond);//只不过,这儿不是死循环,忙等,而是阻塞线程
            pthread_mutex_lock(&lock);
            */
        }
        //putchar(ch + n);
        write(STDOUT_FILENO, buf, 1);
        pos = get_next(pos);//修改条件
        pthread_mutex_unlock(&lock);
        pthread_cond_broadcast(&cond);//通知被阻塞的线程,持有cond的线程等会被通知到
    }
    pthread_exit((void *)0);
}

int main(int argc, char const *argv[])
{
    int err, i;
    pthread_mutex_lock(&lock);
    for (i = 0; i < N; i++)
    {
        err = pthread_create(tids + i, NULL, pth_fun, (void *)i);
        if (err)
        {
            fprintf(stderr, "pthread_creare() %s\n", strerror(err));
            exit(err);
        }
    }
    pthread_mutex_unlock(&lock);
    pthread_cond_broadcast(&cond);
    alarm(3);//三秒后,终止进程
    for (i = 0; i < N; i++)
    {
        pthread_join(tids[i], NULL);
    }
    return 0;
}
6.多线程下双向链表的使用
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>

struct job_st
{
    struct job_st *prev;
    struct job_st *next;
    pthread_t tid;
    int id;
};

struct queue_st
{
    struct job_st *head;
    struct job_st *tail;
    pthread_rwlock_t q_lock;
};

//初始化链表
static struct queue_st *queue_init()
{
    struct queue_st *ptr;
    int err;
    if ((ptr = malloc(sizeof(*ptr))) == NULL)
    {
        return NULL;
    }
    ptr->head = NULL;
    ptr->tail = NULL;
    err = pthread_rwlock_init(&ptr->q_lock, NULL);
    if (err)
    {
        perror("lock init()");
        free(ptr);
        return NULL;
    }
    return ptr;
}
//前插
static void queue_insert(struct queue_st *qp, struct job_st *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock); //要修改链表,加写锁
    jp->next = qp->head;                //将当前节点的后继指向链表的头结点。
    jp->prev = NULL;                    //当前节点作为头节点,没有前驱节点。
    if (qp->head != NULL)               //如果先前链表的头节点不为空,将其前驱节点设为当前节点,
    {
        qp->head->prev = jp;
    }
    else //否则,说明原先的链表中没有节点,将链表的尾节点也设为当前节点
    {
        qp->tail = jp;
        //这种情况,链表中只有一个节点,jp,jp->next = NULL,jp->prev = NULL
    }
    qp->head = jp; //链表头节点指向当前节点
    pthread_rwlock_unlock(&qp->q_lock);
}

//后插
static void queue_append(struct queue_st *qp, struct job_st *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock); //要修改链表,加写锁
    jp->prev = qp->tail;                //当前节点的前驱指向链表的尾节点。
    jp->next = NULL;
    if (qp->tail != NULL)
    {
        qp->tail->next = jp; //尾节点后继,指向当前节点
    }
    else
    {
        qp->head = jp;
    }
    qp->tail = jp; //尾节点指向当前节点。
    pthread_rwlock_unlock(&qp->q_lock);
}

static void queue_remove(struct queue_st *qp, struct job_st *jp)
{
    if (jp == NULL)
    {
        return;
    }
    pthread_rwlock_wrlock(&qp->q_lock); //要修改链表,加写锁
    struct job_st *p;
    if (jp == qp->head) //当前节点是头节点时
    {
        qp->head = jp->next;
        if (qp->tail == jp) //当前节点也是尾节点
        {
            qp->tail = NULL;
        }
        else
        {
            qp->head->prev = jp->prev;
        }
    }
    else if (jp == qp->tail) //当前节点是尾节点时,
    {
        qp->tail = jp->prev;
        qp->tail->next = jp->next;
    }
    else
    {
        jp->next->prev = jp->prev;
        jp->prev->next = jp->next;
    }
    pthread_rwlock_unlock(&qp->q_lock);
    free(jp);
}

static struct job_st *queue_find(struct queue_st *qp, pthread_t tid)
{
    pthread_rwlock_rdlock(&qp->q_lock); //查找数据,不会修改链表,加读锁
    struct job_st *jp = NULL;
    for (jp = qp->head; jp != NULL; jp = jp->next)
    {
        if (pthread_equal(tid, jp->tid))
        {
            break;
        }
    }
    pthread_rwlock_unlock(&qp->q_lock);
    return jp;
}

static void queue_destory(struct queue_st *qp)
{
    struct job_st *jp = qp->head, *tmp;
    pthread_rwlock_wrlock(&qp->q_lock);
    while (jp != NULL)
    {
        tmp = jp;
        jp = jp->next;
        free(jp);
    }
    pthread_rwlock_unlock(&qp->q_lock);
    pthread_rwlock_destroy(&qp->q_lock);
    free(qp);
}

static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;

struct queue_st *main_qp = NULL;
static void clean_fun(void *arg)
{
    printf("th%d: cleanup\n", (int)arg);
}

static void *thr_fun(void *arg)
{
    int th = (int)arg;
    struct job_st *jp;
    pthread_cleanup_push(clean_fun, (void *)th);
    pid_t pid;
    while (main_qp)
    {
        pthread_mutex_lock(&mut);
        while (main_qp && (jp = queue_find(main_qp, pthread_self())) == NULL)//判断
        {
            pthread_cond_wait(&cond, &mut);//不满足继续阻塞
        }
        pthread_mutex_unlock(&mut);//先解锁,在通知
        pthread_cond_broadcast(&cond);//给所有线程发通知,抢到的线程运行,没抢到的继续阻塞
        if (main_qp)
        {
            printf("th%d: %d\n", th, jp->id);
            queue_remove(main_qp, jp);
        }
    }
    pthread_cleanup_pop(1);
    pthread_exit((void *)0);
}

int main(int argc, char const *argv[])
{
    int err, i;
    struct job_st *jp;
    pthread_t tids[5];
    main_qp = queue_init();
    for (i = 0; i < 5; i++)
    {
        err = pthread_create(tids + i, NULL, thr_fun, (void *)i);
        if (err)
        {
            fprintf(stderr, "thread created %s\n", strerror(err));
            exit(err);
        }
    }
    for (i = 0; i < 10; i++)
    {
        jp = malloc(sizeof(*jp));
        jp->id = i;
        jp->tid = tids[i % 5];
        queue_insert(main_qp, jp);
    }
    sleep(2);
    queue_destory(main_qp);
    main_qp = NULL;
    pthread_mutex_unlock(&mut);
    pthread_cond_broadcast(&cond);
    for (i = 0; i < 5; i++)
    {
        err = pthread_join(tids[i], NULL);
        if (err)
        {
            fprintf(stderr, "thread join %s\n", strerror(err));
            exit(err);
        }
    }
    pthread_mutex_destroy(&mut);
    pthread_cond_destroy(&cond);
    return 0;
}
执行结果:
th4: 9
th4: 4
th3: 8
th3: 3
th2: 7
th2: 2
th1: 6
th0: 5
th0: 0
th1: 1
th2: cleanup
th3: cleanup
th4: cleanup
th1: cleanup
th0: cleanup
举报

相关推荐

0 条评论