Skip to content

Commit 714fa2e

Browse files
authored
Feat: Port forwarding closure (#57)
* Implement port forwarding closure * Fix fmt * Format nits and doc comments * Fix fmt * Test socket accept after closure * Try read instead * `read` returning 0 seems to mean a closed socket * Check `read` 0 for local forwarding as well * Fix fmt
1 parent f78a9ce commit 714fa2e

File tree

3 files changed

+163
-20
lines changed

3 files changed

+163
-20
lines changed

crates/mux-client/src/connection.rs

Lines changed: 149 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -375,11 +375,60 @@ impl Connection {
375375
Ok(())
376376
}
377377

378+
async fn send_close_fwd_request(&mut self, request_id: u32, fwd: &Fwd<'_>) -> Result<()> {
379+
let (fwd_mode, listen_socket, connect_socket) = fwd.as_serializable();
380+
let (listen_addr, listen_port) = listen_socket.as_serializable();
381+
let (connect_addr, connect_port) = connect_socket.as_ref().as_serializable();
382+
383+
let serialized_listen_port = serialize_u32(listen_port);
384+
let serialized_connect_port = serialize_u32(connect_port);
385+
386+
let listen_addr_len: u32 = listen_addr.get_len_as_u32()?;
387+
let connect_addr_len: u32 = connect_addr.get_len_as_u32()?;
388+
389+
let request = Request::CloseFwd {
390+
request_id,
391+
fwd_mode,
392+
};
393+
394+
// Serialize
395+
self.reset_serializer();
396+
397+
request.serialize(&mut self.serializer)?;
398+
let serialized_header = self.serializer.create_header(
399+
// len
400+
4 +
401+
listen_addr_len +
402+
// port
403+
4 +
404+
// len
405+
4 +
406+
connect_addr_len
407+
// port
408+
+ 4,
409+
)?;
410+
411+
let serialized_listen_addr_len = serialize_u32(listen_addr_len);
412+
let serialized_connect_addr_len = serialize_u32(connect_addr_len);
413+
414+
// Write them to self.raw_conn
415+
let mut io_slices = [
416+
IoSlice::new(&serialized_header),
417+
IoSlice::new(&self.serializer.output),
418+
IoSlice::new(&serialized_listen_addr_len),
419+
IoSlice::new(listen_addr.into_inner()),
420+
IoSlice::new(&serialized_listen_port),
421+
IoSlice::new(&serialized_connect_addr_len),
422+
IoSlice::new(connect_addr.into_inner()),
423+
IoSlice::new(&serialized_connect_port),
424+
];
425+
426+
write_vectored_all(&mut self.raw_conn, &mut io_slices).await?;
427+
428+
Ok(())
429+
}
430+
378431
/// Request for local/remote port forwarding.
379-
///
380-
/// # Warning
381-
///
382-
/// Local port forwarding hasn't been tested yet.
383432
pub async fn request_port_forward(
384433
&mut self,
385434
forward_type: ForwardType,
@@ -426,6 +475,53 @@ impl Connection {
426475
}
427476
}
428477

478+
/// Request for local/remote port forwarding closure.
479+
pub async fn close_port_forward(
480+
&mut self,
481+
forward_type: ForwardType,
482+
listen_socket: &Socket<'_>,
483+
connect_socket: &Socket<'_>,
484+
) -> Result<()> {
485+
use ForwardType::*;
486+
use Response::*;
487+
488+
let fwd = match forward_type {
489+
Local => Fwd::Local {
490+
listen_socket,
491+
connect_socket,
492+
},
493+
Remote => Fwd::Remote {
494+
listen_socket,
495+
connect_socket,
496+
},
497+
};
498+
499+
let request_id = self.get_request_id();
500+
self.send_close_fwd_request(request_id, &fwd).await?;
501+
502+
match self.read_response().await? {
503+
Ok { response_id } => Self::check_response_id(request_id, response_id),
504+
PermissionDenied {
505+
response_id,
506+
reason,
507+
} => {
508+
Self::check_response_id(request_id, response_id)?;
509+
Err(Error::PermissionDenied(reason))
510+
}
511+
Failure {
512+
response_id,
513+
reason,
514+
} => {
515+
Self::check_response_id(request_id, response_id)?;
516+
Err(Error::RequestFailure(reason))
517+
}
518+
response => Err(Error::invalid_server_response(
519+
&"Ok, PermissionDenied or Failure",
520+
&response,
521+
)),
522+
}
523+
}
524+
429525
/// **UNTESTED** Return remote port opened for dynamic forwarding.
430526
pub async fn request_dynamic_forward(
431527
&mut self,
@@ -622,26 +718,27 @@ mod tests {
622718
}
623719
run_test!(test_unordered_open_new_session, test_open_new_session_impl);
624720

625-
async fn test_remote_socket_forward_impl(mut conn: Connection) {
721+
async fn test_remote_socket_forward_impl(mut conn0: Connection, mut conn1: Connection) {
626722
let path = Path::new("/tmp/openssh-remote-forward.socket");
627723

628724
let output_listener = TcpListener::bind(("127.0.0.1", 1234)).await.unwrap();
629725

630726
eprintln!("Requesting port forward");
631-
conn.request_port_forward(
632-
ForwardType::Remote,
633-
&Socket::UnixSocket { path: path.into() },
634-
&Socket::TcpSocket {
635-
port: 1234,
636-
host: "127.0.0.1".into(),
637-
},
638-
)
639-
.await
640-
.unwrap();
727+
conn0
728+
.request_port_forward(
729+
ForwardType::Remote,
730+
&Socket::UnixSocket { path: path.into() },
731+
&Socket::TcpSocket {
732+
port: 1234,
733+
host: "127.0.0.1".into(),
734+
},
735+
)
736+
.await
737+
.unwrap();
641738

642739
eprintln!("Creating remote process");
643740
let cmd = format!("/usr/bin/socat OPEN:/data,rdonly UNIX-CONNECT:{:#?}", path);
644-
let (established_session, stdios) = create_remote_process(conn, &cmd).await;
741+
let (established_session, stdios) = create_remote_process(conn0, &cmd).await;
645742

646743
eprintln!("Waiting for connection");
647744
let (mut output, _addr) = output_listener.accept().await.unwrap();
@@ -655,8 +752,24 @@ mod tests {
655752

656753
assert_eq!(DATA, &buffer);
657754

658-
drop(output);
755+
eprintln!("Closing port forward");
756+
conn1
757+
.close_port_forward(
758+
ForwardType::Remote,
759+
&Socket::UnixSocket { path: path.into() },
760+
&Socket::TcpSocket {
761+
port: 1234,
762+
host: "127.0.0.1".into(),
763+
},
764+
)
765+
.await
766+
.unwrap();
767+
768+
eprintln!("Checking whether the forwarded socket is closed");
769+
assert_eq!(output.read(&mut buffer).await.unwrap(), 0);
770+
659771
drop(output_listener);
772+
drop(output);
660773
drop(stdios);
661774

662775
eprintln!("Waiting for session to end");
@@ -667,13 +780,13 @@ mod tests {
667780
if exit_value.unwrap() == 0
668781
);
669782
}
670-
run_test!(
783+
run_test2!(
671784
test_unordered_remote_socket_forward,
672785
test_remote_socket_forward_impl
673786
);
674787

675788
async fn test_local_socket_forward_impl(conn0: Connection, mut conn1: Connection) {
676-
let path = Path::new("/tmp/openssh-local-forward.socket").into();
789+
let path: Cow<'_, _> = Path::new("/tmp/openssh-local-forward.socket").into();
677790

678791
eprintln!("Creating remote process");
679792
let cmd = format!("socat -u OPEN:/data UNIX-LISTEN:{:#?} >/dev/stderr", path);
@@ -689,7 +802,7 @@ mod tests {
689802
port: 1235,
690803
host: "127.0.0.1".into(),
691804
},
692-
&Socket::UnixSocket { path },
805+
&Socket::UnixSocket { path: path.clone() },
693806
)
694807
.await
695808
.unwrap();
@@ -705,6 +818,22 @@ mod tests {
705818

706819
assert_eq!(DATA, buffer);
707820

821+
eprintln!("Closing port forward");
822+
conn1
823+
.close_port_forward(
824+
ForwardType::Local,
825+
&Socket::TcpSocket {
826+
port: 1235,
827+
host: "127.0.0.1".into(),
828+
},
829+
&Socket::UnixSocket { path },
830+
)
831+
.await
832+
.unwrap();
833+
834+
eprintln!("Checking whether the forwarded socket is closed");
835+
assert_eq!(output.read(&mut buffer).await.unwrap(), 0);
836+
708837
drop(output);
709838
drop(stdios);
710839

crates/mux-client/src/constants.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ def_constants!(MUX_MSG_HELLO, 0x00000001);
1010
def_constants!(MUX_C_NEW_SESSION, 0x10000002);
1111
def_constants!(MUX_C_ALIVE_CHECK, 0x10000004);
1212
def_constants!(MUX_C_OPEN_FWD, 0x10000006);
13+
def_constants!(MUX_C_CLOSE_FWD, 0x10000007);
1314
def_constants!(MUX_C_STOP_LISTENING, 0x10000009);
1415
def_constants!(MUX_S_OK, 0x80000001);
1516
def_constants!(MUX_S_PERMISSION_DENIED, 0x80000002);

crates/mux-client/src/request.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ pub(crate) enum Request {
4747
/// `Request::RemotePort`.
4848
OpenFwd { request_id: u32, fwd_mode: u32 },
4949

50+
/// A server may reply with `Response::Ok`, `Response::PermissionDenied`,
51+
/// or `Response::Failure`.
52+
CloseFwd { request_id: u32, fwd_mode: u32 },
53+
5054
/// A client may request the master to stop accepting new multiplexing requests
5155
/// and remove its listener socket.
5256
///
@@ -87,6 +91,15 @@ impl Serialize for Request {
8791
"OpenFwd",
8892
&(*request_id, fwd_mode),
8993
),
94+
CloseFwd {
95+
request_id,
96+
fwd_mode,
97+
} => serializer.serialize_newtype_variant(
98+
"Request",
99+
MUX_C_CLOSE_FWD,
100+
"CloseFwd",
101+
&(*request_id, fwd_mode),
102+
),
90103
StopListening { request_id } => serializer.serialize_newtype_variant(
91104
"Request",
92105
MUX_C_STOP_LISTENING,

0 commit comments

Comments
 (0)