Skip to content

Commit 4a749ba

Browse files
Thomas Scholtesgeigerzaehler
Thomas Scholtes
authored andcommitted
Fix panics and unsafe code
This change fixes panics (routerify#6) and unsafe code (routerify#5). This comes at the cost of an additional copy of the data send through the pipe and having a buffer in the state. All unsafe code is removed and the need for a custom `Drop` implementation which makes the code overall easier. We also provide an implementation for traits from `futures` which is behind a feature flag. We also add tests.
1 parent 859b420 commit 4a749ba

File tree

7 files changed

+213
-146
lines changed

7 files changed

+213
-146
lines changed

Cargo.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,16 @@ readme = "README.md"
1111
license = "MIT"
1212
edition = "2018"
1313

14+
[features]
15+
default = ["tokio"]
16+
1417
[dependencies]
15-
tokio = { version = "1", features= [] }
18+
tokio = { version = "1", features= [], optional = true }
1619
log = "0.4"
20+
futures = { version = "0.3", optional = true }
1721

1822
[dev-dependencies]
1923
tokio = { version = "1", features = ["full"] }
24+
25+
[package.metadata.docs.rs]
26+
features = ["futures", "tokio"]

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
[![Documentation](https://docs.rs/async-pipe/badge.svg)](https://docs.rs/async-pipe)
55
[![MIT](https://img.shields.io/crates/l/async-pipe.svg)](./LICENSE)
66

7-
Creates an asynchronous piped reader and writer pair using `tokio.rs`.
7+
Creates an asynchronous piped reader and writer pair using `tokio.rs` or
8+
`futures`
89

910
[Docs](https://docs.rs/async-pipe)
1011

@@ -38,4 +39,4 @@ async fn main() {
3839

3940
## Contributing
4041

41-
Your PRs and stars are always welcome.
42+
Your PRs and stars are always welcome.

examples/main.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,10 @@
1-
use async_pipe;
21
use tokio::io::{AsyncWriteExt, AsyncReadExt};
32

43
#[tokio::main]
54
async fn main() {
65
let (mut w, mut r) = async_pipe::pipe();
76

8-
tokio::spawn(async move {
9-
w.write_all(b"hello world").await.unwrap();
10-
});
7+
let _ = w.write_all(b"hello world").await;
118

129
let mut v = Vec::new();
1310
r.read_to_end(&mut v).await.unwrap();

src/lib.rs

Lines changed: 63 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Creates an asynchronous piped reader and writer pair using `tokio.rs`.
1+
//! Creates an asynchronous piped reader and writer pair using `tokio.rs` and `futures`.
22
//!
33
//! # Examples
44
//!
@@ -21,6 +21,11 @@
2121
//!
2222
//! tokio::runtime::Runtime::new().unwrap().block_on(run());
2323
//! ```
24+
//!
25+
//! # Featues
26+
//!
27+
//! * `tokio` (default) Implement `AsyncWrite` and `AsyncRead` from `tokio::io`.
28+
//! * `futures` Implement `AsyncWrite` and `AsyncRead` from `futures::io`
2429
2530
use state::State;
2631
use std::sync::{Arc, Mutex};
@@ -37,42 +42,81 @@ pub fn pipe() -> (PipeWriter, PipeReader) {
3742
let shared_state = Arc::new(Mutex::new(State {
3843
reader_waker: None,
3944
writer_waker: None,
40-
data: None,
41-
done_reading: false,
42-
read: 0,
43-
done_cycle: true,
4445
closed: false,
46+
buffer: Vec::new(),
4547
}));
4648

4749
let w = PipeWriter {
48-
state: shared_state.clone()
50+
state: shared_state.clone(),
4951
};
5052

5153
let r = PipeReader {
52-
state: shared_state.clone(),
54+
state: shared_state,
5355
};
5456

5557
(w, r)
5658
}
5759

5860
#[cfg(test)]
59-
mod tests {
60-
use super::*;
61-
use tokio::io::{AsyncWriteExt, AsyncReadExt};
61+
mod test {
62+
use super::pipe;
63+
use std::io;
64+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
6265

6366
#[tokio::test]
64-
async fn should_read_expected_text() {
65-
const EXPECTED: &'static str = "hello world";
67+
async fn read_write() {
68+
let (mut writer, mut reader) = pipe();
69+
let data = b"hello world";
70+
71+
let write_handle = tokio::spawn(async move {
72+
writer.write_all(data).await.unwrap();
73+
});
74+
75+
let mut read_buf = Vec::new();
76+
reader.read_to_end(&mut read_buf).await.unwrap();
77+
write_handle.await.unwrap();
6678

67-
let (mut w, mut r) = pipe();
79+
assert_eq!(&read_buf, data);
80+
}
81+
82+
#[tokio::test]
83+
async fn eof_when_writer_is_shutdown() {
84+
let (mut writer, mut reader) = pipe();
85+
writer.shutdown().await.unwrap();
86+
let mut buf = [0u8; 8];
87+
let bytes_read = reader.read(&mut buf).await.unwrap();
88+
assert_eq!(bytes_read, 0);
89+
}
90+
91+
#[tokio::test]
92+
async fn broken_pipe_when_reader_is_dropped() {
93+
let (mut writer, reader) = pipe();
94+
drop(reader);
95+
let io_error = writer.write_all(&[0u8; 8]).await.unwrap_err();
96+
assert_eq!(io_error.kind(), io::ErrorKind::BrokenPipe);
97+
}
98+
99+
#[tokio::test]
100+
async fn eof_when_writer_is_dropped() {
101+
let (writer, mut reader) = pipe();
102+
drop(writer);
103+
let mut buf = [0u8; 8];
104+
let bytes_read = reader.read(&mut buf).await.unwrap();
105+
assert_eq!(bytes_read, 0);
106+
}
107+
108+
#[tokio::test]
109+
async fn drop_read_exact() {
110+
let (mut writer, mut reader) = pipe();
111+
const BUF_SIZE: usize = 8;
68112

69-
tokio::spawn(async move {
70-
w.write_all(EXPECTED.as_bytes()).await.unwrap();
113+
let write_handle = tokio::spawn(async move {
114+
writer.write_all(&[0u8; BUF_SIZE]).await.unwrap();
71115
});
72116

73-
let mut v = Vec::new();
74-
r.read_to_end(&mut v).await.unwrap();
75-
let actual = String::from_utf8(v).unwrap();
76-
assert_eq!(EXPECTED, actual.as_str());
117+
let mut buf = [0u8; BUF_SIZE];
118+
reader.read_exact(&mut buf).await.unwrap();
119+
drop(reader);
120+
write_handle.await.unwrap();
77121
}
78122
}

src/reader.rs

Lines changed: 54 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
1-
use crate::state::{Data, State};
1+
use crate::state::State;
2+
use std::io;
23
use std::pin::Pin;
3-
use std::ptr;
44
use std::sync::{Arc, Mutex};
55
use std::task::{Context, Poll};
6-
use tokio::io::{self, AsyncRead, ReadBuf};
76

8-
/// The read half of the pipe which implements [`AsyncRead`](https://docs.rs/tokio/0.2.15/tokio/io/trait.AsyncRead.html).
7+
/// The read half of the pipe
8+
///
9+
/// Implements [`tokio::io::AsyncRead`][tokio-async-read] when feature `tokio` is enabled (the
10+
/// default). Implements [`futures::io::AsyncRead`][futures-async-read] when feature `futures` is
11+
/// enabled.
12+
///
13+
/// [futures-async-read]: https://docs.rs/futures/0.3.16/futures/io/trait.AsyncRead.html
14+
/// [tokio-async-read]: https://docs.rs/tokio/1.9.0/tokio/io/trait.AsyncRead.html
915
pub struct PipeReader {
1016
pub(crate) state: Arc<Mutex<State>>,
1117
}
@@ -46,7 +52,7 @@ impl PipeReader {
4652
}
4753
};
4854

49-
Ok(state.done_cycle)
55+
Ok(state.buffer.is_empty())
5056
}
5157

5258
fn wake_writer_half(&self, state: &State) {
@@ -55,36 +61,13 @@ impl PipeReader {
5561
}
5662
}
5763

58-
fn copy_data_into_buffer(&self, data: &Data, buf: &mut ReadBuf) -> usize {
59-
let len = data.len.min(buf.capacity());
60-
unsafe {
61-
ptr::copy_nonoverlapping(data.ptr, buf.initialize_unfilled().as_mut_ptr(), len);
62-
}
63-
len
64-
}
65-
}
66-
67-
impl Drop for PipeReader {
68-
fn drop(&mut self) {
69-
if let Err(err) = self.close() {
70-
log::warn!(
71-
"{}: PipeReader: Failed to close the channel on drop: {}",
72-
env!("CARGO_PKG_NAME"),
73-
err
74-
);
75-
}
76-
}
77-
}
78-
79-
impl AsyncRead for PipeReader {
8064
fn poll_read(
8165
self: Pin<&mut Self>,
8266
cx: &mut Context,
83-
buf: &mut ReadBuf,
84-
) -> Poll<io::Result<()>> {
85-
let mut state;
86-
match self.state.lock() {
87-
Ok(s) => state = s,
67+
buf: &mut [u8],
68+
) -> Poll<io::Result<usize>> {
69+
let mut state = match self.state.lock() {
70+
Ok(s) => s,
8871
Err(err) => {
8972
return Poll::Ready(Err(io::Error::new(
9073
io::ErrorKind::Other,
@@ -95,31 +78,49 @@ impl AsyncRead for PipeReader {
9578
),
9679
)))
9780
}
98-
}
99-
100-
if state.closed {
101-
return Poll::Ready(Ok(()));
102-
}
103-
104-
return if state.done_cycle {
105-
state.reader_waker = Some(cx.waker().clone());
106-
Poll::Pending
107-
} else {
108-
if let Some(ref data) = state.data {
109-
let copied_bytes_len = self.copy_data_into_buffer(data, buf);
110-
111-
state.data = None;
112-
state.read = copied_bytes_len;
113-
state.done_reading = true;
114-
state.reader_waker = None;
115-
116-
self.wake_writer_half(&*state);
81+
};
11782

118-
Poll::Ready(Ok(()))
83+
if state.buffer.is_empty() {
84+
if state.closed || Arc::strong_count(&self.state) == 1 {
85+
Poll::Ready(Ok(0))
11986
} else {
87+
self.wake_writer_half(&*state);
12088
state.reader_waker = Some(cx.waker().clone());
12189
Poll::Pending
12290
}
123-
};
91+
} else {
92+
self.wake_writer_half(&*state);
93+
let size_to_read = state.buffer.len().min(buf.len());
94+
let (to_read, rest) = state.buffer.split_at(size_to_read);
95+
buf[..size_to_read].copy_from_slice(to_read);
96+
state.buffer = rest.to_vec();
97+
98+
Poll::Ready(Ok(size_to_read))
99+
}
100+
}
101+
}
102+
103+
#[cfg(feature = "tokio")]
104+
impl tokio::io::AsyncRead for PipeReader {
105+
fn poll_read(
106+
self: Pin<&mut Self>,
107+
cx: &mut Context,
108+
buf: &mut tokio::io::ReadBuf,
109+
) -> Poll<io::Result<()>> {
110+
let dst = buf.initialize_unfilled();
111+
self.poll_read(cx, dst).map_ok(|read| {
112+
buf.advance(read);
113+
})
114+
}
115+
}
116+
117+
#[cfg(feature = "futures")]
118+
impl futures::io::AsyncRead for PipeReader {
119+
fn poll_read(
120+
self: Pin<&mut Self>,
121+
cx: &mut Context,
122+
buf: &mut [u8],
123+
) -> Poll<io::Result<usize>> {
124+
self.poll_read(cx, buf)
124125
}
125126
}

src/state.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
use std::task::Waker;
22

3+
pub const BUFFER_SIZE: usize = 1024;
4+
35
pub(crate) struct State {
46
pub(crate) reader_waker: Option<Waker>,
57
pub(crate) writer_waker: Option<Waker>,
6-
pub(crate) data: Option<Data>,
7-
pub(crate) done_reading: bool,
8-
pub(crate) read: usize,
9-
pub(crate) done_cycle: bool,
108
pub(crate) closed: bool,
9+
pub(crate) buffer: Vec<u8>,
1110
}
12-
13-
pub(crate) struct Data {
14-
pub(crate) ptr: *const u8,
15-
pub(crate) len: usize,
16-
}
17-
18-
unsafe impl Send for Data {}

0 commit comments

Comments
 (0)