0
点赞
收藏
分享

微信扫一扫

超轻量 pthread 集结点实现


我需要的 pthread 线程集结点功能,使用同一集结点的线程将通过 rend_wait 函数等待,当集结点到达指定数量的线程后同时激发继续执行。使用 pthread 的 mutex 和 cond 超轻量实现。下面 rend.h 是集结点实现,rendezvous.c 是测试应用。

 


​​



  1. /*
  2.  * rend.h
  3.  *
  4.  *  Created on: 2009-11-14
  5.  *      Author: liuzy (lzy.dev@gmail.com)
  6.  */
  7.   
  8. #ifndef REND_H_
  9. #define REND_H_
  10.   
  11. #include <pthread.h>
  12. #include <assert.h>
  13.   
  14. struct
  15. volatile int
  16.     pthread_mutex_t count_lock;   
  17.     pthread_cond_t ready;   
  18. };   
  19.   
  20. #define DECLARE_REND(name, count) /
  21. struct
  22.   
  23. int rend_init(struct rend_t* prend, int
  24. int
  25.   
  26.     assert(prend);   
  27.   
  28.     prend->count = count;   
  29.   
  30. if
  31. return
  32.   
  33. if
  34. return
  35.   
  36. return
  37. }   
  38.   
  39. int rend_wait(struct
  40. int
  41.   
  42.     assert(prend);   
  43.   
  44. if
  45. return
  46.   
  47. /* check count value is ready to weak up block code */
  48. if
  49. if
  50. return
  51.   
  52. if
  53. return
  54. else
  55.         prend->count--;   
  56.   
  57.         ret = pthread_cond_wait(&prend->ready, &prend->count_lock);   
  58.         prend->count++;   
  59.   
  60. if
  61.             pthread_mutex_unlock(&prend->count_lock);   
  62. return
  63.         }   
  64.   
  65. if
  66. return
  67.     }   
  68.   
  69. return
  70. }   
  71.   
  72. int rend_free(struct
  73. int
  74.   
  75.     assert(prend);   
  76.   
  77.     prend->count = 0;   
  78.   
  79. if
  80. return
  81.   
  82. if
  83. return
  84.   
  85. return
  86. }   
  87.   
  88. #endif /* REND_H_ */


/* * rend.h * * Created on: 2009-11-14 * Author: liuzy (lzy.dev@gmail.com) */ #ifndef REND_H_ #define REND_H_ #include <pthread.h> #include <assert.h> struct rend_t { volatile int count; pthread_mutex_t count_lock; pthread_cond_t ready; }; #define DECLARE_REND(name, count) / struct rend_t name = {(count), PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER} int rend_init(struct rend_t* prend, int count) { int ret = 0; assert(prend); prend->count = count; if ((ret = pthread_mutex_init(&prend->count_lock, NULL))) return ret; if ((ret = pthread_cond_init(&prend->ready, NULL))) return ret; return EXIT_SUCCESS; } int rend_wait(struct rend_t* prend) { int ret = 0; assert(prend); if ((ret = pthread_mutex_lock(&prend->count_lock))) return ret; /* check count value is ready to weak up block code */ if (prend->count == 1) { if ((ret = pthread_cond_broadcast(&prend->ready))) return ret; if ((ret = pthread_mutex_unlock(&prend->count_lock))) return ret; } else { prend->count--; ret = pthread_cond_wait(&prend->ready, &prend->count_lock); prend->count++; if (ret) { pthread_mutex_unlock(&prend->count_lock); return ret; } if ((ret = pthread_mutex_unlock(&prend->count_lock))) return ret; } return EXIT_SUCCESS; } int rend_free(struct rend_t* prend) { int ret = 0; assert(prend); prend->count = 0; if ((ret = pthread_mutex_destroy(&prend->count_lock))) return ret; if ((ret = pthread_cond_destroy(&prend->ready))) return ret; return EXIT_SUCCESS; } #endif /* REND_H_ */


 

rend 使用更简单:

 

  1. 定义/初始化 rend_t 集结点对象。DECLARE_REND 宏用于静态定义,rend_init 函数可以对动态创建的集结点结构初始化;
  2. pthread 线程通过调用 rend_wait 函数 P/V 集结状态。集结关系的线程要 P/V 在同一个 rend_t 集结对象上;
  3. 释放集结对象,rend_free 函数。

