从零实现消息中间件-client

功能设计

client实现功能相对比较单一,就是能够向服务器pub消息,然后就会说订阅消息,订阅的主题收到消息以后能够得到通知.因此总结起来就是下面三个功能:

  1. 提供pub接口
  2. 提供sub接口
  3. 处理sub后收到的消息


数据结构定义

提供给用户的接口是上面的三个,
为了实现这三个接口,client一定要有的就是writer以及handler. 而sid则是因为一个client可以有多个sub,每一个sub要有唯一的id,主要是编号用. stop则是为了client正常关闭使用.

#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: oneshot::Sender<()>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String,  
                  mpsc::UnboundedSender<Vec<u8>>>>>,
}

接口-connect

connect的功能非常直白就是创建和服务器的连接,同时后台会启动一个任务来处理tcp连接,主要是接收消息.

 pub async fn connect(addr: &str) -> std::io::Result<Client> {}

接口-pub_message

向服务器发布一条pub消息

pub async fn pub_message(&mut self, 
  subject: &str, 
  msg: &[u8]) 
     -> std::io::Result<()> {}

接口-sub_message

向服务器发布一条sub消息,然后等待服务器推送相关消息.
需要说明的是这里的参数subjectqueue完全没有必要使用String,&str即可. 这应该是rust的一个bug,在1.41和nightly 1.43都是编译不过去的.所以退而求其次,使用了String.

    //sub消息格式为SUB subject {queue} {sid}\r\n
    pub async fn sub_message(
        &mut self,
        subject: String,
        queue: Option<String>,
        handler: MessageHandler,
    ) -> std::io::Result<()> {}

receive_task

receive_task主要是做消息的接收,解析,以及将消息派发给合适的handler.
这个其实是本模块最复杂的地方,总体上比较直观.
主要有以下两点

  1. 使用futures::select这个宏来辅助实现同时监控多个future
  2. TcpStream如果read到size为0,说明连接已经关闭,无需继续

    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String,
                 mpsc::UnboundedSender<Vec<u8>>>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) 

API的使用

pub

 c.pub_message("test", format!("hello{}", i).as_bytes())
            .await?;

sub

 c.sub_message(
        "test".into(),
        None,
        Box::new(move |msg| {
            println!("recevied:{}", unsafe { std::str::from_utf8_unchecked(msg) });
            Ok(())
        }),
    )

代码实现


type MessageHandler = Box<dyn FnMut(&[u8]) -> std::result::Result<(), ()> + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: Option<oneshot::Sender<()>>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
}

impl Client {
    //1. 建立到服务器的连接
    //2. 启动后台任务
    pub async fn connect(addr: &str) -> std::io::Result<Client> {
        let conn = TcpStream::connect(addr).await?;
        let (reader, writer) = tokio::io::split(conn);
        let (tx, rx) = tokio::sync::oneshot::channel();
        let c = Client {
            addr: addr.into(),
            writer: Arc::new(Mutex::new(writer)),
            stop: Some(tx),
            sid: 0,
            handler: Arc::new(Default::default()),
        };
        let handler = c.handler.clone();
        let writer = c.writer.clone();
        /*
        tokio::spawn 可以认为和go语言中的
        go func(){}()
        */
        tokio::spawn(async move {
            Self::receive_task(reader, rx, handler, writer).await;
        });
        Ok(c)
    }
    /*
    从服务器接收pub消息
    然后推送给相关的订阅方。
    */
    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) {
        let mut buf = [0 as u8; 512];
        let mut parser = Parser::new();
        use futures::*;
        let mut stop = stop.fuse();
        loop {
            select! {
                _=stop=>{
                    println!("client closed");
                    return;
                }
                r = reader.read(&mut buf[..]).fuse()=>{
                     let n = {
                        match r {
                            Err(e) => {
                                println!("read err {}", e);
                                let _ = writer.lock().await.shutdown().await;
                                return;
                            }
                            Ok(n) => n,
                        }
                    };
                    if n == 0 {
                        //EOF,说明对方关闭了连接
                        return;
                    }
                    let mut buf2 = &buf[..n];
                    loop {
                        let r = parser.parse(buf2);
                        let (r, n) = match r {
                            Err(e) => {
                                println!("parse err {}", e);
                                let _ = writer.lock().await.shutdown().await;
                                return;
                            }
                            Ok(r) => r,
                        };
                        //                println!("receive msg {:?}", r);
                        match r {
                            ParseResult::NoMsg => {
                                break;
                            }
                            ParseResult::MsgArg(msg) => {
                                Self::process_message(msg, &handler).await;
                                parser.clear_msg_buf();
                            }
                        }
                        //缓冲区处理完毕
                        if n == buf.len() {
                            break;
                        }
                        buf2 = &buf2[n..]
                    }
                }
            }
        }
    }
    /*
    根据消息的subject,找到订阅方,
    然后推送给他们
    */
    pub async fn process_message(
        msg: MsgArg<'_>,
        handler: &Arc<Mutex<HashMap<String, MessageHandler>>>,
    ) {
        //        println!("broadcast msg {}", msg.subject);
        let mut handler = handler.lock().await;
        let h = handler.get_mut(msg.subject);
        if let Some(h) = h {
            let _ = h(msg.msg);
        }
    }
    //pub消息格式为PUB subject size\r\n{message}
    pub async fn pub_message(&self, subject: &str, msg: &[u8]) -> std::io::Result<()> {
        let mut writer = self.writer.lock().await;
        let m = format!("PUB {} {}\r\n", subject, msg.len());
        let _ = writer.write_all(m.as_bytes()).await;
        let _ = writer.write_all(msg).await;
        writer.write_all("\r\n".as_bytes()).await
    }

    //sub消息格式为SUB subject {queue} {sid}\r\n
    //可能由于rustc的bug,导致如果subject是&str,则会报错E0700,暂时使用String来替代
    pub async fn sub_message(
        &mut self,
        subject: String,
        queue: Option<String>,
        handler: MessageHandler,
    ) -> std::io::Result<()> {
        self.sid += 1;
        let mut writer = self.writer.lock().await;
        let m = if let Some(queue) = queue {
            format!("SUB {} {} {}\r\n", subject.as_str(), queue, self.sid)
        } else {
            format!("SUB {} {}\r\n", subject.as_str(), self.sid)
        };
        self.handler.lock().await.insert(subject, handler);
        writer.write_all(m.as_bytes()).await
    }
    pub fn close(&mut self) {
        if let Some(stop) = self.stop.take() {
            if let Err(e) = stop.send(()) {
                println!("stop err {:?}", e);
            }
        }
    }
}

其他

相关代码都在我的github rnats 欢迎围观

目录

xxxxxxxsgg dsgggg 本站总访问量 本站访客数人次