Skip to content

Make futures default and tokio optional #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ jobs:
run: cargo clippy --verbose

- name: Run tests
run: cargo test --verbose
run: cargo test --verbose --all-features
10 changes: 7 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
name = "async-pipe"
version = "0.1.3"
description = "Creates an asynchronous piped reader and writer pair using tokio.rs"
homepage = "https://github.com/rousan/async-pipe-rs"
repository = "https://github.com/rousan/async-pipe-rs"
homepage = "https://github.com/routerify/async-pipe-rs"
repository = "https://github.com/routerify/async-pipe-rs"
keywords = ["pipe", "future", "async", "reader", "writer"]
categories = ["asynchronous"]
authors = ["Rousan Ali <rousanali786@gmail.com>"]
Expand All @@ -12,7 +12,7 @@ license = "MIT"
edition = "2018"

[features]
default = ["tokio"]
default = ["futures"]

[dependencies]
tokio = { version = "1", features= [], optional = true }
Expand All @@ -24,3 +24,7 @@ tokio = { version = "1", features = ["full"] }

[package.metadata.docs.rs]
features = ["futures", "tokio"]

[[example]]
name = "tokio"
required-features = ["tokio"]
File renamed without changes.
129 changes: 66 additions & 63 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//!
//! # Examples
//!
//! ```
//! ```ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of ignoring this example you can use the futures traits

diff --git a/src/lib.rs b/src/lib.rs
index 8593b93..345f539 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -7,3 +7,3 @@
 //! use async_pipe;
-//! use tokio::io::{AsyncWriteExt, AsyncReadExt};
+//! use futures::io::{AsyncWriteExt, AsyncReadExt};
 //!

Then the example compiles with the default flags.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, the futures traits do not play well with tokio which is still used in the doc test for spawning and executing. On macOS things are fine though, but on Linux the doc test never finishes (https://github.com/routerify/async-pipe-rs/actions/runs/1093123889).

I've tried to play with cfg attrs and conditional compilation, but unsuccessfully, and honestly it's not really worth the trouble. I'm just gonna revert back to tokio and because our CI uses cargo test --all-features it's actually fine.

//! # async fn run() {
//! use async_pipe;
//! use tokio::io::{AsyncWriteExt, AsyncReadExt};
Expand All @@ -24,8 +24,8 @@
//!
//! # Featues
//!
//! * `tokio` (default) Implement `AsyncWrite` and `AsyncRead` from `tokio::io`.
//! * `futures` Implement `AsyncWrite` and `AsyncRead` from `futures::io`
//! * `futures` (default) Implement `AsyncWrite` and `AsyncRead` from `futures::io`
//! * `tokio` Implement `AsyncWrite` and `AsyncRead` from `tokio::io`.

use state::State;
use std::sync::{Arc, Mutex};
Expand All @@ -37,7 +37,7 @@ mod reader;
mod state;
mod writer;

/// Creates a piped pair of an [`AsyncWrite`](https://docs.rs/tokio/0.2.16/tokio/io/trait.AsyncWrite.html) and an [`AsyncRead`](https://docs.rs/tokio/0.2.15/tokio/io/trait.AsyncRead.html).
/// Creates a piped pair of an [`AsyncWrite`](https://docs.rs/tokio/1.9.0/tokio/io/trait.AsyncWrite.html) and an [`AsyncRead`](https://docs.rs/tokio/1.9.0/tokio/io/trait.AsyncRead.html).
pub fn pipe() -> (PipeWriter, PipeReader) {
let shared_state = Arc::new(Mutex::new(State {
reader_waker: None,
Expand All @@ -59,64 +59,67 @@ pub fn pipe() -> (PipeWriter, PipeReader) {

#[cfg(test)]
mod test {
use super::*;
use std::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::test]
async fn read_write() {
let (mut writer, mut reader) = pipe();
let data = b"hello world";

let write_handle = tokio::spawn(async move {
writer.write_all(data).await.unwrap();
});

let mut read_buf = Vec::new();
reader.read_to_end(&mut read_buf).await.unwrap();
write_handle.await.unwrap();

assert_eq!(&read_buf, data);
}

#[tokio::test]
async fn eof_when_writer_is_shutdown() {
let (mut writer, mut reader) = pipe();
writer.shutdown().await.unwrap();
let mut buf = [0u8; 8];
let bytes_read = reader.read(&mut buf).await.unwrap();
assert_eq!(bytes_read, 0);
}

#[tokio::test]
async fn broken_pipe_when_reader_is_dropped() {
let (mut writer, reader) = pipe();
drop(reader);
let io_error = writer.write_all(&[0u8; 8]).await.unwrap_err();
assert_eq!(io_error.kind(), io::ErrorKind::BrokenPipe);
}

#[tokio::test]
async fn eof_when_writer_is_dropped() {
let (writer, mut reader) = pipe();
drop(writer);
let mut buf = [0u8; 8];
let bytes_read = reader.read(&mut buf).await.unwrap();
assert_eq!(bytes_read, 0);
}

#[tokio::test]
async fn drop_read_exact() {
let (mut writer, mut reader) = pipe();
const BUF_SIZE: usize = 8;

let write_handle = tokio::spawn(async move {
writer.write_all(&[0u8; BUF_SIZE]).await.unwrap();
});

let mut buf = [0u8; BUF_SIZE];
reader.read_exact(&mut buf).await.unwrap();
drop(reader);
write_handle.await.unwrap();
#[cfg(feature = "tokio")]
mod test_tokio {
use crate::*;
use std::io;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::test]
async fn read_write() {
let (mut writer, mut reader) = pipe();
let data = b"hello world";

let write_handle = tokio::spawn(async move {
writer.write_all(data).await.unwrap();
});

let mut read_buf = Vec::new();
reader.read_to_end(&mut read_buf).await.unwrap();
write_handle.await.unwrap();

assert_eq!(&read_buf, data);
}

#[tokio::test]
async fn eof_when_writer_is_shutdown() {
let (mut writer, mut reader) = pipe();
writer.shutdown().await.unwrap();
let mut buf = [0u8; 8];
let bytes_read = reader.read(&mut buf).await.unwrap();
assert_eq!(bytes_read, 0);
}

#[tokio::test]
async fn broken_pipe_when_reader_is_dropped() {
let (mut writer, reader) = pipe();
drop(reader);
let io_error = writer.write_all(&[0u8; 8]).await.unwrap_err();
assert_eq!(io_error.kind(), io::ErrorKind::BrokenPipe);
}

#[tokio::test]
async fn eof_when_writer_is_dropped() {
let (writer, mut reader) = pipe();
drop(writer);
let mut buf = [0u8; 8];
let bytes_read = reader.read(&mut buf).await.unwrap();
assert_eq!(bytes_read, 0);
}

#[tokio::test]
async fn drop_read_exact() {
let (mut writer, mut reader) = pipe();
const BUF_SIZE: usize = 8;

let write_handle = tokio::spawn(async move {
writer.write_all(&[0u8; BUF_SIZE]).await.unwrap();
});

let mut buf = [0u8; BUF_SIZE];
reader.read_exact(&mut buf).await.unwrap();
drop(reader);
write_handle.await.unwrap();
}
}
}