Skip to content

Commit 859b420

Browse files
authored
Merge pull request routerify#8 from seanpianka/master
Update to Tokio 1.0
2 parents 14aed75 + 6988d69 commit 859b420

File tree

4 files changed

+36
-14
lines changed

4 files changed

+36
-14
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ license = "MIT"
1212
edition = "2018"
1313

1414
[dependencies]
15-
tokio = { version = "0.2", features= [] }
15+
tokio = { version = "1", features= [] }
1616
log = "0.4"
1717

1818
[dev-dependencies]
19-
tokio = { version = "0.2", features = ["full"] }
19+
tokio = { version = "1", features = ["full"] }

examples/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use async_pipe;
2-
use tokio::prelude::*;
2+
use tokio::io::{AsyncWriteExt, AsyncReadExt};
33

44
#[tokio::main]
55
async fn main() {

src/lib.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! ```
66
//! # async fn run() {
77
//! use async_pipe;
8-
//! use tokio::prelude::*;
8+
//! use tokio::io::{AsyncWriteExt, AsyncReadExt};
99
//!
1010
//! let (mut w, mut r) = async_pipe::pipe();
1111
//!
@@ -45,12 +45,34 @@ pub fn pipe() -> (PipeWriter, PipeReader) {
4545
}));
4646

4747
let w = PipeWriter {
48-
state: Arc::clone(&shared_state),
48+
state: shared_state.clone()
4949
};
5050

5151
let r = PipeReader {
52-
state: Arc::clone(&shared_state),
52+
state: shared_state.clone(),
5353
};
5454

5555
(w, r)
5656
}
57+
58+
#[cfg(test)]
59+
mod tests {
60+
use super::*;
61+
use tokio::io::{AsyncWriteExt, AsyncReadExt};
62+
63+
#[tokio::test]
64+
async fn should_read_expected_text() {
65+
const EXPECTED: &'static str = "hello world";
66+
67+
let (mut w, mut r) = pipe();
68+
69+
tokio::spawn(async move {
70+
w.write_all(EXPECTED.as_bytes()).await.unwrap();
71+
});
72+
73+
let mut v = Vec::new();
74+
r.read_to_end(&mut v).await.unwrap();
75+
let actual = String::from_utf8(v).unwrap();
76+
assert_eq!(EXPECTED, actual.as_str());
77+
}
78+
}

src/reader.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::pin::Pin;
33
use std::ptr;
44
use std::sync::{Arc, Mutex};
55
use std::task::{Context, Poll};
6-
use tokio::io::{self, AsyncRead};
6+
use tokio::io::{self, AsyncRead, ReadBuf};
77

88
/// The read half of the pipe which implements [`AsyncRead`](https://docs.rs/tokio/0.2.15/tokio/io/trait.AsyncRead.html).
99
pub struct PipeReader {
@@ -55,10 +55,10 @@ impl PipeReader {
5555
}
5656
}
5757

58-
fn copy_data_into_buffer(&self, data: &Data, buf: &mut [u8]) -> usize {
59-
let len = data.len.min(buf.len());
58+
fn copy_data_into_buffer(&self, data: &Data, buf: &mut ReadBuf) -> usize {
59+
let len = data.len.min(buf.capacity());
6060
unsafe {
61-
ptr::copy_nonoverlapping(data.ptr, buf.as_mut_ptr(), len);
61+
ptr::copy_nonoverlapping(data.ptr, buf.initialize_unfilled().as_mut_ptr(), len);
6262
}
6363
len
6464
}
@@ -80,8 +80,8 @@ impl AsyncRead for PipeReader {
8080
fn poll_read(
8181
self: Pin<&mut Self>,
8282
cx: &mut Context,
83-
buf: &mut [u8],
84-
) -> Poll<io::Result<usize>> {
83+
buf: &mut ReadBuf,
84+
) -> Poll<io::Result<()>> {
8585
let mut state;
8686
match self.state.lock() {
8787
Ok(s) => state = s,
@@ -98,7 +98,7 @@ impl AsyncRead for PipeReader {
9898
}
9999

100100
if state.closed {
101-
return Poll::Ready(Ok(0));
101+
return Poll::Ready(Ok(()));
102102
}
103103

104104
return if state.done_cycle {
@@ -115,7 +115,7 @@ impl AsyncRead for PipeReader {
115115

116116
self.wake_writer_half(&*state);
117117

118-
Poll::Ready(Ok(copied_bytes_len))
118+
Poll::Ready(Ok(()))
119119
} else {
120120
state.reader_waker = Some(cx.waker().clone());
121121
Poll::Pending

0 commit comments

Comments
 (0)