Skip to content

Commit 75dc819

Browse files
feat(io): implement Read::take
1 parent f6a2393 commit 75dc819

File tree

2 files changed

+256
-0
lines changed

2 files changed

+256
-0
lines changed

src/io/read/mod.rs

+42
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ mod read_exact;
33
mod read_to_end;
44
mod read_to_string;
55
mod read_vectored;
6+
mod take;
67

78
use read::ReadFuture;
89
use read_exact::ReadExactFuture;
@@ -261,6 +262,47 @@ extension_trait! {
261262
{
262263
ReadExactFuture { reader: self, buf }
263264
}
265+
266+
#[doc = r#"
267+
Creates an adaptor which will read at most `limit` bytes from it.
268+
269+
This function returns a new instance of `Read` which will read at most
270+
`limit` bytes, after which it will always return EOF ([`Ok(0)`]). Any
271+
read errors will not count towards the number of bytes read and future
272+
calls to [`read()`] may succeed.
273+
274+
# Examples
275+
276+
[`File`]s implement `Read`:
277+
278+
[`File`]: ../fs/struct.File.html
279+
[`Ok(0)`]: ../../std/result/enum.Result.html#variant.Ok
280+
[`read()`]: tymethod.read
281+
282+
```no_run
283+
use async_std::io::prelude::*;
284+
use async_std::fs::File;
285+
286+
fn main() -> std::io::Result<()> {
287+
async_std::task::block_on(async {
288+
let f = File::open("foo.txt").await?;
289+
let mut buffer = [0; 5];
290+
291+
// read at most five bytes
292+
let mut handle = f.take(5);
293+
294+
handle.read(&mut buffer).await?;
295+
Ok(())
296+
})
297+
}
298+
```
299+
"#]
300+
fn take(self, limit: u64) -> take::Take<Self>
301+
where
302+
Self: Sized,
303+
{
304+
take::Take { inner: self, limit }
305+
}
264306
}
265307