以上函数都是成功返回 0,出错返回 errno 值(非 0)。

 

 



  1. /*
  2.  ==============================
  3.  Name        : rendezvous.c
  4.  Author      : liuzy (lzy.dev@gmail.com)
  5.  Version     : 0.1
  6. ==============================
  7.  */
  8.   
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <stdarg.h>       /* va_list */
  12. #include <unistd.h>
  13. #include <string.h>
  14. #include <errno.h>        /* errno */
  15. #include <syslog.h>       /* for syslog(2) and level */
  16. #include <pthread.h>
  17.   
  18. #include "rend.h"
  19.   
  20. static int daemon_proc = 0; /* for syslog in err_doit */
  21.   
  22. #define MAXLINE 4096        /* max text line length */
  23.   
  24. void err_doit(int errnoflag, int level, const char* fmt, va_list
  25.   
  26. char
  27. int
  28.   
  29. #ifdef HAVE_VSNPRINTF
  30.     vsnprintf(buf, MAXLINE, fmt, ap);   
  31. #else
  32.     vsprintf(buf, fmt, ap);   
  33. #endif  /* HAVE_VSNPRINTF */
  34.   
  35.     n = strlen(buf);   
  36. if
  37. ": %s", strerror(errno_save));   
  38. "/n");   
  39.   
  40. if
  41. "%s", buf);   
  42. else
  43.         fflush(stdout);   
  44.         fputs(buf, stderr);   
  45.         fflush(stderr);   
  46.     }   
  47.   
  48. return;   
  49. }   
  50.   
  51. void err_msg(const char* fmt, ...) {   
  52. va_list
  53.   
  54.     va_start(ap, fmt);   
  55.     err_doit(0, LOG_INFO, fmt, ap);   
  56.     va_end(ap);   
  57.   
  58. return;   
  59. }   
  60.   
  61. void err_sys(const char* fmt, ...) {   
  62. va_list
  63.   
  64.     va_start(ap, fmt);   
  65.     err_doit(1, LOG_ERR, fmt, ap);   
  66.     va_end(ap);   
  67.   
  68.     exit(EXIT_FAILURE);   
  69. }   
  70.   
  71. #define THREAD_COUNT 100    /* rendezvous test thread workers */
  72.   
  73. struct
  74. int
  75. struct
  76. };   
  77.   
  78. static void* pthread_worker(void* arg) {   
  79. struct worker_arg* parg = (struct
  80.   
  81. "worker #%d running.", (int) parg->worker_id);   
  82.   
  83.     srand(parg->worker_id * 2);   
  84.     sleep(rand() % 5);   
  85.   
  86. /* workers rendezvous */
  87.   
  88. "worker #%d exiting.", (int) parg->worker_id);   
  89.   
  90. return
  91. }   
  92.   
  93. int main(void) {   
  94. int
  95. void* exitcode = NULL;   
  96.   
  97.     pthread_t thds[THREAD_COUNT];   
  98. struct
  99.     DECLARE_REND(rend, THREAD_COUNT);   
  100.   
  101. "workers creating.");   
  102.   
  103. for
  104.         arg[idx].prend = &rend;   
  105.         arg[idx].worker_id = idx;   
  106.   
  107. if (pthread_create(thds + idx, NULL, pthread_worker, (void*) &arg[idx]))   
  108. "worker #%d create error.", idx);   
  109.     }   
  110.   
  111. "workers exiting.");   
  112.   
  113. for
  114. if
  115. "worker #%d exit error.", idx);   
  116.   
  117. "all done. exit 0.");   
  118.   
  119.     rend_free(&rend);   
  120.   
  121. return
  122. }  


