0
点赞
收藏
分享

微信扫一扫

Trait与生命周期

用过android的同学对于handler应该都很了解,用起来比较方便。这里用rust设计了一个简单的handler。

1.处理接口

// Callback interface: a message's target implements this to receive the message.
pub trait ProcessMessage {
    // Called with an owned Message each time a message addressed to this target is executed.
    fn handleMessage(&self,msg:Message);
}

2.Message结构体

// One queued message; it is also a node of the queue's singly linked list.
pub struct Message {
    // Caller-defined message code.
    pub what:u32,
    // Two caller-defined integer arguments.
    pub arg1:i32,
    pub arg2:i32,
    // Due time; the queue compares it with system::currentMillis() before delivering.
    pub next_time:u64,
    // Next node in the list; None marks the tail.
    pub next:Option<Rc<RefCell<Message>>>, //1
    // Callback that receives the message when it is executed.
    pub target:Option<Arc<Box<dyn ProcessMessage>>>//2
}

1.指向下一个message,整个消息队列采用链表结构,使用option主要是为了确认是否还有message

2.target是一个回调实现,每个message都需要带有target,这样message就能由指定的handler处理了。

关键函数

impl Message {
    ....

    // Delivers this message to its target, or does nothing when no target is set.
    // The target receives a freshly built Message that copies what/arg1/arg2/next_time
    // and has next and target set to None.
    fn execute(&self) {
        match &self.target {
            None=> {},
            Some(handler) => {
                handler.handleMessage(
                Message{
                        what:self.what,
                        arg1:self.arg1,
                        arg2:self.arg2,
                        next_time:self.next_time,
                        next:None,
                        target:None
                    }
                )
            } //1
        }
    }
}

1.message处理的时候调用了ProcessMessage,这里主要考虑到使用的方便(不要再用什么unwrap),所以传出去的数据重新构造了一个新数据,性能上会有损耗

3.消息队列

// Message list protected by a mutex, plus a condition variable for the consumer to wait on.
pub struct MessageQueue {
    // Sentinel head node; real messages are linked starting at head.next.
    head:Mutex<Message>,//1
    // Used to wait for messages and to wake the waiter when one is enqueued.
    cond:Condvar//2
}

1.消息队列头,为了减少option,这里规定了第一个消息一直不使用,所有消息从head的next开始排队

2.cond:如果没有消息,或者消息需要延迟发送,则使用这个做等待处理。每次有新消息加入的时候,按next_time从小到大的顺序把它插入链表,然后cond.notify_one一下,唤醒等待线程

4.Looper

// Owns a shared handle to the message queue that loop_self drains.
struct Looper {
    queue:Arc<RefCell<MessageQueue>>,
}

Looper主要就是持有一个queue。

关键函数:

// Endless loop: take the next message from the queue and execute it. Never returns.
fn loop_self(&self) {
        loop {
            println!("loop self start");
            // The mutable RefCell borrow of the queue lasts for the whole dequeue_message() call.
            let msg = self.queue.clone().try_borrow_mut().unwrap().dequeue_message();//1
            println!("loop self trace1");
            match msg {
                None=>{},
                Some(m) => {
                    let _msg = m.clone();
                    _msg.try_borrow_mut().unwrap().execute();//2
                }
            }
        }
    }

这里loop_self主要是给一个线程做死循环用的。

1.从queue中获取message,如果没有则会阻塞在这里,直到有新消息入队

2.message执行

5.HandlerThread

// Holds the Looper that start() runs.
struct HandlerThread {
    looper:Arc<Looper>
}

主要函数:

// Runs the looper's message loop forever; intended to be called on a dedicated thread.
fn start(&self) {
        loop {
            self.looper.loop_self();//1
        }
    }

1.循环处理messagequeue的数据,这个start函数最后会在一个线程中调用

6.Handler

// Sending side of the message queue.
pub struct Handler {
    // Queue that outgoing messages are enqueued on.
    queue:Arc<RefCell<MessageQueue>>,
    // Callback attached as the target of every message this handler sends.
    processor:Arc<Box<dyn ProcessMessage>>,
}

关键函数:

// Creates a Handler that shares the queue of a new HandlerThread, then starts that
// HandlerThread's loop on a freshly spawned thread.
pub fn new(processor:Box<dyn ProcessMessage>)->Self {
        println!("handler construct trace1", );
        let handler_th = HandlerThread::new();
        let handler = Handler {
            queue:handler_th.get_looper().queue.clone(),
            processor:Arc::new(processor)
        };

        println!("handler construct trace2", );
        // handler_th is moved into the worker thread; the spawned thread is never joined.
        thread::spawn(move||{
            handler_th.start();//1
        });
        
        handler
    }

1.上面数据结构的消息队列是在这里被驱动的。

测试代码:

/// Demo processor: prints the code of every message it receives.
struct MyProcessor {}

impl ProcessMessage for MyProcessor {
    /// Prints `msg.what` on its own line.
    fn handleMessage(&self, msg: Message) {
        let code = msg.what;
        println!("msg is {}", code);
        // A `thread::sleep` can be inserted here to simulate a slow handler.
    }
}

/// Demo driver: sends one immediate message (code 1) and one message delayed by
/// 1000 (code 2) through a new handler, then sleeps so the worker thread can run.
pub fn test_handler_send_empty_message() {
    println!("test_handler_send_empty_message trace1");
    let processor = Box::new(MyProcessor {});
    let sender = handler::Handler::new(processor);
    println!("test_handler_send_empty_message trace2");

    sender.send_empty_message(1);
    sender.send_empty_message_delayed(2, 1000);

    // Keep this thread alive; the worker thread is detached and is never joined.
    let keep_alive = Duration::from_secs(100);
    thread::sleep(keep_alive);
}

全代码:

use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::cell::RefCell;
use std::cell::Cell;
use std::ptr::NonNull;

use std::rc::Rc;
use std::sync::Arc;
use std::sync::{Mutex,Condvar};
use std::thread;
use std::time::Duration;

use tokio::time::Interval;

use crate::system;


/// Callback interface for receiving messages.
///
/// A `Message` may carry a `target` of this trait type; `Message::execute` calls
/// `handleMessage` on it.
pub trait ProcessMessage {
    /// Handles one message; `msg` is passed by value.
    // The camel-case name is part of the public interface, so the lint is silenced
    // here instead of renaming the method and breaking implementors.
    #[allow(non_snake_case)]
    fn handleMessage(&self, msg: Message);
}

//---- Message ----
/// A single queued message, which doubles as a node of the queue's singly linked list.
pub struct Message {
    /// Caller-defined message code.
    pub what: u32,
    /// First caller-defined integer argument.
    pub arg1: i32,
    /// Second caller-defined integer argument.
    pub arg2: i32,
    /// Due time. The queue compares it with `system::currentMillis()` before
    /// delivering; messages built by `Message::new` start at 0.
    pub next_time: u64,
    /// The following node in the list, or `None` at the tail.
    pub next: Option<Rc<RefCell<Message>>>,
    /// Callback invoked by `execute`; with `None`, `execute` does nothing.
    pub target: Option<Arc<Box<dyn ProcessMessage>>>,
}

impl Message {
    pub fn new(what:u32)->Self {
        Message {
            what:what,
            arg1:0,
            arg2:0,
            next_time:0,
            next:None,
            target:None
        }
    }

    fn set_next(&mut self,next:Message)->&mut Self {
        self.next = Some(Rc::new(RefCell::new(next)));//Some(RefCell::new(Rc::new(Box::new(next))));
        self
    }

    // pub fn set_target(&mut self,target:Box<dyn ProcessMessage>)->&mut Self {
    //     self.target = Some(target);
    //     self
    // }

    fn execute(&self) {
        match &self.target {
            None=> {},
            Some(handler) => {
                handler.handleMessage(
                Message{
                        what:self.what,
                        arg1:self.arg1,
                        arg2:self.arg2,
                        next_time:self.next_time,
                        next:None,
                        target:None
                    }
                )
            }
        }
    }
}

//---- MessageQueue ----
/// Linked list of messages kept in ascending `next_time` order, guarded by a mutex.
pub struct MessageQueue {
    /// Sentinel node that is never delivered; real messages start at `head.next`.
    head: Mutex<Message>,
    /// Notified by `enqueue_message`; `dequeue_message` waits on it.
    cond: Condvar,
}

impl MessageQueue {
    /// Creates an empty queue: a sentinel head message (code 0) with no successor.
    pub fn new() -> Self {
        MessageQueue {
            head: Mutex::new(Message::new(0)),
            cond: Condvar::new(),
        }
    }

