Skip to content

Commit bd61ac6

Browse files
committed
feat: implement IMAP COMPRESS
1 parent 1954ce4 commit bd61ac6

File tree

5 files changed

+194
-2
lines changed

5 files changed

+194
-2
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ jobs:
3131
- name: check tokio
3232
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio
3333

34+
- name: check compress feature with tokio
35+
run: cargo check --workspace --all-targets --no-default-features --features runtime-tokio,compress
36+
37+
- name: check compress feature with async-std
38+
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std,compress
39+
3440
- name: check async-std examples
3541
working-directory: examples
3642
run: cargo check --workspace --all-targets --no-default-features --features runtime-async-std

Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ is-it-maintained-open-issues = { repository = "async-email/async-imap" }
2020

2121
[features]
2222
default = ["runtime-async-std"]
23+
compress = ["async-compression"]
2324

24-
runtime-async-std = ["async-std"]
25-
runtime-tokio = ["tokio"]
25+
runtime-async-std = ["async-std", "async-compression?/futures-io"]
26+
runtime-tokio = ["tokio", "async-compression?/tokio"]
2627

2728
[dependencies]
2829
async-channel = "2.0.0"
30+
async-compression = { git = "https://github.com/link2xt/async-compression.git", default-features = false, features = ["deflate"], optional = true, branch = "link2xt/miniz_oxide-consumes-all-input" }
2931
async-std = { version = "1.8.0", default-features = false, features = ["std", "unstable"], optional = true }
3032
base64 = "0.21"
3133
bytes = "1"
@@ -35,6 +37,7 @@ imap-proto = "0.16.4"
3537
log = "0.4.8"
3638
nom = "7.0"
3739
once_cell = "1.8.0"
40+
pin-project = "1"
3841
pin-utils = "0.1.0-alpha.4"
3942
self_cell = "1.0.1"
4043
stop-token = "0.7"

