2121import torchx .specs as specs
2222from torchx .cli .argparse_util import ArgOnceAction , torchxconfig_run
2323from torchx .cli .cmd_base import SubCommand
24+ from torchx .cli .cmd_log import get_logs
2425from torchx .runner import config , get_runner , Runner
2526from torchx .runner .config import load_sections
2627from torchx .schedulers import get_default_scheduler_name , get_scheduler_factories
@@ -186,6 +187,12 @@ def add_arguments(self, subparser: argparse.ArgumentParser) -> None:
186187 help = "optional parent run ID that this run belongs to."
187188 " It can be used to group runs for experiment tracking purposes" ,
188189 )
190+ subparser .add_argument (
191+ "--tee_logs" ,
192+ action = "store_true" ,
193+ default = False ,
194+ help = "Add additional prefix to log lines to indicate which replica is printing the log" ,
195+ )
189196 subparser .add_argument (
190197 "component_name_and_args" ,
191198 nargs = argparse .REMAINDER ,
@@ -237,14 +244,18 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
237244 print (app_handle )
238245
239246 if args .scheduler .startswith ("local" ):
240- self ._wait_and_exit (runner , app_handle , log = True )
247+ self ._wait_and_exit (
248+ runner , app_handle , log = True , tee_logs = args .tee_logs
249+ )
241250 else :
242251 logger .info (f"Launched app: { app_handle } " )
243252 app_status = runner .status (app_handle )
244253 if app_status :
245254 logger .info (app_status .format ())
246255 if args .wait or args .log :
247- self ._wait_and_exit (runner , app_handle , log = args .log )
256+ self ._wait_and_exit (
257+ runner , app_handle , log = args .log , tee_logs = args .tee_logs
258+ )
248259
249260 except (ComponentValidationException , ComponentNotFoundException ) as e :
250261 error_msg = f"\n Failed to run component `{ component } ` got errors: \n { e } "
@@ -267,10 +278,16 @@ def run(self, args: argparse.Namespace) -> None:
267278 with get_runner (component_defaults = component_defaults ) as runner :
268279 self ._run (runner , args )
269280
270- def _wait_and_exit (self , runner : Runner , app_handle : str , log : bool ) -> None :
281+ def _wait_and_exit (
282+ self , runner : Runner , app_handle : str , log : bool , tee_logs : bool
283+ ) -> None :
271284 logger .info ("Waiting for the app to finish..." )
272285
273- log_thread = self ._start_log_thread (runner , app_handle ) if log else None
286+ log_thread = (
287+ self ._start_log_thread (runner , app_handle , tee_logs_enabled = tee_logs )
288+ if log
289+ else None
290+ )
274291
275292 status = runner .wait (app_handle , wait_interval = 1 )
276293 if not status :
@@ -287,15 +304,30 @@ def _wait_and_exit(self, runner: Runner, app_handle: str, log: bool) -> None:
287304 else :
288305 logger .debug (status )
289306
290- def _start_log_thread (self , runner : Runner , app_handle : str ) -> threading .Thread :
291- thread = tee_logs (
292- dst = sys .stderr ,
293- app_handle = app_handle ,
294- regex = None ,
295- runner = runner ,
296- should_tail = True ,
297- streams = None ,
298- colorize = not sys .stderr .closed and sys .stderr .isatty (),
299- )
def _start_log_thread(
    self, runner: Runner, app_handle: str, tee_logs_enabled: bool
) -> threading.Thread:
    """Start a background thread that streams the app's logs to stderr.

    Args:
        runner: Runner handed to the log-fetching helper.
        app_handle: Handle of the app whose logs are streamed.
        tee_logs_enabled: When true, the thread is created by ``tee_logs``
            (per the CLI help text this variant prefixes each line with the
            emitting replica -- confirm against ``tee_logs`` itself).
            Otherwise a daemon thread running ``get_logs`` is created.

    Returns:
        The log thread, already started.
    """
    if not tee_logs_enabled:
        # Plain log streaming: run get_logs directly on a daemon thread so it
        # never keeps the process alive on its own.
        log_thread = threading.Thread(
            target=get_logs,
            kwargs={
                "file": sys.stderr,
                "runner": runner,
                "identifier": app_handle,
                "regex": None,
                "should_tail": True,
            },
            daemon=True,
        )
    else:
        # Only colorize when stderr is still open and attached to a terminal.
        stderr_is_tty = not sys.stderr.closed and sys.stderr.isatty()
        log_thread = tee_logs(
            dst=sys.stderr,
            app_handle=app_handle,
            regex=None,
            runner=runner,
            should_tail=True,
            streams=None,
            colorize=stderr_is_tty,
        )

    log_thread.start()
    return log_thread
0 commit comments