Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 126 additions & 131 deletions dcore/src/inspector_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ use core::convert::Infallible as Never;
use deno_core::InspectorMsg;
use deno_core::InspectorSessionKind;
use deno_core::InspectorSessionProxy;
use deno_core::JsRuntime;
use deno_core::JsRuntimeInspector;
use deno_core::anyhow::Context;
use deno_core::futures::channel::mpsc;
use deno_core::futures::channel::mpsc::UnboundedReceiver;
use deno_core::futures::channel::mpsc::UnboundedSender;
use deno_core::futures::channel::oneshot;
use deno_core::futures::prelude::*;
use deno_core::futures::select;
use deno_core::futures::stream::StreamExt;
use deno_core::serde_json::Value;
use deno_core::serde_json::json;
Expand Down Expand Up @@ -88,10 +87,9 @@ impl InspectorServer {
pub fn register_inspector(
&self,
module_url: String,
js_runtime: &mut JsRuntime,
inspector: Rc<JsRuntimeInspector>,
wait_for_session: bool,
) {
let inspector = js_runtime.inspector();
let session_sender = inspector.get_session_sender();
let deregister_rx = inspector.add_deregister_handler();

Expand Down Expand Up @@ -133,16 +131,16 @@ fn handle_ws_request(
.strip_prefix("/ws/")
.and_then(|s| Uuid::parse_str(s).ok());

if maybe_uuid.is_none() {
let Some(uuid) = maybe_uuid else {
return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body(Box::new(Bytes::from("Malformed inspector UUID").into()));
}
};

// run in a block to not hold borrow to `inspector_map` for too long
let new_session_tx = {
let inspector_map = inspector_map_rc.borrow();
let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap());
let maybe_inspector_info = inspector_map.get(&uuid);

if maybe_inspector_info.is_none() {
return http::Response::builder()
Expand All @@ -156,28 +154,19 @@ fn handle_ws_request(
let (parts, _) = req.into_parts();
let mut req = http::Request::from_parts(parts, body);

let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) {
Ok((resp, fut)) => {
let (parts, _body) = resp.into_parts();
let resp = http::Response::from_parts(
parts,
Box::new(http_body_util::Full::new(Bytes::new())),
);
(resp, fut)
}
_ => {
return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body(Box::new(
Bytes::from("Not a valid Websocket Request").into(),
));
}
let Ok((resp, upgrade_fut)) = fastwebsockets::upgrade::upgrade(&mut req)
else {
return http::Response::builder()
.status(http::StatusCode::BAD_REQUEST)
.body(Box::new(
Bytes::from("Not a valid Websocket Request").into(),
));
};

// spawn a task that will wait for websocket connection and then pump messages between
// the socket and inspector proxy
spawn(async move {
let websocket = match fut.await {
let websocket = match upgrade_fut.await {
Ok(w) => w,
Err(err) => {
eprintln!(
Expand Down Expand Up @@ -206,6 +195,11 @@ fn handle_ws_request(
pump_websocket_messages(websocket, inbound_tx, outbound_rx).await;
});

let (parts, _body) = resp.into_parts();
let resp = http::Response::from_parts(
parts,
Box::new(http_body_util::Full::new(Bytes::new())),
);
Ok(resp)
}

Expand Down Expand Up @@ -249,34 +243,18 @@ async fn server(
Rc::new(RefCell::new(HashMap::<Uuid, InspectorInfo>::new()));

let inspector_map = Rc::clone(&inspector_map_);
let mut register_inspector_handler = pin!(
register_inspector_rx
.map(|info| {
eprintln!(
"Debugger listening on {}",
info.get_websocket_debugger_url(&info.host.to_string())
);
eprintln!("Visit chrome://inspect to connect to the debugger.");
if info.wait_for_session {
eprintln!("Deno is waiting for debugger to connect.");
}
if inspector_map.borrow_mut().insert(info.uuid, info).is_some() {
panic!("Inspector UUID already in map");
}
})
.collect::<()>()
);
let register_inspector_handler =
listen_for_new_inspectors(register_inspector_rx, inspector_map.clone())
.boxed_local();

let inspector_map = Rc::clone(&inspector_map_);
let mut deregister_inspector_handler = pin!(
future::poll_fn(|cx| {
inspector_map
.borrow_mut()
.retain(|_, info| info.deregister_rx.poll_unpin(cx) == Poll::Pending);
Poll::<Never>::Pending
})
.fuse()
);
let deregister_inspector_handler = future::poll_fn(|cx| {
inspector_map
.borrow_mut()
.retain(|_, info| info.deregister_rx.poll_unpin(cx) == Poll::Pending);
Poll::<Never>::Pending
})
.boxed_local();

let json_version_response = json!({
"Browser": name,
Expand All @@ -293,102 +271,119 @@ async fn server(
}
};

let mut server_handler = pin!(
deno_core::unsync::spawn(async move {
loop {
let mut rx = shutdown_server_rx.resubscribe();
let mut shutdown_rx = pin!(rx.recv());
let mut accept = pin!(listener.accept());

let stream = tokio::select! {
accept_result = &mut accept => {
match accept_result {
Ok((s, _)) => s,
Err(err) => {
eprintln!("Failed to accept inspector connection: {:?}", err);
continue;
}
let server_handler = async move {
loop {
let mut rx = shutdown_server_rx.resubscribe();
let mut shutdown_rx = pin!(rx.recv());
let mut accept = pin!(listener.accept());

let stream = tokio::select! {
accept_result = &mut accept => {
match accept_result {
Ok((s, _)) => s,
Err(err) => {
eprintln!("Failed to accept inspector connection: {:?}", err);
continue;
}
},

_ = &mut shutdown_rx => {
break;
}
};
let io = TokioIo::new(stream);

let inspector_map = Rc::clone(&inspector_map_);
let json_version_response = json_version_response.clone();
let mut shutdown_server_rx = shutdown_server_rx.resubscribe();

let service = hyper::service::service_fn(
move |req: http::Request<hyper::body::Incoming>| {
future::ready({
// If the host header can make a valid URL, use it
let host = req
.headers()
.get("host")
.and_then(|host| host.to_str().ok())
.and_then(|host| Url::parse(&format!("http://{host}")).ok())
.and_then(|url| match (url.host(), url.port()) {
(Some(host), Some(port)) => Some(format!("{host}:{port}")),
(Some(host), None) => Some(format!("{host}")),
_ => None,
});
match (req.method(), req.uri().path()) {
(&http::Method::GET, path) if path.starts_with("/ws/") => {
handle_ws_request(req, Rc::clone(&inspector_map))
}
(&http::Method::GET, "/json/version") => {
handle_json_version_request(json_version_response.clone())
}
(&http::Method::GET, "/json") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
(&http::Method::GET, "/json/list") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
_ => http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body(Box::new(http_body_util::Full::new(Bytes::from(
"Not Found",
)))),
},

_ = &mut shutdown_rx => {
break;
}
};
let io = TokioIo::new(stream);

let inspector_map = Rc::clone(&inspector_map_);
let json_version_response = json_version_response.clone();
let mut shutdown_server_rx = shutdown_server_rx.resubscribe();

let service = hyper::service::service_fn(
move |req: http::Request<hyper::body::Incoming>| {
future::ready({
// If the host header can make a valid URL, use it
let host = req
.headers()
.get("host")
.and_then(|host| host.to_str().ok())
.and_then(|host| Url::parse(&format!("http://{host}")).ok())
.and_then(|url| match (url.host(), url.port()) {
(Some(host), Some(port)) => Some(format!("{host}:{port}")),
(Some(host), None) => Some(format!("{host}")),
_ => None,
});
match (req.method(), req.uri().path()) {
(&http::Method::GET, path) if path.starts_with("/ws/") => {
handle_ws_request(req, Rc::clone(&inspector_map))
}
})
},
);
(&http::Method::GET, "/json/version") => {
handle_json_version_request(json_version_response.clone())
}
(&http::Method::GET, "/json") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
(&http::Method::GET, "/json/list") => {
handle_json_request(Rc::clone(&inspector_map), host)
}
_ => http::Response::builder()
.status(http::StatusCode::NOT_FOUND)
.body(Box::new(http_body_util::Full::new(Bytes::from(
"Not Found",
)))),
}
})
},
);

deno_core::unsync::spawn(async move {
let server = hyper::server::conn::http1::Builder::new();
deno_core::unsync::spawn(async move {
let server = hyper::server::conn::http1::Builder::new();

let mut conn =
pin!(server.serve_connection(io, service).with_upgrades());
let mut shutdown_rx = pin!(shutdown_server_rx.recv());
let mut conn =
pin!(server.serve_connection(io, service).with_upgrades());
let mut shutdown_rx = pin!(shutdown_server_rx.recv());

tokio::select! {
result = conn.as_mut() => {
if let Err(err) = result {
eprintln!("Failed to serve connection: {:?}", err);
}
},
_ = &mut shutdown_rx => {
conn.as_mut().graceful_shutdown();
let _ = conn.await;
tokio::select! {
result = conn.as_mut() => {
if let Err(err) = result {
eprintln!("Failed to serve connection: {:?}", err);
}
},
_ = &mut shutdown_rx => {
conn.as_mut().graceful_shutdown();
let _ = conn.await;
}
});
}
})
.fuse()
);
}
});
}
}
.boxed_local();

select! {
tokio::select! {
_ = register_inspector_handler => {},
_ = deregister_inspector_handler => unreachable!(),
_ = server_handler => {},
}
}

async fn listen_for_new_inspectors(
mut register_inspector_rx: UnboundedReceiver<InspectorInfo>,
inspector_map: Rc<RefCell<HashMap<Uuid, InspectorInfo>>>,
) {
while let Some(info) = register_inspector_rx.next().await {
eprintln!(
"Debugger listening on {}",
info.get_websocket_debugger_url(&info.host.to_string())
);
eprintln!("Visit chrome://inspect to connect to the debugger.");
if info.wait_for_session {
eprintln!("Deno is waiting for debugger to connect.");
}
if inspector_map.borrow_mut().insert(info.uuid, info).is_some() {
panic!("Inspector UUID already in map");
}
}
}

/// The pump future takes care of forwarding messages between the websocket
/// and channels. It resolves when either side disconnects, ignoring any
/// errors.
Expand Down
2 changes: 1 addition & 1 deletion dcore/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ fn main() -> Result<(), Error> {
if let Some(inspector_server) = inspector_server.clone() {
inspector_server.register_inspector(
main_module.to_string(),
&mut js_runtime,
js_runtime.inspector(),
matches!(maybe_inspect_mode.unwrap(), InspectMode::WaitForConnection),
);
}
Expand Down
Loading