0
点赞
收藏
分享

微信扫一扫

[并发编程]设置互斥锁的健壮性

互斥锁的健壮性

互斥锁的健壮性,指的是当持有锁的线程/进程在未释放锁的情况下退出时,其他线程/进程可以通过特定操作将锁恢复到正常状态。可以通过设置锁的PTHREAD_MUTEX_ROBUST属性来达到该目的。

相关接口

#include <pthread.h>

// __attr: the mutex attributes object
// __robustness: either PTHREAD_MUTEX_STALLED or PTHREAD_MUTEX_ROBUST
int pthread_mutexattr_setrobust (pthread_mutexattr_t *__attr,
                                  int __robustness);
int pthread_mutexattr_getrobust (const pthread_mutexattr_t *__attr,
			         int *__robustness);

// When pthread_mutex_lock on a robust mutex returns EOWNERDEAD,
// this function can be used to restore the mutex to a consistent state
int pthread_mutex_consistent (pthread_mutex_t *__mutex);

PTHREAD_MUTEX_STALLED

默认属性,如果用PTHREAD_MUTEX_STALLED属性初始化互斥体,并且它的所有者没有解锁它就死了,互斥体之后仍然是锁定的,以后任何在互斥体上调用pthread_mutex_lock (3)的尝试都将永久陷入阻塞。

PTHREAD_MUTEX_ROBUST

如果使用PTHREAD_MUTEX_ROBUST属性初始化互斥体,并且其所有者在未解锁的情况下死亡,则以后对此互斥体调用pthread_mutex_lock (3)的任何尝试都将成功,并返回 EOWNERDEAD 以指示原始所有者已不存在,互斥体处于不一致状态。通常在 EOWNERDEAD 返回后,下一个拥有者应该在获取的互斥体上调用pthread_mutex_consistent (3),以使其再次一致,然后再使用它。

如果下一个所有者在使互斥体保持一致之前使用pthread_mutex_unlock (3)解锁互斥体,则互斥体将永久不可用,并且随后使用pthread_mutex_lock (3)锁定互斥体的任何尝试都将失败,错误为ENOTRECOVERABLE。在这样的互斥体上唯一允许的操作是pthread_mutex_destroy (3)。如果下一个所有者在调用pthread_mutex_consistent (3)之前终止,则在此互斥体上进一步的pthread_mutex_lock (3)操作仍将返回EOWNERDEAD。

使用实例

下面是我们在实际的项目开发上遇到的一个例子。为了进行跨进程的同步,我们将互斥锁和条件变量都创建在了共享内存上,并且设置了互斥锁的PTHREAD_PROCESS_SHARED属性。但是,当进程A在持有锁的时候死掉,而进程B正好处于等待加锁的状态时,进程B将陷入死锁的状态。我们可以通过设置进程锁的`PTHREAD_MUTEX_ROBUST`属性来解决:当出现上述情况的时候,pthread_mutex_lock将立马返回EOWNERDEAD,此时可以通过pthread_mutex_consistent将进程锁恢复正常。

核心代码如下:

        // Request robust behaviour on the attribute object used to initialise
        // the mutex, so that a later lock attempt reports EOWNERDEAD instead
        // of blocking forever when the previous owner died.
        pthread_mutexattr_setrobust(&mutexattr,PTHREAD_MUTEX_ROBUST);

        int ret = pthread_mutex_lock(&_shm_mutex->_mutex);
        if (ret != 0)
        {
            if (ret == EOWNERDEAD)
            {
                // The lock IS held at this point, but its state is marked
                // inconsistent. pthread_* functions report failure by
                // returning a non-zero error number (never -1), so compare
                // against 0.
                if (pthread_mutex_consistent(&_shm_mutex->_mutex) != 0)
                {
                    exit_on_error(true,"pthread_mutex_consistent failed");
                }
                else 
                {
                    printf("pthread_mutex_consistent OK\r\n");
                }
            }
            else 
            {
                // Any other error means the lock was not acquired.
                printf("pthread_mutex_lock failed,code:%d\r\n",ret);
                exit_on_error(true,"pthread_mutex_lock failed!");
            }
        }

完整代码示例

测试步骤:

  1. 编译代码
  2. 先启动productor,它会在持有锁的情况下休眠20s,然后再解锁
  3. 再运行consumer,它会去尝试获取锁
  4. 当productor在持有锁的过程中,将productor kill掉
  5. 此时可以看到consumer通过pthread_mutex_consistent将锁恢复。