    /// Blocks until the first message in the list is due, unlinks it and returns it.
    ///
    /// A message is due when its `next_time` is less than or equal to
    /// `system::currentMillis()`. While the list is empty the call waits on the
    /// condition variable; while the first message is not yet due it waits with a
    /// timeout equal to the remaining delay. The visible code never returns `None`.
    ///
    /// The returned message keeps its own `next` link (only the head is re-pointed).
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned or if the first message is mutably borrowed.
    pub fn dequeue_message(&mut self) -> Option<Rc<RefCell<Message>>> {
        println!("dequeue_message start");
        // The guard is threaded through the condvar waits instead of being dropped
        // and re-acquired on every iteration.
        let mut head = self.head.lock().unwrap();
        loop {
            println!("dequeue_message trace1");
            let msg = match head.next.clone() {
                Some(msg) => msg,
                None => {
                    println!("dequeue_message trace2");
                    head = self.cond.wait(head).unwrap();
                    continue;
                }
            };

            let next_time = msg.try_borrow().unwrap().next_time;
            // Read the clock exactly once: the due check and the remaining-delay
            // computation must agree, otherwise `next_time - now` can underflow when
            // the clock passes `next_time` between two separate reads.
            let now = system::currentMillis();
            println!("msg next time is {},current is {}", next_time, now);
            if next_time <= now {
                head.next = msg.try_borrow().unwrap().next.clone();
                return Some(msg);
            }

            let remaining = Duration::from_millis(next_time.saturating_sub(now));
            // Woken by a timeout or by an enqueue; either way re-check the list head.
            let (guard, _timeout) = self.cond.wait_timeout(head, remaining).unwrap();
            head = guard;
        }
    }

    /// Inserts `msg` into the list, keeping ascending `next_time` order, and wakes
    /// one waiter.
    ///
    /// `msg` goes in front of the first queued message whose `next_time` is strictly
    /// greater, so messages with equal `next_time` stay in insertion order.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned or if a visited node is already borrowed in a
    /// conflicting way.
    pub fn enqueue_message(&mut self, mut msg: Message) {
        let mut head = self.head.lock().unwrap();
        println!("enqueue message trace1");

        let first = match head.next.clone() {
            Some(first) => first,
            None => {
                println!("enqueue message trace8");
                head.next = Some(Rc::new(RefCell::new(msg)));
                self.cond.notify_one();
                return;
            }
        };
        println!("enqueue message trace2");

        // `prev` stays `None` while `current` is the first real node, in which case
        // the sentinel head is the node that has to be re-linked.
        let mut prev: Option<Rc<RefCell<Message>>> = None;
        let mut current = first;
        loop {
            if current.try_borrow().unwrap().next_time > msg.next_time {
                println!("enqueue message trace3");
                msg.next = Some(current);
                let node = Some(Rc::new(RefCell::new(msg)));
                match prev {
                    None => {
                        println!("enqueue message trace4");
                        head.next = node;
                    }
                    Some(before) => {
                        println!("enqueue message trace5");
                        before.try_borrow_mut().unwrap().next = node;
                    }
                }
                self.cond.notify_one();
                return;
            }

            println!("enqueue message trace6");
            // Bind the successor first so the shared borrow of `current` has ended
            // before the tail case below borrows it mutably.
            let following = current.try_borrow().unwrap().next.clone();
            match following {
                None => {
                    current.try_borrow_mut().unwrap().next = Some(Rc::new(RefCell::new(msg)));
                    self.cond.notify_one();
                    return;
                }
                Some(next_node) => {
                    prev = Some(current);
                    current = next_node;
                }
            }
        }
    }
}

//
/// Owns a shared handle to the message queue that `loop_self` drains.
struct Looper {
    /// Queue shared with every `Handler` that clones this `Arc`.
    queue: Arc<RefCell<MessageQueue>>,
}

impl Looper {
    /// Creates a looper with its own empty message queue.
    fn new() -> Self {
        let queue = Arc::new(RefCell::new(MessageQueue::new()));
        Looper { queue }
    }

    /// Returns another shared handle to this looper's queue.
    fn get_queue(&self) -> Arc<RefCell<MessageQueue>> {
        Arc::clone(&self.queue)
    }

