从零实现消息中间件-client


presentation:
width: 1024

height: 800

需求

  1. 提供pub接口
  2. 提供sub接口
  3. 处理sub后收到的消息


数据结构定义

#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: oneshot::Sender<()>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String,  
                  mpsc::UnboundedSender<Vec<u8>>>>>,
}

接口-connect

  1. 建立连接
  2. 创建Client
  3. 启动后台Client读取消息任务
    rust pub async fn connect(addr: &str) -> std::io::Result<Client> {}

接口-pub_message

向服务器发布一条pub消息

pub async fn pub_message(&mut self, 
  subject: &str, 
  msg: &[u8]) 
     -> std::io::Result<()> {}

接口-sub_message

向服务器发布一条sub消息
然后等待服务器推送相关消息

    //sub消息格式为SUB subject {queue} {sid}\r\n
    pub async fn sub_message(
        &mut self,
        subject: &str,
        queue: Option<&str>
    ) -> std::io::Result<mpsc::UnboundedReceiver<Vec<u8>>> {}

client-task

  1. 解析消息
  2. 分派给相应的Subscribe

    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String,
                 mpsc::UnboundedSender<Vec<u8>>>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) 

API的使用-pub

 c.pub_message("test", format!("hello{}", i).as_bytes())
            .await?;

API的使用-sub

 let mut rx = c.sub_message("test", None).await?;
    for i in 0..10 {
        let r = rx.recv().await;
        if r.is_none() {
            break;
        }
        let r = r.unwrap();
        println!("{} receive on test {}", i, unsafe {
            std::str::from_utf8_unchecked(r.as_slice())
        });
    }