Skip to content

Commit 74c0e25

Browse files
author
Thomas Scholtes
committed
Implement Async{Read,Write} from futures
We implement `AsyncRead` and `AsyncWrite` from `futures::io` for `PipeReader` and `PipeWriter`, respectively. This implementation is behind a feature flag.
1 parent 14aed75 commit 74c0e25

File tree

6 files changed

+114
-39
lines changed

6 files changed

+114
-39
lines changed

Cargo.toml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,18 @@ readme = "README.md"
1111
license = "MIT"
1212
edition = "2018"
1313

14+
[features]
15+
default = ["tokio"]
16+
1417
[dependencies]
15-
tokio = { version = "0.2", features= [] }
18+
tokio = { version = "0.2", features= [], optional = true }
1619
log = "0.4"
20+
futures = { version = "0.3", optional = true }
1721

1822
[dev-dependencies]
1923
tokio = { version = "0.2", features = ["full"] }
24+
25+
[package.metadata.docs.rs]
26+
features = ["futures"]
27+
28+

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
[![Documentation](https://docs.rs/async-pipe/badge.svg)](https://docs.rs/async-pipe)
55
[![MIT](https://img.shields.io/crates/l/async-pipe.svg)](./LICENSE)
66

7-
Creates an asynchronous piped reader and writer pair using `tokio.rs`.
7+
Creates an asynchronous piped reader and writer pair using `tokio.rs` or
8+
`futures`
89

910
[Docs](https://docs.rs/async-pipe)
1011

@@ -38,4 +39,4 @@ async fn main() {
3839

3940
## Contributing
4041

41-
Your PRs and stars are always welcome.
42+
Your PRs and stars are always welcome.

examples/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ use tokio::prelude::*;
55
async fn main() {
66
let (mut w, mut r) = async_pipe::pipe();
77

8-
tokio::spawn(async move {
9-
w.write_all(b"hello world").await.unwrap();
10-
});
8+
let _ = w.write_all(b"hello world").await;
119

1210
let mut v = Vec::new();
1311
r.read_to_end(&mut v).await.unwrap();

src/lib.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
//! Creates an asynchronous piped reader and writer pair using `tokio.rs`.
1+
//! Creates an asynchronous piped reader and writer pair using `tokio.rs` and `futures`.
22
//!
33
//! # Examples
44
//!
@@ -21,6 +21,11 @@
2121
//!
2222
//! tokio::runtime::Runtime::new().unwrap().block_on(run());
2323
//! ```
24+
//!
25+
//! # Featues
26+
//!
27+
//! * `tokio` (default) Implement `AsyncWrite` and `AsyncRead` from `tokio::io`.
28+
//! * `futures` Implement `AsyncWrite` and `AsyncRead` from `futures::io`
2429
2530
use state::State;
2631
use std::sync::{Arc, Mutex};

src/reader.rs

Lines changed: 43 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
use crate::state::{Data, State};
2+
use std::io;
23
use std::pin::Pin;
34
use std::ptr;
45
use std::sync::{Arc, Mutex};
56
use std::task::{Context, Poll};
6-
use tokio::io::{self, AsyncRead};
77

8-
/// The read half of the pipe which implements [`AsyncRead`](https://docs.rs/tokio/0.2.15/tokio/io/trait.AsyncRead.html).
8+
/// The read half of the pipe
9+
///
10+
/// Implements [`tokio::io::AsyncRead`][tokio-async-read] when feature `tokio` is enabled (the
11+
/// default). Implements [`futures::io::AsyncRead`][futures-async-read] when feature `futures` is
12+
/// enabled.
13+
///
14+
/// [futures-async-read]: https://docs.rs/futures/0.3.5/futures/io/trait.AsyncRead.html
15+
/// [tokio-async-read]: https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncRead.html
916
pub struct PipeReader {
1017
pub(crate) state: Arc<Mutex<State>>,
1118
}
@@ -62,21 +69,7 @@ impl PipeReader {
6269
}
6370
len
6471
}
65-
}
6672

67-
impl Drop for PipeReader {
68-
fn drop(&mut self) {
69-
if let Err(err) = self.close() {
70-
log::warn!(
71-
"{}: PipeReader: Failed to close the channel on drop: {}",
72-
env!("CARGO_PKG_NAME"),
73-
err
74-
);
75-
}
76-
}
77-
}
78-
79-
impl AsyncRead for PipeReader {
8073
fn poll_read(
8174
self: Pin<&mut Self>,
8275
cx: &mut Context,
@@ -123,3 +116,37 @@ impl AsyncRead for PipeReader {
123116
};
124117
}
125118
}
119+
120+
impl Drop for PipeReader {
121+
fn drop(&mut self) {
122+
if let Err(err) = self.close() {
123+
log::warn!(
124+
"{}: PipeReader: Failed to close the channel on drop: {}",
125+
env!("CARGO_PKG_NAME"),
126+
err
127+
);
128+
}
129+
}
130+
}
131+
132+
#[cfg(feature = "tokio")]
133+
impl tokio::io::AsyncRead for PipeReader {
134+
fn poll_read(
135+
self: Pin<&mut Self>,
136+
cx: &mut Context,
137+
buf: &mut [u8],
138+
) -> Poll<io::Result<usize>> {
139+
self.poll_read(cx, buf)
140+
}
141+
}
142+
143+
#[cfg(feature = "futures")]
144+
impl futures::io::AsyncRead for PipeReader {
145+
fn poll_read(
146+
self: Pin<&mut Self>,
147+
cx: &mut Context,
148+
buf: &mut [u8],
149+
) -> Poll<io::Result<usize>> {
150+
self.poll_read(cx, buf)
151+
}
152+
}

src/writer.rs

Lines changed: 51 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,18 @@
11
use crate::state::Data;
22
use crate::state::State;
3+
use std::io;
34
use std::pin::Pin;
45
use std::sync::{Arc, Mutex};
56
use std::task::{Context, Poll};
6-
use tokio::io::{self, AsyncWrite};
77

8-
/// The write half of the pipe which implements [`AsyncWrite`](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html).
8+
/// The write half of the pipe
9+
///
10+
/// Implements [`tokio::io::AsyncWrite`][tokio-async-write] when feature `tokio` is enabled (the
11+
/// default). Implements [`futures::io::AsyncWrite`][futures-async-write] when feature `futures` is
12+
/// enabled.
13+
///
14+
/// [futures-async-write]: https://docs.rs/futures/0.3.5/futures/io/trait.AsyncWrite.html
15+
/// [tokio-async-write]: https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html
916
pub struct PipeWriter {
1017
pub(crate) state: Arc<Mutex<State>>,
1118
}
@@ -54,21 +61,7 @@ impl PipeWriter {
5461
waker.clone().wake();
5562
}
5663
}
57-
}
5864

59-
impl Drop for PipeWriter {
60-
fn drop(&mut self) {
61-
if let Err(err) = self.close() {
62-
log::warn!(
63-
"{}: PipeWriter: Failed to close the channel on drop: {}",
64-
env!("CARGO_PKG_NAME"),
65-
err
66-
);
67-
}
68-
}
69-
}
70-
71-
impl AsyncWrite for PipeWriter {
7265
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
7366
let mut state;
7467
match self.state.lock() {
@@ -142,3 +135,45 @@ impl AsyncWrite for PipeWriter {
142135
}
143136
}
144137
}
138+
139+
impl Drop for PipeWriter {
140+
fn drop(&mut self) {
141+
if let Err(err) = self.close() {
142+
log::warn!(
143+
"{}: PipeWriter: Failed to close the channel on drop: {}",
144+
env!("CARGO_PKG_NAME"),
145+
err
146+
);
147+
}
148+
}
149+
}
150+
151+
#[cfg(feature = "tokio")]
152+
impl tokio::io::AsyncWrite for PipeWriter {
153+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
154+
self.poll_write(cx, buf)
155+
}
156+
157+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
158+
self.poll_flush(cx)
159+
}
160+
161+
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
162+
self.poll_shutdown(cx)
163+
}
164+
}
165+
166+
#[cfg(feature = "futures")]
167+
impl futures::io::AsyncWrite for PipeWriter {
168+
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, buf: &[u8]) -> Poll<io::Result<usize>> {
169+
self.poll_write(cx, buf)
170+
}
171+
172+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
173+
self.poll_flush(cx)
174+
}
175+
176+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
177+
self.poll_shutdown(cx)
178+
}
179+
}

0 commit comments

Comments
 (0)