Skip to content

Commit 529a58a

Browse files
authored
Merge pull request #327 from assemblaj/assemblaj-partial_cmp_final
Adds Stream::partial_cmp
2 parents 00a8433 + 80bee9a commit 529a58a

File tree

2 files changed

+130
-0
lines changed

2 files changed

+130
-0
lines changed

src/stream/stream/mod.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ mod map;
3737
mod min_by;
3838
mod next;
3939
mod nth;
40+
mod partial_cmp;
4041
mod scan;
4142
mod skip;
4243
mod skip_while;
@@ -56,6 +57,7 @@ use for_each::ForEachFuture;
5657
use min_by::MinByFuture;
5758
use next::NextFuture;
5859
use nth::NthFuture;
60+
use partial_cmp::PartialCmpFuture;
5961
use try_for_each::TryForEeachFuture;
6062

6163
pub use chain::Chain;
@@ -1187,6 +1189,42 @@ extension_trait! {
11871189
{
11881190
Merge::new(self, other)
11891191
}
1192+
1193+
#[doc = r#"
1194+
Lexicographically compares the elements of this `Stream` with those
1195+
of another.
1196+
1197+
# Examples
1198+
```
1199+
# fn main() { async_std::task::block_on(async {
1200+
#
1201+
use async_std::prelude::*;
1202+
use std::collections::VecDeque;
1203+
use std::cmp::Ordering;
1204+
let s1 = VecDeque::from(vec![1]);
1205+
let s2 = VecDeque::from(vec![1, 2]);
1206+
let s3 = VecDeque::from(vec![1, 2, 3]);
1207+
let s4 = VecDeque::from(vec![1, 2, 4]);
1208+
assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal));
1209+
assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less));
1210+
assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater));
1211+
assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less));
1212+
assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater));
1213+
#
1214+
# }) }
1215+
```
1216+
"#]
1217+
fn partial_cmp<S>(
1218+
self,
1219+
other: S
1220+
) -> impl Future<Output = Option<Ordering>> [PartialCmpFuture<Self, S>]
1221+
where
1222+
Self: Sized + Stream,
1223+
S: Stream,
1224+
<Self as Stream>::Item: PartialOrd<S::Item>,
1225+
{
1226+
PartialCmpFuture::new(self, other)
1227+
}
11901228
}
11911229

11921230
impl<S: Stream + Unpin + ?Sized> Stream for Box<S> {

src/stream/stream/partial_cmp.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use std::cmp::Ordering;
2+
use std::pin::Pin;
3+
4+
use super::fuse::Fuse;
5+
use crate::future::Future;
6+
use crate::prelude::*;
7+
use crate::stream::Stream;
8+
use crate::task::{Context, Poll};
9+
10+
// Lexicographically compares the elements of this `Stream` with those
11+
// of another.
12+
#[doc(hidden)]
13+
#[allow(missing_debug_implementations)]
14+
pub struct PartialCmpFuture<L: Stream, R: Stream> {
15+
l: Fuse<L>,
16+
r: Fuse<R>,
17+
l_cache: Option<L::Item>,
18+
r_cache: Option<R::Item>,
19+
}
20+
21+
impl<L: Stream, R: Stream> PartialCmpFuture<L, R> {
22+
pin_utils::unsafe_pinned!(l: Fuse<L>);
23+
pin_utils::unsafe_pinned!(r: Fuse<R>);
24+
pin_utils::unsafe_unpinned!(l_cache: Option<L::Item>);
25+
pin_utils::unsafe_unpinned!(r_cache: Option<R::Item>);
26+
27+
pub(super) fn new(l: L, r: R) -> Self {
28+
PartialCmpFuture {
29+
l: l.fuse(),
30+
r: r.fuse(),
31+
l_cache: None,
32+
r_cache: None,
33+
}
34+
}
35+
}
36+
37+
impl<L: Stream, R: Stream> Future for PartialCmpFuture<L, R>
38+
where
39+
L: Stream + Sized,
40+
R: Stream + Sized,
41+
L::Item: PartialOrd<R::Item>,
42+
{
43+
type Output = Option<Ordering>;
44+
45+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46+
loop {
47+
// Short circuit logic
48+
// Stream that completes earliest can be considered Less, etc
49+
let l_complete = self.l.done && self.as_mut().l_cache.is_none();
50+
let r_complete = self.r.done && self.as_mut().r_cache.is_none();
51+
52+
if l_complete && r_complete {
53+
return Poll::Ready(Some(Ordering::Equal));
54+
} else if l_complete {
55+
return Poll::Ready(Some(Ordering::Less));
56+
} else if r_complete {
57+
return Poll::Ready(Some(Ordering::Greater));
58+
}
59+
60+
// Get next value if possible and necesary
61+
if !self.l.done && self.as_mut().l_cache.is_none() {
62+
let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx));
63+
if let Some(item) = l_next {
64+
*self.as_mut().l_cache() = Some(item);
65+
}
66+
}
67+
68+
if !self.r.done && self.as_mut().r_cache.is_none() {
69+
let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx));
70+
if let Some(item) = r_next {
71+
*self.as_mut().r_cache() = Some(item);
72+
}
73+
}
74+
75+
// Compare if both values are available.
76+
if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() {
77+
let l_value = self.as_mut().l_cache().take().unwrap();
78+
let r_value = self.as_mut().r_cache().take().unwrap();
79+
let result = l_value.partial_cmp(&r_value);
80+
81+
if let Some(Ordering::Equal) = result {
82+
// Reset cache to prepare for next comparison
83+
*self.as_mut().l_cache() = None;
84+
*self.as_mut().r_cache() = None;
85+
} else {
86+
// Return non equal value
87+
return Poll::Ready(result);
88+
}
89+
}
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)