如果在创建锁的时候不设置PTHREAD_MUTEX_ROBUST属性,并将加锁处调用pthread_mutex_consistent的恢复逻辑屏蔽掉,然后重复上面的1、2、3、4步操作,将看到consumer永远阻塞在等待锁的地方。

// shm.h
#pragma once

#include <errno.h>
#include "sys.h"
#include <sys/mman.h>        // for shm_open
#include <sys/stat.h>        /* For mode constants */
#include <fcntl.h>           /* For O_* constants */
#include <pthread.h>         // for pthread_xx
#include <assert.h> // for assert
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>           // for printf / snprintf
#include <stdint.h>          // for int32_t
#include <sys/time.h>        // for gettimeofday

// Synchronisation header stored at the very start of the shared mapping:
// SHM_SYNC_COND::init() casts the mapped base address to this type and
// places the user data area immediately after it.
struct SHM_MUTEX
{
    pthread_mutex_t _mutex; // mutex that guards waiting/notifying on _cond
    pthread_cond_t _cond;   // condition variable waited on / broadcast under _mutex
};

// Name of the shared-memory object; both the consumer and the productor
// programs pass it to SHM_SYNC_COND::init().
// NOTE(review): POSIX recommends shm_open() names of the form "/name" —
// confirm whether a leading slash is wanted here.
static const char *shm_cond_name = "shm_cond_name";

/**
 * A process-shared, robust mutex plus a process-shared condition variable
 * stored at the start of a POSIX shared-memory object, followed by a data
 * area of elm_size * eml_count bytes.
 *
 * Error policy: failures are reported through exit_on_error(cond, msg)
 * (declared in "sys.h"); pthread_* calls are checked against 0 because they
 * return a positive error number on failure, never -1.
 */
class SHM_SYNC_COND
{
public:
    SHM_SYNC_COND() = default;

    /**
     * Creates or opens the shared-memory object and maps it.
     *
     * The process that actually creates the object (decided atomically with
     * O_CREAT | O_EXCL) sizes it and initialises the mutex and the condition
     * variable; every other process only maps the existing object.
     *
     * NOTE(review): a process that opens the object while the creator is
     * still between shm_open() and the end of initialisation can observe a
     * not-yet-initialised mutex; start the creator first.
     *
     * @param shm_name   name passed to shm_open()
     * @param elm_size   size in bytes of one element of the data area
     * @param eml_count  number of elements in the data area
     * @return true (all failures go through exit_on_error)
     */
    bool init(const char *shm_name,size_t elm_size,size_t eml_count)
    {
        assert(_data == nullptr);
        assert(_shm_mutex == nullptr);

        // Informational only: where the object is expected to show up on
        // Linux. Built with snprintf so an over-long name cannot overflow.
        char shm_path[256];
        snprintf(shm_path,sizeof(shm_path),"/dev/shm/%s",shm_name);
        printf("shm path:%s\r\n",shm_path);

        // O_EXCL makes "am I the creator?" an atomic decision, instead of a
        // stat() followed by a separate open.
        bool first_create = true;
        int shm_fd = shm_open(shm_name,O_CREAT | O_EXCL | O_RDWR,0666);
        if (shm_fd < 0 && errno == EEXIST)
        {
            first_create = false;
            shm_fd = shm_open(shm_name,O_RDWR,0666);
        }
        exit_on_error(shm_fd < 0,"shm_open failed!");
        printf("first create shm file:%s\r\n",(first_create?"true":"false"));

        size_t s = sizeof(SHM_MUTEX) + elm_size * eml_count; // total mapping size
        if (first_create)
        {
            printf("fisrt to truncate shm file\r\n");
            int ret = ftruncate(shm_fd,s); // give the new object its size
            exit_on_error(ret < 0,"ftruncate failed!");
        }

        void *addr = mmap(NULL,s,PROT_WRITE | PROT_READ,MAP_SHARED,shm_fd,0);
        exit_on_error(addr == MAP_FAILED,"mmap failed");
        // The mapping stays valid after the descriptor is closed, so do not
        // leak the descriptor.
        (void)close(shm_fd);

        _shm_mutex = (SHM_MUTEX *)addr;            // sync header at the start
        _data = (char *)addr + sizeof(SHM_MUTEX);  // user data right after it

        if (first_create)
        {
            printf("init mutex and cond\r\n");

            // Mutex: shared between processes and robust against owner death.
            pthread_mutexattr_t mutexattr;
            int ret = pthread_mutexattr_init(&mutexattr);
            exit_on_error(ret != 0,"pthread_mutexattr_init failed");
            ret = pthread_mutexattr_setpshared(&mutexattr,PTHREAD_PROCESS_SHARED);
            exit_on_error(ret != 0,"pthread_mutexattr_setpshared failed");
            ret = pthread_mutexattr_setrobust(&mutexattr,PTHREAD_MUTEX_ROBUST);
            exit_on_error(ret != 0,"pthread_mutexattr_setrobust failed");
            ret = pthread_mutex_init(&_shm_mutex->_mutex,&mutexattr);
            exit_on_error(ret != 0,"pthread_mutex_init failed");
            ret = pthread_mutexattr_destroy(&mutexattr);
            exit_on_error(ret != 0,"pthread_mutexattr_destroy failed");

            // Condition variable: shared between processes.
            pthread_condattr_t condattr;
            ret = pthread_condattr_init(&condattr);
            exit_on_error(ret != 0,"pthread_condattr_init failed");
            ret = pthread_condattr_setpshared(&condattr,PTHREAD_PROCESS_SHARED);
            exit_on_error(ret != 0,"pthread_condattr_setpshared failed");
            ret = pthread_cond_init(&_shm_mutex->_cond,&condattr);
            exit_on_error(ret != 0,"pthread_cond_init failed");
            ret = pthread_condattr_destroy(&condattr);
            exit_on_error(ret != 0,"pthread_condattr_destroy failed");
        }
        return true;
    }

