0
点赞
收藏
分享

微信扫一扫

rust嵌入式开发之基于await构造应用级临界区

程序员知识圈 04-16 21:30 阅读 2

在rust嵌入式开发之await一文中我们讨论了如何用await来实现异步操作的串行化。而并发编程时还有一个更重要的问题需要我们解决:资源竞争。

针对并发时的资源竞争,最简单的办法就是利用系统提供的临界区机制来互斥的使用资源。嵌入式rust提供了critical-section来提供临界区的原语,同时在cortex-m这样的crate中都加以了实现。

嵌入式的临界区有几种实现方式:

  • 单核无系统,关闭中断
  • 多核无系统,关闭中断加核心间的硬件自旋锁
  • ROTS,由系统以库函数/系统调用的方式提供

可以看到,临界区必须在硬件/或控制了硬件的系统【如rust的tock、c的rt-thread等】的支持下实现。如果没有系统,就只能通过关中断来实现互斥访问。

Embassy目前还只是一个有限的运行时,还不是一个ROTS,提供不了系统级的临界区。这就导致在用Embassy开发时,在需要用临界区解决资源竞争时必须快进快出,而无法用在串行化交互这种需要长期持有资源的场景中了,如通过RS485总线同时管理多台设备。

针对这个问题,笔者就考虑如何在应用层面提供不需要关中断就可以实现临界区保护的互斥锁。实质上,就是基于Embassy运行时来实现应用层面的互斥锁。

锁协议

嵌入式的应用场景比较简单,所以直接借鉴java的synchronized语义,即对象级的读写互斥锁,不支持共享读。其实,就嵌入式的应用来说,过于复杂的锁协议也没啥必要,属于过渡设计了。

此外,由于rust稳定版尚不支持异步闭包,所以锁的申请与释放必须分开。当然,对于FnOnce的闭包可以提供with来简化,但由于我们设计互斥锁的目的主要是用于异步串行化的资源长期持有,所以with语句用途有限。

所以呢,可长期持有的互斥锁的锁协议为:

  • 一个数据对象【代表一个资源】用一个可长期持有的锁来提供互斥性的临界区保护
  • 可长期持有的锁,应该有可配置的超时间隔
  • 可长期持有的锁允许竞争性申请,申请到锁的任务方可操作对应的受保护资源
  • 未申请到锁的任务应等待直至超时退出锁的竞争
  • 申请到锁的任务操作完毕后,应主动释放锁
  • 当锁释放时,如果有等待的任务,从中挑选一个授予锁

在锁的持有期内,完全可以执行各种await操作。

实现

由于笔者写的项目为商业项目,无法直接贴出源码,所以我们主要讨论原理并辅以说明性的伪码。

实现原理非常简单:

1、主要依托上篇文章讨论过的await机制,以Embassy运行时为基础来实现锁的超时与竞争调度

2、利用Embassy/嵌入式rust所提供的CriticalSectionRawMutex来保护对锁本身的操作,避免锁操作期间的再入问题

锁对象本身的定义非常简单:

pub struct Lock {
	//锁的内部数据,主要包括两个部分:
	//1、前篇文章中所提到的用于awake机制的waker等任务调度信息数据
	//2、竞争锁的排队数据,我是用BTreeMap来管理排队
	inner: _Lock,
	//由于锁的申请存在竞争,所以这两类锁的内部数据也是需要保护的,我用了CriticalSectionRawMutex
	//其可以提供跨线程的保护,也就是可以在中断中一样使用,在我使用的STM32F413芯片中其实就是关中断
	//所以所有的锁操作必须快进快出,要求尽可能的简短
    lock: Mutex<CriticalSectionRawMutex, bool>,
}

主要用来提供锁接口并实现对锁对象本身的互斥操作。

_Lock是锁实体,其主要提供对申请锁Future的管理,包括当前持有锁的Future、Future的ID管理以及所有申请锁的请求者队列管理。这些功能都很常规,我们无需赘述。

对_Lock的操作需要用CriticalSectionRawMutex进行保护,以避免再入。

申请锁的Future的poll函数示意如下:

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
    let id = self.id;
	//首先检查自己能否在竞争中获胜赢得锁
    if self.lock.check(id) {
        //竞争获胜
        Poll::Ready(LockCode::OK(id))
    }else if !self.polled {
        //第一次参加竞争,但失败了,需要准备waker,并设置超时。可参考上篇文章
        self.polled = true;
        let w: &core::task::Waker = cx.waker();
        self.waker = Some(w.clone());
        embassy_time_queue_driver::schedule_wake(self.expires_at.as_ticks(), w);
        Poll::Pending
    }else if self.expires_at <= Instant::now() {
        //超时了
        self.lock.remove(id);
        Poll::Ready(LockCode::Timeout)
    }else{
        //理论上执行不到,只是总得有个返回值
        Poll::Pending
    }
}

我们再看一下锁的check函数的竞争逻辑:

fn check(&self, id: u64) -> bool {
    //锁对象的操作需要用CriticalSectionRawMutex进行保护以避免再入
    self.lock(|p|{
        if p.current == id {
            //被唤醒并进行检查的Future,就是锁的持有者
            true
        }else if p.current == 0 {
            //锁目前没有人持有,所以立刻将锁变更为自己持有
            p.current = id;
            true
        }else{
            false
        }
    })
}

大家在编写Future的poll函数时必须牢记:一个waker只会执行一次

Waker的wake函数会自动删除自己:

// Don't call `drop` -- the waker will be consumed by `wake`.
crate::mem::forget(self);

所以我在这里所写的poll函数最多有两次执行机会:

  • Future创建后被第一次调度执行poll函数,此时如果锁没有持有者,则本Future将获得锁,此时就执行一次
  • 如果锁已经被其它Future持有,本Future就将被安排等待,这是第一次执行
  • 等待中的Future有两种可能被wake【超时、或锁被释放后自己被选中】,这是第二次执行

大家再看下poll函数,就会发现有一种状态是可以执行第三次的啊,即:check失败 + 已经poll过了 + 未超时。但这种情况我们必须避免出现。因为waker只能执行一次,如果出现这样的情况,这个Future将因为再无法被wake,而永远沉睡在系统任务队列中了。所以我们就需要设法防止这种状态的出现。

因此,在某Future被选中唤醒时,锁管理就会将锁先行授予该Future。即:

if let Some(w) = &n.waker {
    //这使得被wake后执行poll函数的check时,直接命中【p.current == id】而poll成功
    pb.current = n.id;
    w.clone().wake();
}

最后,获得锁后必须显式释放:

//获得锁对象,嵌入式比较简单,可以直接用静态的对象,但由于并发,所获得的锁对象不能是&mut
//这就要求锁的操作都不能是&mut self,而必须是&'self,这就是我们为什么需要外封装的原因
let lo = get_lock_...锁名...();
//竞争锁,10秒超时
let (rc, id, rd) = lo.wait(Duration::from_secs(10)).await;
if rc {
	if let Some(md) = rd {
		//在我的实现中,锁和待保护对象进行了泛型化的融合,md就是取到的数据对象
		...md是&mut的,所以可以进行修改等所有需要的操作...
		
		//还可以执行各种异步操作
		Timer::after_millis(1300).await;
		
		//必须显式释放锁,获取失败的id是0,即便调用release也无效
		lo.release(id);
	}
}else{
	//超时,可以在此执行容错处理
}
结语

以上,我们就获得了一个轻便而可靠的应用级的临界区互斥锁。

有了锁,我们就可以根据需要来对静态数据、融入泛型结构中提供数据保护了。

举报

相关推荐

0 条评论