Skip to content

Commit 454018e

Browse files
authored
Merge pull request #321 from async-rs/stream_merge
rename stream::join to Stream::merge
2 parents b9bddbb + 04342c7 commit 454018e

File tree

3 files changed

+83
-3
lines changed

3 files changed

+83
-3
lines changed

src/stream/mod.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ cfg_if! {
4646
pub use from_stream::FromStream;
4747
pub use into_stream::IntoStream;
4848

49-
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
50-
#[doc(inline)]
51-
pub use async_macros::{join_stream as join, JoinStream as Join};
49+
pub use stream::Merge;
5250
}
5351
}

src/stream/stream/merge.rs

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
use std::pin::Pin;
2+
use std::task::{Context, Poll};
3+
4+
use futures_core::Stream;
5+
6+
/// A stream that merges two other streams into a single stream.
7+
///
8+
/// This stream is returned by [`Stream::merge`].
9+
///
10+
/// [`Stream::merge`]: trait.Stream.html#method.merge
11+
#[derive(Debug)]
12+
pub struct Merge<L, R> {
13+
left: L,
14+
right: R,
15+
}
16+
17+
impl<L, R> Unpin for Merge<L, R> {}
18+
19+
impl<L, R> Merge<L, R> {
20+
pub(crate) fn new(left: L, right: R) -> Self {
21+
Self { left, right }
22+
}
23+
}
24+
25+
impl<L, R, T> Stream for Merge<L, R>
26+
where
27+
L: Stream<Item = T> + Unpin,
28+
R: Stream<Item = T> + Unpin,
29+
{
30+
type Item = T;
31+
32+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33+
if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) {
34+
// The first stream made progress. The Merge needs to be polled
35+
// again to check the progress of the second stream.
36+
cx.waker().wake_by_ref();
37+
Poll::Ready(Some(item))
38+
} else {
39+
Pin::new(&mut self.right).poll_next(cx)
40+
}
41+
}
42+
}

src/stream/stream/mod.rs

+40
Original file line numberDiff line numberDiff line change
@@ -87,10 +87,14 @@ cfg_if! {
8787

8888
cfg_if! {
8989
if #[cfg(any(feature = "unstable", feature = "docs"))] {
90+
mod merge;
91+
9092
use std::pin::Pin;
9193

9294
use crate::future::Future;
9395
use crate::stream::FromStream;
96+
97+
pub use merge::Merge;
9498
}
9599
}
96100

@@ -1147,6 +1151,42 @@ extension_trait! {
11471151
{
11481152
FromStream::from_stream(self)
11491153
}
1154+
1155+
#[doc = r#"
1156+
Combines multiple streams into a single stream of all their outputs.
1157+
1158+
Items are yielded as soon as they're received, and the stream continues yield until both
1159+
streams have been exhausted.
1160+
1161+
# Examples
1162+
1163+
```
1164+
# async_std::task::block_on(async {
1165+
use async_std::prelude::*;
1166+
use async_std::stream;
1167+
1168+
let a = stream::once(1u8);
1169+
let b = stream::once(2u8);
1170+
let c = stream::once(3u8);
1171+
1172+
let mut s = a.merge(b).merge(c);
1173+
1174+
assert_eq!(s.next().await, Some(1u8));
1175+
assert_eq!(s.next().await, Some(2u8));
1176+
assert_eq!(s.next().await, Some(3u8));
1177+
assert_eq!(s.next().await, None);
1178+
# });
1179+
```
1180+
"#]
1181+
#[cfg(any(feature = "unstable", feature = "docs"))]
1182+
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
1183+
fn merge<U>(self, other: U) -> Merge<Self, U>
1184+
where
1185+
Self: Sized,
1186+
U: Stream<Item = Self::Item> + Sized,
1187+
{
1188+
Merge::new(self, other)
1189+
}
11501190
}
11511191

11521192
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

0 commit comments

Comments
 (0)