266308
impl<T: Read + Unpin + ?Sized> Read for Box<T> {

src/io/read/take.rs

+214
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
use std::cmp;
2+
use std::pin::Pin;
3+
4+
use crate::io::{self, Read};
5+
use crate::task::{Context, Poll};
6+
7+
/// Reader adaptor which limits the bytes read from an underlying reader.
8+
///
9+
/// This struct is generally created by calling [`take`] on a reader.
10+
/// Please see the documentation of [`take`] for more details.
11+
///
12+
/// [`take`]: trait.Read.html#method.take
13+
#[derive(Debug)]
14+
pub struct Take<T> {
15+
pub(crate) inner: T,
16+
pub(crate) limit: u64,
17+
}
18+
19+
impl<T> Take<T> {
20+
/// Returns the number of bytes that can be read before this instance will
21+
/// return EOF.
22+
///
23+
/// # Note
24+
///
25+
/// This instance may reach `EOF` after reading fewer bytes than indicated by
26+
/// this method if the underlying [`Read`] instance reaches EOF.
27+
///
28+
/// [`Read`]: trait.Read.html
29+
///
30+
/// # Examples
31+
///
32+
/// ```no_run
33+
/// use async_std::io;
34+
/// use async_std::prelude::*;
35+
/// use async_std::fs::File;
36+
///
37+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
38+
/// let f = File::open("foo.txt").await?;
39+
///
40+
/// // read at most five bytes
41+
/// let handle = f.take(5);
42+
///
43+
/// println!("limit: {}", handle.limit());
44+
/// Ok(())
45+
/// }) }
46+
/// ```
47+
pub fn limit(&self) -> u64 {
48+
self.limit
49+
}
50+
51+
/// Sets the number of bytes that can be read before this instance will
52+
/// return EOF. This is the same as constructing a new `Take` instance, so
53+
/// the amount of bytes read and the previous limit value don't matter when
54+
/// calling this method.
55+
///
56+
/// # Examples
57+
///
58+
/// ```no_run
59+
/// use async_std::io;
60+
/// use async_std::prelude::*;
61+
/// use async_std::fs::File;
62+
///
63+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
64+
/// let f = File::open("foo.txt").await?;
65+
///
66+
/// // read at most five bytes
67+
/// let mut handle = f.take(5);
68+
/// handle.set_limit(10);
69+
///
70+
/// assert_eq!(handle.limit(), 10);
71+
/// Ok(())
72+
/// }) }
73+
/// ```
74+
pub fn set_limit(&mut self, limit: u64) {
75+
self.limit = limit;
76+
}
77+
78+
/// Consumes the `Take`, returning the wrapped reader.
79+
///
80+
/// # Examples
81+
///
82+
/// ```no_run
83+
/// use async_std::io;
84+
/// use async_std::prelude::*;
85+
/// use async_std::fs::File;
86+
///
87+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
88+
/// let file = File::open("foo.txt").await?;
89+
///
90+
/// let mut buffer = [0; 5];
91+
/// let mut handle = file.take(5);
92+
/// handle.read(&mut buffer).await?;
93+
///
94+
/// let file = handle.into_inner();
95+
/// Ok(())
96+
/// }) }
97+
/// ```
98+
pub fn into_inner(self) -> T {
99+
self.inner
100+
}
101+
102+
/// Gets a reference to the underlying reader.
103+
///
104+
/// # Examples
105+
///
106+
/// ```no_run
107+
/// use async_std::io;
108+
/// use async_std::prelude::*;
109+
/// use async_std::fs::File;
110+
///
111+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
112+
/// let file = File::open("foo.txt").await?;
113+
///
114+
/// let mut buffer = [0; 5];
115+
/// let mut handle = file.take(5);
116+
/// handle.read(&mut buffer).await?;
117+
///
118+
/// let file = handle.get_ref();
119+
/// Ok(())
120+
/// }) }
121+
/// ```
122+
pub fn get_ref(&self) -> &T {
123+
&self.inner
124+
}
125+
126+
/// Gets a mutable reference to the underlying reader.
127+
///
128+
/// Care should be taken to avoid modifying the internal I/O state of the
129+
/// underlying reader as doing so may corrupt the internal limit of this
130+
/// `Take`.
131+
///
132+
/// # Examples
133+
///
134+
/// ```no_run
135+
/// use async_std::io;
136+
/// use async_std::prelude::*;
137+
/// use async_std::fs::File;
138+
///
139+
/// fn main() -> io::Result<()> { async_std::task::block_on(async {
140+
/// let file = File::open("foo.txt").await?;
141+
///
142+
/// let mut buffer = [0; 5];
143+
/// let mut handle = file.take(5);
144+
/// handle.read(&mut buffer).await?;
145+
///
146+
/// let file = handle.get_mut();
147+
/// Ok(())
148+
/// }) }
149+
/// ```
150+
pub fn get_mut(&mut self) -> &mut T {
151+
&mut self.inner
152+
}
153+
}
154+
155+
impl<T: Read + Unpin> Read for Take<T> {
156+
/// Attempt to read from the `AsyncRead` into `buf`.
157+
fn poll_read(
158+
mut self: Pin<&mut Self>,
159+
cx: &mut Context<'_>,
160+
buf: &mut [u8],
161+
) -> Poll<io::Result<usize>> {
162+
let Self { inner, limit } = &mut *self;
163+
take_read_internal(Pin::new(inner), cx, buf, limit)
164+
}
165+
}
166+
167+
pub fn take_read_internal<R: Read + ?Sized>(
168+
mut rd: Pin<&mut R>,
169+
cx: &mut Context<'_>,
170+
buf: &mut [u8],
171+
limit: &mut u64,
172+
) -> Poll<io::Result<usize>> {
173+
// Don't call into inner reader at all at EOF because it may still block
174+
if *limit == 0 {
175+
return Poll::Ready(Ok(0));
176+
}
177+
178+
let max = cmp::min(buf.len() as u64, *limit) as usize;
179+
180+
match futures_core::ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
181+
Ok(n) => {
182+
*limit -= n as u64;
183+
Poll::Ready(Ok(n))
184+
}
185+
Err(e) => Poll::Ready(Err(e)),
186+
}
187+
}
188+
189+
#[cfg(test)]
190+
mod tests {
191+
use crate::io;
192+
use crate::prelude::*;
193+
use crate::task;
194+
195+
#[test]
196+
fn test_take_basics() -> std::io::Result<()> {
197+
let source: io::Cursor<Vec<u8>> = io::Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]);
198+
199+
task::block_on(async move {
200+
let mut buffer = [0u8; 5];
201+
202+
// read at most five bytes
203+
let mut handle = source.take(5);
204+
205+
handle.read(&mut buffer).await?;
206+
assert_eq!(buffer, [0, 1, 2, 3, 4]);
207+
208+
// check that the we are actually at the end
209+
assert_eq!(handle.read(&mut buffer).await.unwrap(), 0);
210+
211+
Ok(())
212+
})
213+
}
214+
}

0 commit comments

Comments
 (0)