pub fn select_with_strategy<St1, St2, Clos, State>(
    stream1: St1,
    stream2: St2,
    which: Clos
) -> SelectWithStrategy<St1, St2, Clos, State> where
    St1: Stream,
    St2: Stream<Item = <St1 as Stream>::Item>,
    Clos: FnMut(&mut State) -> PollNext,
    State: Default

This function will attempt to pull items from both streams. You provide a closure that tells SelectWithStrategy which stream to poll. The closure can keep state inside SelectWithStrategy and receives a &mut reference to that state on every invocation, so the strategy can depend on prior choices. The state type must implement Default.

After one of the two input streams completes, the remaining one will be polled exclusively. The returned stream completes when both input streams have completed.

Note that this function consumes both streams and returns a wrapped version of them.

Examples

Priority

This example shows how to always prioritize the left stream.

use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };

let left = repeat(1);
let right = repeat(2);

// We don't need any state, so let's make it an empty tuple.
// We must provide some type here, as there is no way for the compiler
// to infer it. As we don't need to capture variables, we can just
// use a function pointer instead of a closure.
fn prio_left(_: &mut ()) -> PollNext { PollNext::Left }

let mut out = select_with_strategy(left, right, prio_left);

for _ in 0..100 {
    // Whenever we poll `out`, we will always get `1`.
    assert_eq!(1, out.select_next_some().await);
}

Round Robin

This example shows how to select from both streams round robin. Note: this special case is provided by futures_util::stream::select.

use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt };

let left = repeat(1);
let right = repeat(2);

let rrobin = |last: &mut PollNext| last.toggle();

let mut out = select_with_strategy(left, right, rrobin);

for _ in 0..100 {
    // We should be alternating now.
    assert_eq!(1, out.select_next_some().await);
    assert_eq!(2, out.select_next_some().await);
}