/* ============================== Name : rendezvous.c Author : liuzy (lzy.dev@gmail.com) Version : 0.1 ============================== */ #include <stdio.h> #include <stdlib.h> #include <stdarg.h> /* va_list */ #include <unistd.h> #include <string.h> #include <errno.h> /* errno */ #include <syslog.h> /* for syslog(2) and level */ #include <pthread.h> #include "rend.h" static int daemon_proc = 0; /* for syslog in err_doit */ #define MAXLINE 4096 /* max text line length */ void err_doit(int errnoflag, int level, const char* fmt, va_list ap) { char buf[MAXLINE + 1] = { 0 }; int errno_save = errno, n = 0; #ifdef HAVE_VSNPRINTF vsnprintf(buf, MAXLINE, fmt, ap); #else vsprintf(buf, fmt, ap); #endif /* HAVE_VSNPRINTF */ n = strlen(buf); if (errnoflag) snprintf(buf + n, MAXLINE - n, ": %s", strerror(errno_save)); strcat(buf, "/n"); if (daemon_proc) { syslog(level, "%s", buf); } else { fflush(stdout); fputs(buf, stderr); fflush(stderr); } return; } void err_msg(const char* fmt, ...) { va_list ap; va_start(ap, fmt); err_doit(0, LOG_INFO, fmt, ap); va_end(ap); return; } void err_sys(const char* fmt, ...) { va_list ap; va_start(ap, fmt); err_doit(1, LOG_ERR, fmt, ap); va_end(ap); exit(EXIT_FAILURE); } #define THREAD_COUNT 100 /* rendezvous test thread workers */ struct worker_arg { int worker_id; struct rend_t* prend; }; static void* pthread_worker(void* arg) { struct worker_arg* parg = (struct worker_arg*) arg; err_msg("worker #%d running.", (int) parg->worker_id); srand(parg->worker_id * 2); sleep(rand() % 5); rend_wait(parg->prend); /* workers rendezvous */ err_msg("worker #%d exiting.", (int) parg->worker_id); return EXIT_SUCCESS; } int main(void) { int idx = 0; void* exitcode = NULL; pthread_t thds[THREAD_COUNT]; struct worker_arg arg[THREAD_COUNT]; DECLARE_REND(rend, THREAD_COUNT); err_msg("workers creating."); for (idx = 0; idx < THREAD_COUNT; idx++) { arg[idx].prend = &rend; arg[idx].worker_id = idx; if (pthread_create(thds + idx, NULL, pthread_worker, (void*) &arg[idx])) err_sys("worker #%d create error.", idx); } puts("workers exiting."); for (idx = 0; idx < THREAD_COUNT; idx++) if (pthread_join(thds[idx], &exitcode) || (exitcode != EXIT_SUCCESS)) err_msg("worker #%d exit error.", idx); err_msg("all done. exit 0."); rend_free(&rend); return EXIT_SUCCESS; }


 

          看了下 semaphore os syscall 及其 infrastructure,也许以后还需要进程间(非 pthread)集结时用得上。kernel 实现的超强啊,呵呵~

 

// 2009.11.17 14:34 添加

 


快速用户空间互斥锁(Futex)


快速用户空间互斥锁(fast userspace mutex,Futex)是快速的用户空间的锁,是对传统的System V同步方式的一种替代,传统同步方式如:信号量、文件锁和消息队列,在每次锁访问时需要进行系统调用。而futex仅在有竞争的操作时才用系统调用访问内核,这样,在竞争出现较少的情况下,可以大幅度地减少工作负载
futex在非竞争情况下可从用户空间获取和释放,不需要进入内核。与信号量类似,它有一个可以原子增减的计数器,进程可以等待计数器值变为正数。用户进程通过系统调用对资源的竞争作一个公断。
futex 是一个用户空间的整数值,被多个线程或进程共享。Futex的系统调用对该整数值时进行操作,仲裁竞争的访问。 glibc中的NPTL库封装了futex 系统调用,对futex接口进行了抽象。用户通过NPTL库像传统编程一样地使用线程同步API函数,而不会感觉到futex的存在。
futex 的实现机制是:如果当前进程访问临界区时,该临界区正被另一个进程使用,当前进程将锁用一个值标识,表示“有一个等待者正挂起”,并且调用 sys_futex(FUTEX_WAIT)等待其他进程释放它。内核在内部创建futex队列,以便以后与唤醒者匹配等待者。当临界区拥有者线程释放了 futex,它通过变量值发出通知表示还有多个等待者在挂起,并调用系统调用sys_futex(FUTEX_WAKE)唤醒它们。一旦所有等待者已获取资源并释放锁时,futex回到非竞争状态,并没有内核状态与它相关。
robust futex是为了解决futex锁崩溃而对futex进行了增强。例如:当一个进程在持有pthread_mutex_t锁正与其他进程发生竞争时,进程因某种意外原因而提前退出,如:进程发生段错误,或者被用户用shell命令kill -9-ed”强行退出,此时,需要有一种机制告诉等待者“锁的最一个持有者已经非正常地退出”。“
为了解决此类问题,NPTL创建了robust mutex用户空间API pthread_mutex_lock(),如果锁的拥有者进程提前退出,pthread_mutex_lock()返回一个错误值,新的拥有者进程可以决定是否可以安全恢复被锁保护的数据。


 

有几点不还不理解:

 

  1. “futex 如果说是一个用户空间的整数值,那怎么被多个进程共享?Futex 系统调用在 kernel 态怎么操作该值并仲裁竞争?这是那种直接映射到 userspace 的 kernel 地址么。这个需要程序间通过 mmap 在共享段中访问,与 futex 没什么关系。
  2. 这个“robust futex”机制指的应该就是 SVRx 传统 sem IPC 里的 SEM_UNDO flag 吧?

一篇不错的文章,引发对 glibc nptl 实现源码的探索:

​​关于信号量与线程互斥锁的区别与实现​​

举报

相关推荐

0 条评论