From 654ca6c40a5309fef22390488a955a84e099c2b9 Mon Sep 17 00:00:00 2001 From: Volker Diels-Grabsch Date: Sun, 9 Feb 2025 01:44:25 +0100 Subject: [PATCH] Introduce Buf_write.of_flow --- lib_eio/buf_write.ml | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/lib_eio/buf_write.ml b/lib_eio/buf_write.ml index cf5e290e4..b35e79bd7 100644 --- a/lib_eio/buf_write.ml +++ b/lib_eio/buf_write.ml @@ -515,21 +515,16 @@ let copy t flow = try aux () with End_of_file -> () -let with_flow ?(initial_size=0x1000) flow fn = - Switch.run ~name:"Buf_write.with_flow" @@ fun sw -> +let of_flow ~sw ?(initial_size=0x1000) flow = let t = create ~sw initial_size in + Switch.on_release sw (fun () -> close t); Fiber.fork ~sw (fun () -> copy t flow); - match fn t with - | x -> - close t; - x - | exception ex -> - close t; - (* Raising the exception will cancel the writer thread, so do a flush first. - We don't want to flush if cancelled, but in that case the switch will - end the writer thread itself (and [flush] will raise). *) - flush t; - raise ex + t + +let with_flow ?initial_size flow fn = + Switch.run ~name:"Buf_write.with_flow" @@ fun sw -> + let t = of_flow ~sw ?initial_size flow in + fn t let rec serialize t writev = match await_batch t with