imago/
vector_select.rs

1//! Async select over future vectors.
2//!
3//! Allows collecting `dyn Future` objects (i.e. async function instances) in a vector, and
4//! `select`ing (awaiting one) or `join`ing (awaiting all) them.
5
6use std::future::Future;
7use std::marker::Unpin;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
/// A growable set of pending, fallible futures.
///
/// All stored futures share the concrete type `F` and resolve to a `Result<R, E>`.  They are
/// added with `push()` and are taken out again once they complete, which happens while awaiting
/// the futures returned by `select()` or `discarding_join()`.  Completed entries are removed
/// with `Vec::swap_remove()`, so the order of the remaining futures is not preserved.
///
/// `F` has to be `Unpin` because the elements are polled in place inside the `Vec`.
pub(crate) struct FutureVector<R, E, F>
where
    F: Future<Output = Result<R, E>> + Unpin,
{
    /// Futures that have not completed yet.
    vec: Vec<F>,
}
16
17/// Await a single future.
18pub(crate) struct FutureVectorSelect<'a, R, E, F: Future<Output = Result<R, E>> + Unpin>(
19    &'a mut FutureVector<R, E, F>,
20);
21
22/// Await all futures, discarding successful results.
23pub(crate) struct FutureVectorDiscardingJoin<'a, R, E, F: Future<Output = Result<R, E>> + Unpin>(
24    &'a mut FutureVector<R, E, F>,
25);
26
27impl<R, E, F: Future<Output = Result<R, E>> + Unpin> FutureVector<R, E, F> {
28    /// Create a new `FutureVector`.
29    pub fn new() -> Self {
30        FutureVector { vec: Vec::new() }
31    }
32
33    /// Add a future.
34    pub fn push(&mut self, future: F) {
35        self.vec.push(future);
36    }
37
38    /// `true` if and only if there are no pending futures.
39    pub fn is_empty(&self) -> bool {
40        self.vec.is_empty()
41    }
42
43    /// Number of pending futures.
44    pub fn len(&self) -> usize {
45        self.vec.len()
46    }
47
48    /// Await any one future.
49    ///
50    /// Return the result of the first future that becomes ready, removing it from the vector.
51    ///
52    /// Functionally, behaves like:
53    /// ```ignore
54    /// async fn select(&mut self) -> Result<R, E>;
55    /// ```
56    pub fn select(&mut self) -> FutureVectorSelect<'_, R, E, F> {
57        FutureVectorSelect(self)
58    }
59
60    /// Join all futures, discarding successful results.
61    ///
62    /// If an error occurs, return it immediately.  All pending futures remain.
63    ///
64    /// Functionally, behaves like:
65    /// ```ignore
66    /// async fn discarding_join(&mut self) -> Result<(), E>;
67    /// ```
68    pub fn discarding_join(&mut self) -> FutureVectorDiscardingJoin<'_, R, E, F> {
69        FutureVectorDiscardingJoin(self)
70    }
71}
72
73impl<R, E, F: Future<Output = Result<R, E>> + Unpin> Future for FutureVectorSelect<'_, R, E, F> {
74    type Output = F::Output;
75
76    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<F::Output> {
77        assert!(!self.0.is_empty());
78
79        for (i, fut) in self.0.vec.iter_mut().enumerate() {
80            if let Poll::Ready(result) = F::poll(Pin::new(fut), ctx) {
81                self.0.vec.swap_remove(i);
82                return Poll::Ready(result);
83            }
84        }
85
86        Poll::Pending
87    }
88}
89
90impl<R, E, F: Future<Output = Result<R, E>> + Unpin> Future
91    for FutureVectorDiscardingJoin<'_, R, E, F>
92{
93    type Output = Result<(), E>;
94
95    fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Result<(), E>> {
96        let mut i = 0;
97        while i < self.0.len() {
98            if let Poll::Ready(result) = F::poll(Pin::new(&mut self.0.vec[i]), ctx) {
99                self.0.vec.swap_remove(i);
100                if let Err(err) = result {
101                    return Poll::Ready(Err(err));
102                }
103            } else {
104                i += 1;
105            }
106        }
107
108        if self.0.is_empty() {
109            Poll::Ready(Ok(()))
110        } else {
111            Poll::Pending
112        }
113    }
114}