diff --git a/josh-proxy/src/bin/josh-proxy.rs b/josh-proxy/src/bin/josh-proxy.rs index d775492e..5f0ffa7f 100644 --- a/josh-proxy/src/bin/josh-proxy.rs +++ b/josh-proxy/src/bin/josh-proxy.rs @@ -2,19 +2,17 @@ extern crate lazy_static; extern crate clap; -use bytes::Bytes; use clap::Parser; -use http_body_util::Full; use hyper::body::Incoming; use hyper::server::conn::http1; use hyper_util::rt::{tokio::TokioIo, tokio::TokioTimer}; use josh_proxy::cli; +use josh_proxy::hyper_integration::{JoshResponse, empty, erase, full}; use josh_proxy::{FetchError, MetaConfig, RemoteAuth, RepoConfig, RepoUpdate, run_git_with_auth}; use tokio::pin; use tokio::sync::broadcast; use tracing_opentelemetry::OpenTelemetrySpanExt; -use futures::FutureExt; use hyper::service::service_fn; use hyper::{Request, Response, StatusCode}; @@ -319,7 +317,7 @@ async fn fetch_upstream( async fn static_paths( service: &JoshProxyService, path: &str, -) -> josh::JoshResult>>> { +) -> josh::JoshResult> { tracing::debug!("static_path {:?}", path); if path == "/version" { return Ok(Some(make_response( @@ -378,7 +376,7 @@ async fn static_paths( async fn repo_update_fn( _serv: Arc, req: Request, -) -> josh::JoshResult>> { +) -> josh::JoshResult { let body = req.into_body().collect().await?.to_bytes(); let s = tracing::span!(tracing::Level::TRACE, "repo update worker"); @@ -402,10 +400,10 @@ async fn repo_update_fn( Ok(match result { Ok(stderr) => Response::builder() .status(hyper::StatusCode::OK) - .body(Full::new(Bytes::from(stderr))), + .body(full(stderr)), Err(josh::JoshError(stderr)) => Response::builder() .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) - .body(Full::new(Bytes::from(stderr))), + .body(full(stderr)), }?) } @@ -541,27 +539,27 @@ async fn do_filter( Ok(()) } -fn make_response(body: &str, code: hyper::StatusCode) -> Response> { +fn make_response(body: &str, code: hyper::StatusCode) -> JoshResponse { let owned_body = body.to_owned(); Response::builder() .status(code) .header(hyper::header::CONTENT_TYPE, "text/plain") - .body(Full::new(Bytes::from(owned_body))) + .body(full(owned_body)) .expect("Can't build response") } async fn handle_ui_request( req: Request, resource_path: &str, -) -> josh::JoshResult>> { +) -> josh::JoshResult { // Proxy: can be used for UI development or to serve a different UI if let Some(proxy) = &ARGS.static_resource_proxy_target { let client_ip = IpAddr::from_str("127.0.0.1").unwrap(); return match hyper_reverse_proxy::call(client_ip, proxy, req).await { - Ok(response) => Ok(response), + Ok(response) => Ok(erase(response)), Err(error) => Ok(Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(Full::new(Bytes::from(format!("Proxy error: {:?}", error)))) + .body(full(format!("Proxy error: {:?}", error))) .unwrap()), }; } @@ -576,25 +574,21 @@ async fn handle_ui_request( || resource_path == "/history"; let resolve_path = if is_app_route { - "index.html" + "/index.html" } else { resource_path }; - let resolver = hyper_staticfile::Resolver::new("josh/static"); let request = hyper::http::Request::get(resolve_path).body(()).unwrap(); - let result = resolver.resolve_request(&request).await?; - let response = hyper::Response::new(Full::new( - hyper_staticfile::ResponseBuilder::new() - .request(&req) - .build(result)? - .into_body() - .collect() - .await? - .to_bytes(), - )); - Ok(response) + let resolver = hyper_staticfile::Static::new("/josh/static"); + + let res = resolver.serve(request).await.map_err(|e| match e { + //TODO: handle errors + _ => JoshError("Error serving static file".to_string()), + })?; + + return Ok(erase(res)); } async fn query_meta_repo( @@ -986,7 +980,7 @@ fn make_repo_update( async fn handle_serve_namespace_request( serv: Arc, req: Request, -) -> josh::JoshResult>> { +) -> josh::JoshResult { let error_response = |status: StatusCode| Ok(make_response("", status)); if req.method() != hyper::Method::POST { @@ -1201,7 +1195,7 @@ async fn handle_serve_namespace_request( async fn call_service( serv: Arc, req_auth: (josh_proxy::auth::Handle, Request), -) -> josh::JoshResult>> { +) -> josh::JoshResult { let (auth, req) = req_auth; let path = { @@ -1271,7 +1265,7 @@ async fn call_service( return Ok(Response::builder() .status(hyper::StatusCode::FOUND) .header("Location", redirect_path) - .body(Full::new(Bytes::new()))?); + .body(empty())?); } }; @@ -1320,7 +1314,7 @@ async fn call_service( return Ok(Response::builder() .status(hyper::StatusCode::TEMPORARY_REDIRECT) .header("Location", format!("{}{}", remote_url, parsed_url.pathinfo)) - .body(Full::new(Bytes::new()))?); + .body(empty())?); } let http_auth_required = ARGS.require_auth && parsed_url.pathinfo == "/git-receive-pack"; @@ -1347,7 +1341,7 @@ async fn call_service( "Basic realm=User Visible Realm", ) .status(hyper::StatusCode::UNAUTHORIZED); - return Ok(builder.body(Full::new(Bytes::new()))?); + return Ok(builder.body(empty())?); } } @@ -1357,11 +1351,11 @@ async fn call_service( if parsed_url.api == "/~/graphiql" { let addr = format!("/~/graphql{}", meta.config.repo); - return Ok(tokio::task::spawn_blocking(move || { - josh_proxy::juniper_hyper::graphiql(&addr, None) - }) - .in_current_span() - .await??); + return Ok(erase( + tokio::task::spawn_blocking(move || josh_proxy::juniper_hyper::graphiql(&addr, None)) + .in_current_span() + .await??, + )); } for fetch_repo in fetch_repos.iter() { @@ -1385,11 +1379,11 @@ async fn call_service( "Basic realm=User Visible Realm", ) .status(hyper::StatusCode::UNAUTHORIZED); - return Ok(builder.body(Full::new(Bytes::new()))?); + return Ok(builder.body(empty())?); } Err(FetchError::Other(e)) => { let builder = Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR); - return Ok(builder.body(Full::new(Bytes::from(e.0)))?); + return Ok(builder.body(full(e.0))?); } } } @@ -1450,7 +1444,7 @@ async fn call_service( // it is executed in all cases. std::mem::drop(temp_ns); - Ok(cgi_response) + Ok(erase(cgi_response)) } async fn serve_query( @@ -1459,7 +1453,7 @@ async fn serve_query( upstream_repo: String, filter: josh::filter::Filter, head_ref: &str, -) -> josh::JoshResult>> { +) -> josh::JoshResult { let tracing_span = tracing::span!(tracing::Level::TRACE, "render worker"); let head_ref = head_ref.to_string(); let res = tokio::task::spawn_blocking(move || -> josh::JoshResult<_> { @@ -1506,15 +1500,15 @@ async fn serve_query( .get("content-type") .unwrap_or(&"text/plain".to_string()), ) - .body(Full::new(Bytes::from(res)))?, + .body(full(res))?, Ok(None) => Response::builder() .status(hyper::StatusCode::NOT_FOUND) - .body(Full::new(Bytes::from("File not found".to_string())))?, + .body(full("File not found".to_string()))?, Err(res) => Response::builder() .status(hyper::StatusCode::UNPROCESSABLE_ENTITY) - .body(Full::new(Bytes::from(res.to_string())))?, + .body(full(res.to_string()))?, }) } @@ -1650,7 +1644,7 @@ async fn run_proxy() -> josh::JoshResult { let _s = tracing::span!( tracing::Level::TRACE, "http_request", - path = _req.uri().path() + path = _req.uri().path().to_string() ); let s = _s; @@ -1677,9 +1671,8 @@ async fn run_proxy() -> josh::JoshResult { }; let _e = s.enter(); trace_http_response_code(s.clone(), r.status()); - r + Ok::<_, hyper::http::Error>(r) } - .map(Ok::<_, hyper::http::Error>) }), ); pin!(conn); @@ -1842,15 +1835,11 @@ async fn serve_graphql( upstream_repo: String, upstream: String, auth: josh_proxy::auth::Handle, -) -> josh::JoshResult>> { +) -> josh::JoshResult { let remote_url = upstream.clone() + upstream_repo.as_str(); let parsed = match josh_proxy::juniper_hyper::parse_req(req).await { Ok(r) => r, - Err(resp) => { - return Ok(hyper::Response::new(Full::new(Bytes::from( - resp.collect().await?.to_bytes(), - )))); - } + Err(resp) => return Ok(erase(resp)), }; let transaction_mirror = josh::cache::Transaction::open( @@ -1909,12 +1898,12 @@ async fn serve_graphql( "Basic realm=User Visible Realm", ) .status(hyper::StatusCode::UNAUTHORIZED); - return Ok(builder.body(Full::new(Bytes::new()))?); + return Ok(builder.body(empty())?); } Err(FetchError::Other(e)) => { let builder = Response::builder().status(hyper::StatusCode::INTERNAL_SERVER_ERROR); - return Ok(builder.body(Full::new(Bytes::from(e.0)))?); + return Ok(builder.body(full(e.0))?); } }; @@ -1928,8 +1917,8 @@ async fn serve_graphql( hyper::StatusCode::BAD_REQUEST }; - let body = Full::new(Bytes::from(serde_json::to_string_pretty(&res).unwrap())); - let mut resp = Response::new(Full::new(Bytes::new())); + let body = full(serde_json::to_string_pretty(&res).unwrap()); + let mut resp = Response::new(empty()); *resp.status_mut() = code; resp.headers_mut().insert( hyper::header::CONTENT_TYPE, diff --git a/josh-proxy/src/hyper_integration.rs b/josh-proxy/src/hyper_integration.rs new file mode 100644 index 00000000..8f706235 --- /dev/null +++ b/josh-proxy/src/hyper_integration.rs @@ -0,0 +1,29 @@ +use bytes::Bytes; +use http_body_util::{BodyExt, combinators::BoxBody}; +use hyper::Response; + +pub type BoxError = Box; +pub type JoshBody = BoxBody; +pub type JoshResponse = Response; + +pub fn empty() -> JoshBody { + use http_body_util::{BodyExt, Empty}; + return Empty::::new() + .map_err(|never| match never {}) + .boxed(); +} + +pub fn full(b: impl Into) -> JoshBody { + use http_body_util::{BodyExt, Full}; + return Full::::new(b.into()) + .map_err(|never| match never {}) + .boxed(); +} + +pub fn erase(res: hyper::Response) -> JoshResponse +where + B: hyper::body::Body + Send + Sync + 'static, + B::Error: Into, +{ + res.map(|b| b.map_err(Into::into).boxed()) +} diff --git a/josh-proxy/src/lib.rs b/josh-proxy/src/lib.rs index 1a710fbb..6fd24d66 100644 --- a/josh-proxy/src/lib.rs +++ b/josh-proxy/src/lib.rs @@ -1,6 +1,7 @@ pub mod auth; pub mod cli; pub mod housekeeping; +pub mod hyper_integration; pub mod juniper_hyper; pub mod trace; diff --git a/tests/proxy/ui.t b/tests/proxy/ui.t new file mode 100644 index 00000000..333fd84f --- /dev/null +++ b/tests/proxy/ui.t @@ -0,0 +1,30 @@ + $ . ${TESTDIR}/setup_test_env.sh + $ cd ${TESTTMP} + $ curl -s -I http://127.0.0.1:8002/ + HTTP/1.1 302 Found\r (esc) + location: /~/ui/\r (esc) + date: * (glob) + \r (esc) + $ curl -s -I http://127.0.0.1:8002/~/ui/index.html + HTTP/1.1 200 OK\r (esc) + etag: * (glob) + last-modified: * (glob) + accept-ranges: bytes\r (esc) + content-length: 633\r (esc) + content-type: text/html\r (esc) + date: * (glob) + \r (esc) + $ curl -s -I http://127.0.0.1:8002/~/ui/favicon.ico + HTTP/1.1 200 OK\r (esc) + etag: * (glob) + last-modified: * (glob) + accept-ranges: bytes\r (esc) + content-length: 12014\r (esc) + content-type: image/x-icon\r (esc) + date: * (glob) + \r (esc) + $ curl -s -I http://127.0.0.1:8002/a/repo + HTTP/1.1 302 Found\r (esc) + location: /~/ui/browse?repo=/a/repo.git&path=&filter=%3A%2F&rev=HEAD\r (esc) + date: * (glob) + \r (esc)