src/extensions/compress.rs

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
//! IMAP COMPRESS extension specified in [RFC4978](https://www.rfc-editor.org/rfc/rfc4978.html).
2+
3+
use std::fmt;
4+
use std::pin::Pin;
5+
use std::task::{Context, Poll};
6+
7+
use pin_project::pin_project;
8+
9+
use crate::client::Session;
10+
use crate::error::Result;
11+
use crate::imap_stream::ImapStream;
12+
use crate::types::IdGenerator;
13+
use crate::Connection;
14+
15+
#[cfg(feature = "runtime-async-std")]
16+
use async_std::io::{Read, Write};
17+
#[cfg(feature = "runtime-tokio")]
18+
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, ReadBuf};
19+
20+
#[cfg(feature = "runtime-async-std")]
21+
use futures::AsyncReadExt;
22+
23+
#[cfg(feature = "runtime-tokio")]
24+
use async_compression::tokio::bufread::DeflateDecoder;
25+
#[cfg(feature = "runtime-tokio")]
26+
use async_compression::tokio::write::DeflateEncoder;
27+
28+
#[cfg(feature = "runtime-async-std")]
29+
use async_compression::futures::bufread::DeflateDecoder;
30+
#[cfg(feature = "runtime-async-std")]
31+
use async_compression::futures::write::DeflateEncoder;
32+
33+
/// IMAP stream
34+
#[derive(Debug)]
35+
#[pin_project]
36+
pub struct DeflateStream<T: Read + Write + Unpin + fmt::Debug> {
37+
#[cfg(feature = "runtime-tokio")]
38+
#[pin]
39+
decoder: DeflateDecoder<tokio::io::BufReader<tokio::io::ReadHalf<T>>>,
40+
41+
#[cfg(feature = "runtime-tokio")]
42+
#[pin]
43+
encoder: DeflateEncoder<tokio::io::WriteHalf<T>>,
44+
45+
#[cfg(feature = "runtime-async-std")]
46+
#[pin]
47+
decoder: DeflateDecoder<async_std::io::BufReader<futures::io::ReadHalf<T>>>,
48+
49+
#[cfg(feature = "runtime-async-std")]
50+
#[pin]
51+
encoder: DeflateEncoder<futures::io::WriteHalf<T>>,
52+
}
53+
54+
#[cfg(feature = "runtime-tokio")]
55+
impl<T: Read + Write + Unpin + fmt::Debug> DeflateStream<T> {
56+
pub(crate) fn new(stream: T) -> Self {
57+
let (read_half, write_half) = tokio::io::split(stream);
58+
let read_half = tokio::io::BufReader::new(read_half);
59+
let decoder = DeflateDecoder::new(read_half);
60+
let encoder = DeflateEncoder::new(write_half);
61+
Self { decoder, encoder }
62+
}
63+
}
64+
65+
#[cfg(feature = "runtime-async-std")]
66+
impl<T: Read + Write + Unpin + fmt::Debug> DeflateStream<T> {
67+
pub(crate) fn new(stream: T) -> Self {
68+
let (read_half, write_half) = stream.split();
69+
let read_half = async_std::io::BufReader::new(read_half);
70+
let decoder = DeflateDecoder::new(read_half);
71+
let encoder = DeflateEncoder::new(write_half);
72+
Self { decoder, encoder }
73+
}
74+
}
75+
76+
#[cfg(feature = "runtime-tokio")]
77+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
78+
fn poll_read(
79+
self: Pin<&mut Self>,
80+
cx: &mut Context<'_>,
81+
buf: &mut ReadBuf<'_>,
82+
) -> Poll<std::io::Result<()>> {
83+
self.project().decoder.poll_read(cx, buf)
84+
}
85+
}
86+
87+
#[cfg(feature = "runtime-async-std")]
88+
impl<T: Read + Write + Unpin + fmt::Debug> Read for DeflateStream<T> {
89+
fn poll_read(
90+
self: Pin<&mut Self>,
91+
cx: &mut Context<'_>,
92+
buf: &mut [u8],
93+
) -> Poll<async_std::io::Result<usize>> {
94+
self.project().decoder.poll_read(cx, buf)
95+
}
96+
}
97+
98+
#[cfg(feature = "runtime-tokio")]
99+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
100+
fn poll_write(
101+
self: Pin<&mut Self>,
102+
cx: &mut std::task::Context<'_>,
103+
buf: &[u8],
104+
) -> Poll<std::io::Result<usize>> {
105+
self.project().encoder.poll_write(cx, buf)
106+
}
107+
108+
fn poll_flush(
109+
self: Pin<&mut Self>,
110+
cx: &mut std::task::Context<'_>,
111+
) -> Poll<std::io::Result<()>> {
112+
self.project().encoder.poll_flush(cx)
113+
}
114+
115+
fn poll_shutdown(
116+
self: Pin<&mut Self>,
117+
cx: &mut std::task::Context<'_>,
118+
) -> Poll<std::io::Result<()>> {
119+
self.project().encoder.poll_shutdown(cx)
120+
}
121+
}
122+
123+
#[cfg(feature = "runtime-async-std")]
124+
impl<T: Read + Write + Unpin + fmt::Debug> Write for DeflateStream<T> {
125+
fn poll_write(
126+
self: Pin<&mut Self>,
127+
cx: &mut std::task::Context<'_>,
128+
buf: &[u8],
129+
) -> Poll<async_std::io::Result<usize>> {
130+
self.project().encoder.poll_write(cx, buf)
131+
}
132+
133+
fn poll_flush(
134+
self: Pin<&mut Self>,
135+
cx: &mut std::task::Context<'_>,
136+
) -> Poll<async_std::io::Result<()>> {
137+
self.project().encoder.poll_flush(cx)
138+
}
139+
140+
fn poll_close(
141+
self: Pin<&mut Self>,
142+
cx: &mut std::task::Context<'_>,
143+
) -> Poll<async_std::io::Result<()>> {
144+
self.project().encoder.poll_close(cx)
145+
}
146+
}
147+
148+
impl<T: Read + Write + Unpin + fmt::Debug + Send> Session<T> {
149+
/// Runs `COMPRESS DEFLATE` command.
150+
pub async fn compress<F, S>(self, f: F) -> Result<Session<S>>
151+
where
152+
S: Read + Write + Unpin + fmt::Debug,
153+
F: FnOnce(DeflateStream<T>) -> S,
154+
{
155+
let Self {
156+
mut conn,
157+
unsolicited_responses_tx,
158+
unsolicited_responses,
159+
} = self;
160+
conn.run_command_and_check_ok("COMPRESS DEFLATE", Some(unsolicited_responses_tx.clone()))
161+
.await?;
162+
163+
let stream = conn.into_inner();
164+
let deflate_stream = DeflateStream::new(stream);
165+
let stream = ImapStream::new(f(deflate_stream));
166+
let conn = Connection {
167+
stream,
168+
request_ids: IdGenerator::new(),
169+
};
170+
let session = Session {
171+
conn,
172+
unsolicited_responses_tx,
173+
unsolicited_responses,
174+
};
175+
Ok(session)
176+
}
177+
}

src/extensions/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
//! Implementations of various IMAP extensions.
2+
#[cfg(feature = "compress")]
3+
pub mod compress;
4+
25
pub mod idle;
36

47
pub mod quota;

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ mod imap_stream;
9595
mod parse;
9696
pub mod types;
9797

98+
#[cfg(feature = "compress")]
99+
pub use crate::extensions::compress::DeflateStream;
100+
98101
pub use crate::authenticator::Authenticator;
99102
pub use crate::client::*;
100103

0 commit comments

Comments
 (0)