从零实现消息中间件-client


presentation:
width: 1024

height: 800

需求

  1. 提供pub接口
  2. 提供sub接口
  3. 处理sub后收到的消息


数据结构定义

type MessageHandler = Box<dyn FnMut(&[u8]) -> std::result::Result<(), ()> + Sync + Send>;
//#[derive(Debug)]
pub struct Client {
    addr: String,
    writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    pub stop: Option<oneshot::Sender<()>>,
    sid: u64,
    handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
}

接口-connect

  1. 建立连接
  2. 创建Client
  3. 启动后台Client读取消息任务
    rust pub async fn connect(addr: &str) -> std::io::Result<Client> {}

接口-pub_message

向服务器发布一条pub消息

pub async fn pub_message(&mut self, 
  subject: &str, 
  msg: &[u8]) 
     -> std::io::Result<()> {}

接口-sub_message

向服务器发布一条sub消息
然后等待服务器推送相关消息

    //sub消息格式为SUB subject {queue} {sid}\r\n
    //可能由于rustc的bug,导致如果subject是&str,则会报错E0700,暂时使用String来替代
    pub async fn sub_message(
        &mut self,
        subject: String,
        queue: Option<String>,
        handler: MessageHandler,
    ) -> std::io::Result<()> {}

client-task

  1. 解析消息
  2. 分派给相应的Subscribe

    /*
    从服务器接收pub消息
    然后推送给相关的订阅方。
    */
    async fn receive_task(
        mut reader: ReadHalf<TcpStream>,
        stop: oneshot::Receiver<()>,
        handler: Arc<Mutex<HashMap<String, MessageHandler>>>,
        writer: Arc<Mutex<WriteHalf<TcpStream>>>,
    ) {}

API的使用-pub

 c.pub_message("test", format!("hello{}", i).as_bytes())
            .await?;

API的使用-sub

     c.sub_message(
        "test".into(),
        None,
        Box::new(move |msg| {
            println!("recevied:{}", unsafe { std::str::from_utf8_unchecked(msg) });
            Ok(())
        }),
    )
    .await?;
xxxxxxxsgg dsgggg 本站总访问量 本站访客数人次