    /**
     * Broadcasts the condition variable to waiters in any process.
     * Holds the mutex across a deliberate sleep so that the demo can kill
     * this process while it owns the lock.
     */
    void notify()
    {
        assert(_data != nullptr);
        assert(_shm_mutex != nullptr);
        static int32_t index = 0;
        // A restarted notifier may itself inherit a lock whose owner died,
        // so it needs the same recovery as wait().
        lock_robust();
        // NOTE(review): the message says 20s but the sleep below is 10s —
        // confirm which one is intended.
        printf("sleep 20s to notify %d..\r\n",++index);
        sleep(10);
        int ret = pthread_cond_broadcast(&_shm_mutex->_cond);
        exit_on_error(ret != 0,"pthread_cond_broadcast failed");
        ret = pthread_mutex_unlock(&_shm_mutex->_mutex);
        exit_on_error(ret != 0,"pthread_mutex_unlock failed");
    }

    /**
     * Waits on the condition variable for at most wait_sec seconds.
     * Returning because of a timeout is not an error.
     *
     * @param wait_sec  maximum time to wait, in seconds
     */
    void wait(int32_t wait_sec)
    {
        assert(_data != nullptr);
        assert(_shm_mutex != nullptr);

        // pthread_cond_timedwait takes an absolute deadline.
        struct timeval now;
        struct timespec abstime;
        gettimeofday(&now,NULL);
        printf("wait sec:%ld,nsec:%ld,wait_sec:%d\r\n",(long)now.tv_sec,(long)now.tv_usec*1000,wait_sec);
        abstime.tv_nsec = (now.tv_usec) * 1000;
        abstime.tv_sec = now.tv_sec + wait_sec;

        lock_robust();
        printf("wait sec:%ld,nsec:%ld\r\n",(long)abstime.tv_sec,(long)abstime.tv_nsec);

        int ret = pthread_cond_timedwait(&_shm_mutex->_cond,&_shm_mutex->_mutex,&abstime);
        // ETIMEDOUT: nobody notified in time (mutex is re-acquired anyway).
        // EOWNERDEAD: the mutex was re-acquired from an owner that died while
        // we were waiting; it must be made consistent before unlocking.
        if (ret != ETIMEDOUT && !acquired_or_recovered(ret))
        {
            printf("pthread_cond_timedwait failed,code:%d\r\n",ret);
            exit_on_error(true,"pthread_cond_timedwait failed");
        }

        ret = pthread_mutex_unlock(&_shm_mutex->_mutex);
        exit_on_error(ret != 0,"pthread_mutex_unlock failed");
    }

