从零实现消息中间件-server

该部分功能相对比较简单,主要是

  • listen & accept
  • new client
    当然真实的Server中会做其他很多工作,比如配置选项等等,我们这里暂时都不考虑了.

数据结构定义

#[derive(Debug, Default)]
pub struct Server<T: SubListTrait> {
    state: Arc<Mutex<ServerState<T>>>,
}
#[derive(Debug, Default)]
pub struct ServerState<T: SubListTrait> {
    clients: HashMap<u64, Arc<Mutex<ClientMessageSender>>>,
    pub sublist: T,
    pub gen_cid: u64,
}

其中最核心的就是ServerState,注意到他被放在了 Arc<Mutex>中,这也就意味着

  1. 他要多线程访问
  2. 多线程中读写
    如果一个复杂结构体,需要多线程读,我们可以使用Arc包裹,避免多次内存分配
    如果一个变量,需要多线程读写,我们必须使用Mutex包裹,否则肯定无法编译
    这里的SubListTrait就是上节课从零实现消息中间件-sublist中讲到的.

关于channel和mutex

标准库中有channel和mutex,tokio也另外提供了一套,他们的接口使用起来差不多. 最大的区别就是标准库里的阻塞是会导致整个线程阻塞,而tokio提供的只是阻塞当前task.
不要在tokio框架中使用标准库中的channel和mutex

泛型 & async

因为ServerState中的sublist,他需要在多个tokio的task之间传递,所以我们要求他除了实现SubListTrait这个我们要求的功能性trait之外,还要满足tokio的要求.
也就是Send+'static. 顺便说一句'static这个生命周期,简单理解他的意思就是我这个struct中不包含任何借用.

 impl<T: SubListTrait + Send + 'static> Server<T> {
 }

接口设计

从功能上来说,Server这个结构体很简单,就是

  1. 主要任务就是listen & accept
  2. 创建Client实例, 后续需要的时候好利用起来.
    这分别对应下面的start和new_client两个函数.

    impl<T: SubListTrait + Send + 'static> Server<T> {
      pub async fn start(&self) -> Result<(), Box<dyn Error>> {
    
    }
    async fn new_client(&self, conn: TcpStream) {
    }
    }
    

如何使用

这里用到了tokio的使用方式,为了简化使用,tokio提供了两个宏main和test,他们位于tokio-macros这个crate下面.
有了这两个宏,我们的main函数可以简化很多.看起来就和普通的main函数差别不大,只是多了一个async关键字.

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    println!("server start..");
    let s: Server<SimpleSubList> = Server::default();
    s.start().await
}

代码实现

use crate::client::*;
use crate::simple_sublist::SubListTrait;
use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;

#[derive(Debug, Default)]
pub struct Server<T: SubListTrait> {
    state: Arc<Mutex<ServerState<T>>>,
}
#[derive(Debug, Default)]
pub struct ServerState<T: SubListTrait> {
    clients: HashMap<u64, Arc<Mutex<ClientMessageSender>>>,
    pub sublist: T,
    pub gen_cid: u64,
}

impl<T: SubListTrait + Send + 'static> Server<T> {
    pub async fn start(self) -> Result<(), Box<dyn Error>> {
        let addr = "127.0.0.1:4222";
        let mut listener = TcpListener::bind(addr).await?;
        //go func(){}
        loop {
            let (conn, _) = listener.accept().await?;
            self.new_client(conn).await;
        }
        Ok(())
    }
    async fn new_client(&self, conn: TcpStream) {
        let state = self.state.clone();
        let cid = {
            let mut state = state.lock().await;
            state.gen_cid += 1;
            state.gen_cid
        };
        let c = Client::process_connection(cid, state, conn);
        self.state.lock().await.clients.insert(cid, c);
    }
}

其他

相关代码都在我的github rnats 欢迎围观

目录

本站总访问量 本站访客数人次