    /// Endless loop that dequeues one message at a time and executes it; never returns.
    ///
    /// # Panics
    ///
    /// Panics if the queue or the dequeued message is already borrowed.
    ///
    /// NOTE(review): the mutable `RefCell` borrow of the queue is held for the whole
    /// `dequeue_message` call, which can block. `Handler` also calls
    /// `try_borrow_mut().unwrap()` on the same queue, so a send issued while this loop
    /// is waiting looks likely to panic. Confirm and consider a thread-safe design.
    fn loop_self(&self) {
        loop {
            println!("loop self start");
            // Scope the queue borrow so it ends before the message is executed.
            let msg = {
                let shared = self.queue.clone();
                let mut queue = shared.try_borrow_mut().unwrap();
                queue.dequeue_message()
            };
            println!("loop self trace1");

            if let Some(m) = msg {
                let node = m.clone();
                let guard = node.try_borrow_mut().unwrap();
                guard.execute();
            }
        }
    }
}
/// Holds the looper that `start` runs.
struct HandlerThread {
    /// Looper shared through an `Arc`; `get_looper` hands out further clones.
    looper: Arc<Looper>,
}

impl HandlerThread {
    /// Creates a handler thread that owns a brand-new looper.
    fn new() -> Self {
        Self::new_with_looper(Arc::new(Looper::new()))
    }

    /// Creates a handler thread around an existing looper.
    fn new_with_looper(looper: Arc<Looper>) -> Self {
        HandlerThread { looper }
    }

    /// Runs the looper's message loop forever on the calling thread.
    fn start(&self) {
        let looper = &self.looper;
        loop {
            looper.loop_self();
        }
    }

    /// Returns another shared handle to the looper.
    pub fn get_looper(&self) -> Arc<Looper> {
        Arc::clone(&self.looper)
    }
}

/// Sending side of a message queue.
pub struct Handler {
    /// Queue that outgoing messages are enqueued on.
    queue: Arc<RefCell<MessageQueue>>,
    /// Callback attached as the `target` of every message this handler sends.
    processor: Arc<Box<dyn ProcessMessage>>,
}

// Lets `Handler::new` move a `HandlerThread` into `thread::spawn`.
// NOTE(review): this looks unsound. `HandlerThread` reaches an
// `Arc<RefCell<MessageQueue>>` through its looper, and `Handler::new` keeps a clone of
// that `Arc` on the creating thread, so the `RefCell` (and the `Rc<RefCell<Message>>`
// nodes behind it) can be touched from two threads. `RefCell` and `Rc` are not
// thread-safe. Confirm and replace with `Mutex`-based sharing.
unsafe impl Send for HandlerThread{}

impl Handler {
    /// Creates a handler whose messages are delivered to `processor`.
    ///
    /// Builds a new `HandlerThread`, keeps a handle to its looper's queue, and starts
    /// the message loop on a freshly spawned thread that is never joined.
    pub fn new(processor: Box<dyn ProcessMessage>) -> Self {
        println!("handler construct trace1");
        let worker = HandlerThread::new();
        let queue = worker.get_looper().queue.clone();
        let processor = Arc::new(processor);

        println!("handler construct trace2");
        thread::spawn(move || worker.start());

        Handler { queue, processor }
    }

    /// Enqueues a message with code `what` and a `next_time` of 0.
    ///
    /// # Panics
    ///
    /// Panics if the queue's `RefCell` is already borrowed.
    pub fn send_empty_message(&self, what: u32) {
        println!("send_empty_message start");
        self.post(what, 0);
    }

    /// Enqueues a message with code `what` whose `next_time` is
    /// `system::currentMillis() + interval`.
    ///
    /// # Panics
    ///
    /// Panics if the queue's `RefCell` is already borrowed.
    pub fn send_empty_message_delayed(&self, what: u32, interval: u64) {
        let due = system::currentMillis() + interval;
        self.post(what, due);
    }

    /// Builds a message targeting this handler's processor and enqueues it.
    fn post(&self, what: u32, next_time: u64) {
        let mut msg = Message::new(what);
        msg.next_time = next_time;
        msg.target = Some(self.processor.clone());
        self.queue.try_borrow_mut().unwrap().enqueue_message(msg);
    }
}
举报

相关推荐

0 条评论