    /** Returns the start of the data area that follows the sync header. */
    void *data_buf() const 
    {
        assert(_data != nullptr);
        return _data;
    }

private:
    // Interprets the result of a call that acquires the mutex: returns true
    // when the mutex is held and usable (plain success, or EOWNERDEAD that
    // was repaired with pthread_mutex_consistent), false for any other error.
    bool acquired_or_recovered(int ret)
    {
        if (ret == 0)
        {
            return true;
        }
        if (ret != EOWNERDEAD)
        {
            return false;
        }
        int rc = pthread_mutex_consistent(&_shm_mutex->_mutex);
        exit_on_error(rc != 0,"pthread_mutex_consistent failed");
        printf("pthread_mutex_consistent OK\r\n");
        return true;
    }

    // Locks the shared mutex, recovering it if its previous owner died.
    void lock_robust()
    {
        int ret = pthread_mutex_lock(&_shm_mutex->_mutex);
        if (!acquired_or_recovered(ret))
        {
            printf("pthread_mutex_lock failed,code:%d\r\n",ret);
            exit_on_error(true,"pthread_mutex_lock failed!");
        }
    }

    SHM_MUTEX * _shm_mutex{nullptr}; // sync header at the start of the mapping
    void *_data{nullptr};            // data area right after the sync header
};

// Record exchanged between processes through the shared data area.
struct IPC_DATA
{
    pid_t _pid{0};   // process id stored by the writer of the record
    char  _msg[256]; // text payload of the record
};
//consumer.cc
#include <sys/time.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <stdlib.h>
#include <stdio.h>
#include "sys.h"
#include <unistd.h>
#include "./shm.h"



// Consumer: maps the shared region, then repeatedly waits (up to 5 seconds
// per round) on the cross-process condition variable and prints the message
// currently stored in the shared data area.
int main()
{
    const char *shm_name = shm_cond_name; // name of the shared-memory object
    SHM_SYNC_COND shm_cond;
    size_t elm_size = sizeof(IPC_DATA);
    size_t elm_count = 1;
    bool ret = shm_cond.init(shm_name, elm_size,elm_count); // create/open the shared lock and mapping
    exit_on_error(ret == false,"create shm cond failed");
    IPC_DATA *datas = (IPC_DATA *)shm_cond.data_buf(); // start of the mapped data area

    while(true)
    {
        static size_t index = 0;
        // The format needs its argument; %zu is the conversion for size_t.
        printf("consumer wait for msg index:%zu\r\n",index);
        // Returns when the producer notifies or after the 5 second timeout,
        // whichever comes first.
        shm_cond.wait(5);
        // NOTE(review): _msg is read after wait() has released the mutex, so
        // this read is not synchronised with the producer's write.
        printf("wait for index:%zu,msg:%s",index,datas[0]._msg);
        index++;
    }
    return 0;
}
// productor.cc
#include <sys/time.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <stdlib.h>
#include <stdio.h>
#include "sys.h"
#include <unistd.h>

#include "./shm.h"

int main()
{
    const char *shm_name = shm_cond_name; // 共享内存文件名
    SHM_SYNC_COND shm_cond;
    size_t elm_size = sizeof(IPC_DATA);
    size_t elm_count = 1;
    shm_cond.init(shm_name, elm_size,elm_count);// 创建共享锁以及内存映射
    IPC_DATA *datas = (IPC_DATA *)shm_cond.data_buf();// 获取开辟的内存映射数据地址

    while(true) // 生产者,每隔5s发送一次数据,让消费者在被kill的时候阻塞在条件变量上
    {
        static size_t index = 0;
        usleep(1000*1000*1);
        datas[0]._pid = getpid();
        snprintf(datas[0]._msg,sizeof(datas[0]._msg),"[productor]write msg,pid:%d,index:%lu\r\n",getpid(),index);
        shm_cond.notify();
        index++;
    }

    return 0;
}
# Builds the two demo programs for the robust shared-memory mutex example.
project(shm-mutex-cond-robust)

set(bin_name ${PROJECT_NAME})

# Append -pthread instead of overwriting CMAKE_CXX_FLAGS, so flags supplied
# by the user, the toolchain file or a parent project are preserved.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread")

# NOTE(review): SRC_FILES is not used by the targets below — confirm whether
# this line is still needed.
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SRC_FILES)

set(consumer_src consumer.cc)
set(productor_src productor.cc)
add_executable(consumer-robost ${consumer_src})
add_executable(productor-robost ${productor_src})

# librt provides shm_open on older glibc versions.
target_link_libraries(consumer-robost rt)
target_link_libraries(productor-robost rt)
举报

相关推荐

0 条评论