trait Future {
    /// The type of the value returned when the future completes.
    type Item;

    /// The type representing errors that occurred while processing the computation.
    type Error;

    /// The function that will be repeatedly called to see if the future
    /// has completed or not. The `Async` enum can either be `Ready` or
    /// `NotReady` and indicates whether the future is ready to produce
    /// a value or not.
    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error>;
}


trait Stream {
    /// The type of the value yielded by the stream.
    type Item;

    /// The type representing errors that occurred while processing the computation.
    type Error;

    /// The function that will be repeatedly called to see if the stream has
    /// another value it can yield
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

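Unlike a Future, a Stream keeps yielding values, so the Ready case is further split into Some and None. poll therefore returns one of four results:
1. Ok(Async::Ready(Some(value))): a value is ready
2. Ok(Async::NotReady): no value is ready yet
3. Ok(Async::Ready(None)): the stream is exhausted
4. Err(error): an error occurred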
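The four return cases can be exercised without the futures crate. The sketch below re-declares `Async`, `Poll` and `Stream` locally so that they mirror the futures 0.1 shapes quoted above; they are stand-ins, not the real crate types. `Counter` and `collect` are hypothetical names introduced only for illustration.

```rust
// Local stand-ins mirroring the futures 0.1 definitions (assumption: not the real crate).
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

type Poll<T, E> = Result<Async<T>, E>;

trait Stream {
    type Item;
    type Error;
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>;
}

/// Hypothetical stream: yields `next..=limit`, reporting `NotReady` once before each value.
struct Counter {
    next: u32,
    limit: u32,
    ready: bool,
}

impl Stream for Counter {
    type Item = u32;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<u32>, String> {
        if self.limit > 100 {
            return Err("limit too large".to_string()); // case 4: error
        }
        if self.next > self.limit {
            return Ok(Async::Ready(None)); // case 3: no more data
        }
        if !self.ready {
            self.ready = true;
            return Ok(Async::NotReady); // case 2: data not ready yet
        }
        self.ready = false;
        let value = self.next;
        self.next += 1;
        Ok(Async::Ready(Some(value))) // case 1: a value is ready
    }
}

/// Drains a stream by polling in a loop. A real executor would wait for a
/// wakeup on `NotReady` instead of spinning.
fn collect<S: Stream>(stream: &mut S) -> Result<Vec<S::Item>, S::Error> {
    let mut out = Vec::new();
    loop {
        match stream.poll()? {
            Async::Ready(Some(v)) => out.push(v),
            Async::Ready(None) => return Ok(out),
            Async::NotReady => continue,
        }
    }
}

fn main() {
    let mut stream = Counter { next: 1, limit: 3, ready: false };
    println!("{:?}", collect(&mut stream));
}
```

The caller has to handle all four outcomes on every call, which is why the combinators in the futures crate are usually preferred over hand-written polling loops.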




async {
    fut_one.await;
    fut_two.await;
}

// The `Future` type generated by our `async { ... }` block
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl AsyncFuture {
    fn poll(...) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                },
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                },
                State::Done => return Poll::Ready(()),
            }
        }
    }
}
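The pseudocode above elides the `poll` arguments. As a rough, compilable sketch of the same state machine using only the standard library, the version below fills them in under some assumptions: `Countdown`, `TwoStep`, `NoopWake` and `count_polls` are hypothetical names, the two inner futures are `Unpin` so they can be re-pinned with `Pin::new`, and the driver simply busy-polls rather than waiting for a wakeup.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical leaf future: reports `Pending` `remaining` times, then `Ready`.
struct Countdown {
    remaining: u32,
}

impl Future for Countdown {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.remaining == 0 {
            Poll::Ready(())
        } else {
            self.remaining -= 1;
            cx.waker().wake_by_ref(); // ask to be polled again
            Poll::Pending
        }
    }
}

enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

/// Hand-written equivalent of `async { fut_one.await; fut_two.await; }`.
struct TwoStep {
    fut_one: Countdown,
    fut_two: Countdown,
    state: State,
}

impl Future for TwoStep {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        let this = self.get_mut(); // allowed because every field is `Unpin`
        loop {
            match this.state {
                State::AwaitingFutOne => match Pin::new(&mut this.fut_one).poll(cx) {
                    Poll::Ready(()) => this.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                },
                State::AwaitingFutTwo => match Pin::new(&mut this.fut_two).poll(cx) {
                    Poll::Ready(()) => this.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                },
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

/// Waker that does nothing; enough for a busy-polling driver.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `fut` until it completes and returns how many polls that took.
fn count_polls<F: Future<Output = ()> + Unpin>(mut fut: F) -> u32 {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if Pin::new(&mut fut).poll(&mut cx).is_ready() {
            return polls;
        }
    }
}

fn main() {
    let fut = TwoStep {
        fut_one: Countdown { remaining: 2 },
        fut_two: Countdown { remaining: 1 },
        state: State::AwaitingFutOne,
    };
    println!("completed after {} polls", count_polls(fut));
}
```

Each call to `poll` resumes from the stored `state`, so work already done by `fut_one` is never repeated once the machine has moved on to `fut_two`.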



async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}
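Why such a self-referential struct is a problem can be shown with a small experiment. The sketch below is an assumption-laden stand-in, not the compiler's generated type: `SelfRef` stores a raw pointer in place of the `&'a mut [u8]` reference, and it only compares addresses, never dereferences them, so no `unsafe` is needed.

```rust
/// Hypothetical stand-in for the generated future: `ptr` is meant to point at `x`.
struct SelfRef {
    x: [u8; 4],
    ptr: *const u8,
}

impl SelfRef {
    fn new() -> Self {
        SelfRef { x: [1, 2, 3, 4], ptr: std::ptr::null() }
    }

    /// Make `ptr` point at this value's own `x` field.
    fn link(&mut self) {
        self.ptr = self.x.as_ptr();
    }

    /// True while `ptr` still points at this value's own `x`.
    fn is_consistent(&self) -> bool {
        self.ptr == self.x.as_ptr()
    }
}

/// Link first, then move the value to the heap: the pointer is left behind.
fn moved_after_link() -> bool {
    let mut a = SelfRef::new();
    a.link();
    let b = Box::new(a); // the bytes move, but `ptr` still holds the old stack address
    b.is_consistent()
}

/// Put the value at its final address first, then link: moving the `Box`
/// only moves the handle, not the heap data it owns.
fn linked_in_place() -> bool {
    let mut b = Box::new(SelfRef::new());
    b.link();
    let c = b;
    c.is_consistent()
}

fn main() {
    println!("moved after link: consistent = {}", moved_after_link());
    println!("linked in place:  consistent = {}", linked_in_place());
}
```

`Pin` exists to turn the second pattern into a guarantee: once a value is pinned, safe code can no longer move it out from under its own internal pointers.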




use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io

// A function which takes a `Future` that implements `Unpin`.
fn execute_unpin_future(x: impl Future<Output = ()> + Unpin) { ... }

let fut = async { ... };
execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait

// Pinning with `Box`:
let fut = async { ... };
let fut = Box::pin(fut);
execute_unpin_future(fut); // OK

// Pinning with `pin_mut!`:
let fut = async { ... };
pin_mut!(fut);
execute_unpin_future(fut); // OK
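The same two pinning styles can be tried with the standard library alone. This is a sketch under a few assumptions: `run_unpin`, `via_box`, `via_stack` and `NoopWake` are hypothetical names, `std::pin::pin!` (available in recent stable Rust) is used as the counterpart of `pin_mut!`, and the driver busy-polls because the example futures complete immediately.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never actually wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Accepts only `Unpin` futures, like `execute_unpin_future` above,
/// and polls the future until it completes.
fn run_unpin<F: Future + Unpin>(mut fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = Pin::new(&mut fut).poll(&mut cx) {
            return value;
        }
    }
}

/// Heap pinning: `Pin<Box<_>>` is `Unpin` even though the async block is not.
fn via_box() -> u32 {
    let fut = async { 1 };
    run_unpin(Box::pin(fut))
}

/// Stack pinning: `pin!` yields a `Pin<&mut _>`, which is also `Unpin`.
fn via_stack() -> u32 {
    let fut = async { 2 };
    let fut = pin!(fut);
    run_unpin(fut)
}

fn main() {
    println!("boxed: {}, stack: {}", via_box(), via_stack());
}
```

Boxing costs an allocation but produces a handle that can be stored or returned; stack pinning is free but ties the future to the current scope.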

tokio runtime

As hinted at earlier, the Rust asynchronous model is very different from that of other languages. Most other languages use a "completion"-based model, usually built using some form of callbacks. In this case, when an asynchronous action is started, it is submitted with a function to call once the operation completes. When the process receives the I/O notification from the operating system, it finds the function associated with it and calls it immediately. This is a push-based model because the value is pushed into the callback.

The Rust asynchronous model is pull-based. Instead of a Future being responsible for pushing the data into a callback, it relies on something else asking if it is complete or not. In the case of Tokio, that something else is the Tokio runtime.

Using a poll-based model offers many advantages, including being a zero-cost abstraction, i.e., using Rust futures has no added overhead compared to writing the asynchronous code by hand.
use std::pin::Pin;
use std::task::{Context, Poll};

pub trait Future {
    /// The type of value produced on completion.
    type Output;

    /// Attempt to resolve the future to a final value, registering
    /// the current task for wakeup if the value is not yet available.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

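Combined with the waker carried in Context, this seems to me much the same as the push model in most cases. Perhaps I just don't understand it deeply enough yet.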
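One way to see where the two models differ is to write the smallest possible executor. In the sketch below (standard library only; `Timer`, `Shared`, `ThreadWaker` and `block_on` are hypothetical names, and this is not how Tokio is implemented), the background thread pushes only a notification through the waker. The value itself is still pulled by the executor calling `poll` again.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

/// State shared between the future and the background thread.
struct Shared {
    done: bool,
    waker: Option<Waker>,
}

/// Hypothetical future that is completed by a background thread after `delay`.
struct Timer {
    shared: Arc<Mutex<Shared>>,
}

impl Timer {
    fn new(delay: Duration) -> Self {
        let shared = Arc::new(Mutex::new(Shared { done: false, waker: None }));
        let background = Arc::clone(&shared);
        thread::spawn(move || {
            thread::sleep(delay);
            let mut state = background.lock().unwrap();
            state.done = true;
            // Push side: only a "poll me again" signal is sent, not the value.
            if let Some(waker) = state.waker.take() {
                waker.wake();
            }
        });
        Timer { shared }
    }
}

impl Future for Timer {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.shared.lock().unwrap();
        if state.done {
            Poll::Ready("timer fired")
        } else {
            // Register the current task so the background thread can wake it.
            state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

/// Waker that unparks the thread running `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll, sleep until woken, poll again.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value, // pull side: the executor fetches the value
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    println!("{}", block_on(Timer::new(Duration::from_millis(50))));
}
```

Because the executor decides when to call `poll`, it can batch, reorder or drop tasks freely, which a callback invoked directly from the I/O notification would not allow.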


Pay attention to the use declarations here: when you write this yourself, not one of them can be left out.

extern crate tokio;
extern crate bytes;
#[macro_use] // brings the `try_ready!` macro used below into scope
extern crate futures;

use tokio::io::AsyncWrite;
use tokio::net::{TcpStream, tcp::ConnectFuture};
use bytes::{Bytes, Buf};
use futures::{Future, Async, Poll};
use std::io::{self, Cursor};

// HelloWorld has two states, namely waiting to connect to the socket
// and already connected to the socket
enum HelloWorld {
    Connecting(ConnectFuture),
    Connected(TcpStream, Cursor<Bytes>),
}

impl Future for HelloWorld {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        use self::HelloWorld::*;

        loop {
            match self {
                Connecting(ref mut f) => {
                    let socket = try_ready!(f.poll());
                    let data = Cursor::new(Bytes::from_static(b"hello world"));
                    *self = Connected(socket, data);
                }
                Connected(ref mut socket, ref mut data) => {
                    // Keep trying to write the buffer to the socket as long as the
                    // buffer has more bytes available for consumption
                    while data.has_remaining() {
                        try_ready!(socket.write_buf(data));
                    }
                    return Ok(Async::Ready(()));
                }
            }
        }
    }
}

fn main() {
    // The address string is empty here; supply a socket address ("ip:port") before running.
    let addr = "".parse().unwrap();
    let connect_future = TcpStream::connect(&addr);
    let hello_world = HelloWorld::Connecting(connect_future);

    // Run it, here we map the error since tokio::run expects a Future<Item=(), Error=()>
    tokio::run(hello_world.map_err(|e| println!("{0}", e)))
}


Much like Option, iterators and so on in the standard library, futures also provides helper combinators for Future:
1. map
2. and_then
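The analogy with the standard library can be made concrete with `Result`, which carries a success value and an error much as a futures 0.1 `Future` carries `Item` and `Error`. The sketch below uses only std; `parse_port`, `require_unprivileged` and `describe` are hypothetical helpers. On a future, the same two combinators apply once the value eventually arrives rather than immediately.

```rust
/// Hypothetical first step: parse a port number, failing with a message.
fn parse_port(text: &str) -> Result<u16, String> {
    text.parse::<u16>()
        .map_err(|e| format!("bad port {:?}: {}", text, e))
}

/// Hypothetical second step that can itself fail.
fn require_unprivileged(port: u16) -> Result<u16, String> {
    if port >= 1024 {
        Ok(port)
    } else {
        Err(format!("port {} is privileged", port))
    }
}

fn describe(text: &str) -> Result<String, String> {
    parse_port(text)
        // and_then: chain a further step that may fail; skipped on error.
        .and_then(require_unprivileged)
        // map: transform the success value; errors pass through untouched.
        .map(|port| format!("listening on {}", port))
}

fn main() {
    println!("{:?}", describe("8080"));
    println!("{:?}", describe("80"));
}
```

`map` is the right choice when the closure cannot fail, and `and_then` when the next step produces its own `Result` (or, for futures, its own future).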
