From 6404cdab9a0365f1aa32fd3772279df13526cbc0 Mon Sep 17 00:00:00 2001
From: zhangyuqin1998
Date: Mon, 22 Sep 2025 16:10:50 +0800
Subject: [PATCH] [Distributed] refine training_pipeline_step to avoid memory leaks

---
 paddlenlp/trainer/trainer.py | 29 ++++++++++++++++++++++-------
 1 file changed, 22 insertions(+), 7 deletions(-)

diff --git a/paddlenlp/trainer/trainer.py b/paddlenlp/trainer/trainer.py
index 9a3bad94c59b..8f80f31218cc 100644
--- a/paddlenlp/trainer/trainer.py
+++ b/paddlenlp/trainer/trainer.py
@@ -44,6 +44,11 @@
 from paddle import framework
 from paddle.distributed.fleet.meta_parallel import PipelineLayer
 
+try:
+    from paddle.distributed.fleet.meta_parallel import PipelineDatasetPreprocessor
+except:
+    PipelineDatasetPreprocessor = None
+
 try:
     from paddle.base import core
 except:
@@ -2583,22 +2588,32 @@ def training_pipeline_step(self, model: nn.Layer, inputs: Dict[str, Union[paddle
         # for v in self._pp_data_buffer[0].values():
         #     assert isinstance(v, paddle.Tensor), f"Only support tensor as pipeline mode input, got type {type(v)}"
 
-        with self.autocast_smart_context_manager():
-            inputs = model._prepare_pipeline_inputs_func(self._pp_data_buffer)
-            self._pp_data_buffer = []
-
         model.train()
         if model._dp_comm_overlap or model._sharding_comm_overlap:
             for _, buffers in model._chunk_2_comm_buffers.items():
                 for buffer in buffers:
                     buffer._acc_steps = self.args.gradient_accumulation_steps
 
-        inputs = model._prepare_training(
-            inputs, self.optimizer, self.lr_scheduler
-        )  # None, None => [optimizer, lr_scheduler]
         model.optimizer = None  # we do not use `PipelineParallel` to handler optimizer step
         model.lr_scheduler = None
 
+        def _dataset_process_function():
+            # Pass a local function to forward_backward_pipeline instead of the dataset itself.
+            # This prevents the dataset from being passed as a direct argument to forward_backward_pipeline,
+            # which would create additional reference counts that cannot be cleared, leading to GPU memory leaks.
+            with self.autocast_smart_context_manager():
+                inputs = model._prepare_pipeline_inputs_func(self._pp_data_buffer)
+                self._pp_data_buffer = []
+
+            return model._prepare_training(
+                inputs, self.optimizer, self.lr_scheduler
+            )  # None, None => [optimizer, lr_scheduler]
+
+        if PipelineDatasetPreprocessor is None:
+            inputs = _dataset_process_function()
+        else:
+            inputs = PipelineDatasetPreprocessor(_dataset_process_function)
+
         with self.autocast_smart_context_manager():
             loss = model.forward_backward_pipeline(inputs, self.scaler if self.do_grad_scaling else None)
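
For context on why the patch helps: the fix defers input preparation into a zero-argument callable, so the prepared batch is only materialized (and released) inside the pipeline call instead of being bound to an argument of `forward_backward_pipeline` for the whole step. Below is a minimal sketch of that deferred-preparation pattern; `run_pipeline` and `make_inputs` are hypothetical stand-ins for illustration, not Paddle APIs.

```python
# Sketch of the deferred-preparation pattern (hypothetical names, not Paddle APIs).

def run_pipeline(inputs_or_factory):
    # Accept either ready-made inputs or a callable that builds them lazily,
    # mirroring the PipelineDatasetPreprocessor fallback in the patch.
    inputs = inputs_or_factory() if callable(inputs_or_factory) else inputs_or_factory
    total = sum(inputs)  # stand-in for the forward/backward pipeline work
    del inputs           # last reference to the batch dies here, so it can be freed
    return total


def make_inputs():
    # Build the (potentially large) batch only when the pipeline asks for it.
    return [1.0, 2.0, 3.0]


# Eager call: the caller's binding keeps the batch alive for the whole call.
big_batch = make_inputs()
print(run_pipeline(big_batch))

# Deferred call: no reference to the batch survives outside run_pipeline.
print(run_pipeline(make_inputs))
```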