From 2ae45068d27d48bb038870dd2cb142f416a9d025 Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Mon, 24 Jun 2024 17:40:03 +0200
Subject: [PATCH 01/12] enable re-training

---
 neuralprophet/forecaster.py | 55 +++++++++++++++++++++++--------------
 1 file changed, 35 insertions(+), 20 deletions(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index d80fcef14..7ec036c0c 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -962,7 +962,7 @@ def fit(
         pd.DataFrame
             metrics with training and potentially evaluation metrics
         """
-        if self.fitted:
+        if self.fitted and not continue_training:
             raise RuntimeError("Model has been fitted already. Please initialize a new model to fit again.")
 
         # Configuration
@@ -2645,23 +2645,23 @@ def _init_train_loader(self, df, num_workers=0):
             torch DataLoader
         """
         df, _, _, _ = df_utils.prep_or_copy_df(df)
-        # if not self.fitted:
-        self.config_normalization.init_data_params(
-            df=df,
-            config_lagged_regressors=self.config_lagged_regressors,
-            config_regressors=self.config_regressors,
-            config_events=self.config_events,
-            config_seasonality=self.config_seasonality,
-        )
+        if not self.fitted:
+            self.config_normalization.init_data_params(
+                df=df,
+                config_lagged_regressors=self.config_lagged_regressors,
+                config_regressors=self.config_regressors,
+                config_events=self.config_events,
+                config_seasonality=self.config_seasonality,
+            )
 
         df = _normalize(df=df, config_normalization=self.config_normalization)
-        # if not self.fitted:
-        if self.config_trend.changepoints is not None:
-            # scale user-specified changepoint times
-            df_aux = pd.DataFrame({"ds": pd.Series(self.config_trend.changepoints)})
+        if not self.fitted:
+            if self.config_trend.changepoints is not None:
+                # scale user-specified changepoint times
+                df_aux = pd.DataFrame({"ds": pd.Series(self.config_trend.changepoints)})
 
-            df_normalized = _normalize(df=df_aux, config_normalization=self.config_normalization)
-            self.config_trend.changepoints = df_normalized["t"].values  # type: ignore
+                df_normalized = _normalize(df=df_aux, config_normalization=self.config_normalization)
+                self.config_trend.changepoints = df_normalized["t"].values  # type: ignore
 
         # df_merged, _ = df_utils.join_dataframes(df)
         # df_merged = df_merged.sort_values("ds")
@@ -2740,6 +2740,13 @@ def _train(
         pd.DataFrame
             metrics
         """
+        # Test
+        if continue_training:
+            checkpoint_path = self.metrics_logger.checkpoint_path
+            print(checkpoint_path)
+            checkpoint = torch.load(checkpoint_path)
+            print(checkpoint.keys())
+
         # Set up data the training dataloader
         df, _, _, _ = df_utils.prep_or_copy_df(df)
         train_loader = self._init_train_loader(df, num_workers)
@@ -2748,12 +2755,20 @@ def _train(
         # Internal flag to check if validation is enabled
         validation_enabled = df_val is not None
 
-        # Init the model, if not continue from checkpoint
+        # Load model and optimizer state from checkpoint if continue_training is True
         if continue_training:
-            raise NotImplementedError(
-                "Continuing training from checkpoint is not implemented yet. This feature is planned for one of the \
-                upcoming releases."
-            )
+            checkpoint_path = self.metrics_logger.checkpoint_path
+            checkpoint = torch.load(checkpoint_path)
+            self.model = self._init_model()
+            # TODO: fix size mismatch for trend.trend_changepoints_t: copying a param with shape torch.Size([11]) from checkpoint, the shape in current model is torch.Size([12]).
+            self.model.load_state_dict(checkpoint["state_dict"], strict=False)
+            self.optimizer.load_state_dict(checkpoint["optimizer_states"][0])
+            self.trainer.current_epoch = checkpoint["epoch"] + 1
+            if "lr_schedulers" in checkpoint:
+                self.lr_scheduler.load_state_dict(checkpoint["lr_schedulers"][0])
+                print(f"Resuming training from epoch {self.trainer.current_epoch}")
+            # TODO: remove print, checkpoint['lr_schedulers']
+            print(f"Resuming training from epoch {self.trainer.current_epoch}")
         else:
             self.model = self._init_model()
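For reference between patches: the checkpoint loaded above is a plain dict written by PyTorch Lightning, and the keys this series relies on are "state_dict", "optimizer_states", "lr_schedulers" and "epoch". A minimal standalone sketch (the Linear stand-in model and epoch value are made up for illustration; only the torch calls mirror the patch):

    from torch import nn

    # Stand-in for a Lightning checkpoint: an ordinary dict with the keys
    # patch 01 reads ("state_dict", "epoch", optionally "optimizer_states").
    source = nn.Linear(4, 1)
    checkpoint = {"state_dict": source.state_dict(), "epoch": 9}

    target = nn.Linear(4, 1)
    # strict=False only tolerates missing or unexpected *keys*. Tensors whose
    # shapes differ still raise "size mismatch for ..." -- the exact error
    # quoted in the TODO of patch 01 for trend.trend_changepoints_t.
    result = target.load_state_dict(checkpoint["state_dict"], strict=False)
    print(result.missing_keys, result.unexpected_keys)
    print("would resume at epoch", checkpoint["epoch"] + 1)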
From 900c8d5f4b96ad50d1e5f85821d129e6a3 Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Sat, 29 Jun 2024 14:10:56 +0200
Subject: [PATCH 02/12] update scheduler

---
 neuralprophet/forecaster.py | 81 ++++++++++++++++++++++++++++---------
 1 file changed, 61 insertions(+), 20 deletions(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index 7ec036c0c..fd372680b 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -2654,6 +2654,7 @@ def _init_train_loader(self, df, num_workers=0):
                 config_seasonality=self.config_seasonality,
             )
 
+        print("Changepoints:", self.config_trend.changepoints)
         df = _normalize(df=df, config_normalization=self.config_normalization)
         if not self.fitted:
             if self.config_trend.changepoints is not None:
@@ -2746,11 +2747,10 @@ def _train(
             print(checkpoint_path)
             checkpoint = torch.load(checkpoint_path)
             print(checkpoint.keys())
-
-        # Set up data the training dataloader
-        df, _, _, _ = df_utils.prep_or_copy_df(df)
-        train_loader = self._init_train_loader(df, num_workers)
-        dataset_size = len(df)  # train_loader.dataset
+            print("Current model trend changepoints:", self.model.trend.trend_changepoints_t)
+            # self.model = time_net.TimeNet.load_from_checkpoint(checkpoint_path)
+            # self.model.load_state_dict(checkpoint["state_dict"], strict=False)
+            print(self.model.train_loader)
 
         # Internal flag to check if validation is enabled
         validation_enabled = df_val is not None
@@ -2759,20 +2759,55 @@ def _train(
         if continue_training:
             checkpoint_path = self.metrics_logger.checkpoint_path
             checkpoint = torch.load(checkpoint_path)
-            self.model = self._init_model()
-            # TODO: fix size mismatch for trend.trend_changepoints_t: copying a param with shape torch.Size([11]) from checkpoint, the shape in current model is torch.Size([12]).
-            self.model.load_state_dict(checkpoint["state_dict"], strict=False)
-            self.optimizer.load_state_dict(checkpoint["optimizer_states"][0])
-            self.trainer.current_epoch = checkpoint["epoch"] + 1
-            if "lr_schedulers" in checkpoint:
-                self.lr_scheduler.load_state_dict(checkpoint["lr_schedulers"][0])
-                print(f"Resuming training from epoch {self.trainer.current_epoch}")
-            # TODO: remove print, checkpoint['lr_schedulers']
-            print(f"Resuming training from epoch {self.trainer.current_epoch}")
+
+            # Load model state
+            self.model.load_state_dict(checkpoint["state_dict"])
+
+            # Adjust epochs
+            additional_epochs = 10
+            previous_epochs = self.config_train.epochs  # Get the number of epochs already trained
+            new_total_epochs = previous_epochs + additional_epochs
+            self.config_train.epochs = new_total_epochs
+
+            # Reinitialize optimizer with loaded model parameters
+            optimizer = torch.optim.AdamW(self.model.parameters())
+
+            # Load optimizer state
+            if "optimizer_states" in checkpoint and checkpoint["optimizer_states"]:
+                optimizer.load_state_dict(checkpoint["optimizer_states"][0])
+
+            self.config_train.optimizer = optimizer
+
+            # Calculate total steps and steps already taken
+            steps_per_epoch = len(self.model.train_loader)
+            total_steps = steps_per_epoch * new_total_epochs
+            steps_taken = steps_per_epoch * previous_epochs
+
+            # Create new scheduler with updated total steps
+            self.config_train.scheduler = torch.optim.lr_scheduler.OneCycleLR(
+                optimizer=optimizer,
+                total_steps=total_steps,
+                max_lr=10,
+                pct_start=(total_steps - steps_taken) / total_steps,  # Adjust the percentage of remaining steps
+            )
+
+            # Manually update the scheduler's step count
+            for _ in range(steps_taken):
+                self.config_train.scheduler.step()
+
+            print(f"Scheduler: {self.config_train.scheduler}")
+            print(
+                f"Total steps: {total_steps}, Steps taken: {steps_taken}, Remaining steps: {total_steps - steps_taken}"
+            )
+
         else:
-            self.model = self._init_model()
+            # Set up data the training dataloader
+            df, _, _, _ = df_utils.prep_or_copy_df(df)
+            train_loader = self._init_train_loader(df, num_workers)
+            dataset_size = len(df)  # train_loader.dataset
 
-        self.model.train_loader = train_loader
+            self.model = self._init_model()
+            self.model.train_loader = train_loader
 
         # Init the Trainer
         self.trainer, checkpoint_callback = utils.configure_trainer(
@@ -2785,9 +2820,15 @@ def _train(
             progress_bar_enabled=progress_bar_enabled,
             metrics_enabled=metrics_enabled,
             checkpointing_enabled=checkpointing_enabled,
-            num_batches_per_epoch=len(train_loader),
+            num_batches_per_epoch=len(self.model.train_loader),
         )
 
+        # TODO: find out why scheduler not updated
+        if continue_training:
+            self.trainer.lr_schedulers = [
+                {"scheduler": self.config_train.scheduler, "interval": "step", "frequency": 1}
+            ]
+
         # Tune hyperparams and train
         if validation_enabled:
             # Set up data the validation dataloader
@@ -2812,7 +2853,7 @@ def _train(
             start = time.time()
             self.trainer.fit(
                 self.model,
-                train_loader,
+                self.model.train_loader,
                 val_loader,
                 ckpt_path=self.metrics_logger.checkpoint_path if continue_training else None,
             )
@@ -2834,7 +2875,7 @@ def _train(
             start = time.time()
             self.trainer.fit(
                 self.model,
-                train_loader,
+                self.model.train_loader,
                 ckpt_path=self.metrics_logger.checkpoint_path if continue_training else None,
             )
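The manual `.step()` replay in patch 02 is the standard way to fast-forward a fresh OneCycleLR to the position a previous run reached, because the one-cycle policy's shape depends on the absolute step index. A standalone sketch with toy numbers (assuming 4 batches per epoch and 5 of 15 epochs already trained; the model and optimizer are stand-ins):

    import torch
    from torch import nn

    model = nn.Linear(1, 1)
    opt = torch.optim.SGD(model.parameters(), lr=0.1)
    sched = torch.optim.lr_scheduler.OneCycleLR(opt, max_lr=1.0, total_steps=4 * 15)
    for _ in range(4 * 5):  # replay the steps already taken, as the patch does
        sched.step()
    print(sched.get_last_lr())  # LR now sits where the previous run left off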
From f1355eb5dd37dbcd018f34d0a11d1a338f109b20 Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Mon, 1 Jul 2024 00:19:54 +0200
Subject: [PATCH 03/12] change scheduler for continued training

---
 neuralprophet/forecaster.py | 53 ++++++++----------------------------
 neuralprophet/time_net.py   | 23 +++++++++++-----
 2 files changed, 28 insertions(+), 48 deletions(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index fd372680b..cd7422fe2 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -2741,20 +2741,15 @@ def _train(
         pd.DataFrame
             metrics
         """
-        # Test
-        if continue_training:
-            checkpoint_path = self.metrics_logger.checkpoint_path
-            print(checkpoint_path)
-            checkpoint = torch.load(checkpoint_path)
-            print(checkpoint.keys())
-            print("Current model trend changepoints:", self.model.trend.trend_changepoints_t)
-            # self.model = time_net.TimeNet.load_from_checkpoint(checkpoint_path)
-            # self.model.load_state_dict(checkpoint["state_dict"], strict=False)
-            print(self.model.train_loader)
 
         # Internal flag to check if validation is enabled
         validation_enabled = df_val is not None
 
+        # Set up data the training dataloader
+        df, _, _, _ = df_utils.prep_or_copy_df(df)
+        train_loader = self._init_train_loader(df, num_workers)
+        dataset_size = len(df)  # train_loader.dataset
+
         # Load model and optimizer state from checkpoint if continue_training is True
         if continue_training:
             checkpoint_path = self.metrics_logger.checkpoint_path
@@ -2763,8 +2758,11 @@ def _train(
 
             # Load model state
             self.model.load_state_dict(checkpoint["state_dict"])
 
+            # Set continue_training flag in model to update scheduler correctly
+            self.model.continue_training = True
+
             # Adjust epochs
-            additional_epochs = 10
+            additional_epochs = 50
             previous_epochs = self.config_train.epochs  # Get the number of epochs already trained
             new_total_epochs = previous_epochs + additional_epochs
@@ -2778,34 +2776,7 @@ def _train(
 
             self.config_train.optimizer = optimizer
 
-            # Calculate total steps and steps already taken
-            steps_per_epoch = len(self.model.train_loader)
-            total_steps = steps_per_epoch * new_total_epochs
-            steps_taken = steps_per_epoch * previous_epochs
-
-            # Create new scheduler with updated total steps
-            self.config_train.scheduler = torch.optim.lr_scheduler.OneCycleLR(
-                optimizer=optimizer,
-                total_steps=total_steps,
-                max_lr=10,
-                pct_start=(total_steps - steps_taken) / total_steps,  # Adjust the percentage of remaining steps
-            )
-
-            # Manually update the scheduler's step count
-            for _ in range(steps_taken):
-                self.config_train.scheduler.step()
-
-            print(f"Scheduler: {self.config_train.scheduler}")
-            print(
-                f"Total steps: {total_steps}, Steps taken: {steps_taken}, Remaining steps: {total_steps - steps_taken}"
-            )
-
         else:
-            # Set up data the training dataloader
-            df, _, _, _ = df_utils.prep_or_copy_df(df)
-            train_loader = self._init_train_loader(df, num_workers)
-            dataset_size = len(df)  # train_loader.dataset
-
             self.model = self._init_model()
-            self.model.train_loader = train_loader
+
+        self.model.train_loader = train_loader
 
         # Init the Trainer
         self.trainer, checkpoint_callback = utils.configure_trainer(
@@ -2823,11 +2794,9 @@ def _train(
             num_batches_per_epoch=len(self.model.train_loader),
         )
 
-        # TODO: find out why scheduler not updated
         if continue_training:
-            self.trainer.lr_schedulers = [
-                {"scheduler": self.config_train.scheduler, "interval": "step", "frequency": 1}
-            ]
+            print("setting up optimizers again")
+            # self.trainer.strategy.setup_optimizers(self.trainer)
 
         # Tune hyperparams and train
         if validation_enabled:
diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py
index ea3c4b2f3..5b18525ac 100644
--- a/neuralprophet/time_net.py
+++ b/neuralprophet/time_net.py
@@ -63,6 +63,7 @@ def __init__(
         num_seasonalities_modelled: int = 1,
         num_seasonalities_modelled_dict: dict = None,
         meta_used_in_model: bool = False,
+        continue_training: bool = False,
     ):
         """
         Parameters
@@ -306,6 +307,9 @@ def __init__(
         else:
             self.config_regressors.regressors = None
 
+        # Continued training
+        self.continue_training = continue_training
+
     @property
     def ar_weights(self) -> torch.Tensor:
         """sets property auto-regression weights for regularization. Update if AR is modelled differently"""
@@ -863,12 +867,19 @@ def configure_optimizers(self):
         optimizer = self._optimizer(self.parameters(), lr=self.learning_rate, **self.config_train.optimizer_args)
 
         # Scheduler
-        lr_scheduler = self._scheduler(
-            optimizer,
-            max_lr=self.learning_rate,
-            total_steps=self.trainer.estimated_stepping_batches,
-            **self.config_train.scheduler_args,
-        )
+        if self.continue_training:
+            # Update initial learning rate to the last learning rate for continued training
+            last_lr = optimizer.param_groups[0]["lr"]
+            lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
+            for param_group in optimizer.param_groups:
+                param_group["initial_lr"] = last_lr
+        else:
+            lr_scheduler = self._scheduler(
+                optimizer,
+                max_lr=self.learning_rate,
+                total_steps=self.trainer.estimated_stepping_batches,
+                **self.config_train.scheduler_args,
+            )
 
         return {"optimizer": optimizer, "lr_scheduler": lr_scheduler}
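Patch 03's fallback for resumed runs is an ExponentialLR seeded with the optimizer's restored learning rate. Unlike OneCycleLR, its decay is memoryless, so continuing simply means multiplying the current LR by gamma each step. Standalone sketch (the starting LR of 0.05 stands in for the value read back from the checkpoint):

    import torch
    from torch import nn

    model = nn.Linear(1, 1)
    opt = torch.optim.SGD(model.parameters(), lr=0.05)  # restored-LR stand-in
    sched = torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.95)
    for epoch in range(3):
        opt.step()
        sched.step()
        print(epoch, sched.get_last_lr())  # 0.05 * 0.95 ** (epoch + 1)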
From da3a6d5d8a9de442e2f145b8dafbcde9b5507c71 Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Mon, 1 Jul 2024 17:22:02 +0200
Subject: [PATCH 04/12] add test

---
 neuralprophet/forecaster.py | 14 +++++++++++---
 tests/test_utils.py         | 27 +++++++++++++--------------
 2 files changed, 24 insertions(+), 17 deletions(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index cd7422fe2..17c256221 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -1051,6 +1051,10 @@ def fit(
         if self.fitted is True and not continue_training:
             log.error("Model has already been fitted. Re-fitting may break or produce different results.")
 
+        if continue_training and self.metrics_logger.checkpoint_path is None:
+            log.error("Continued training requires checkpointing in model.")
+
         self.max_lags = df_utils.get_max_num_lags(self.config_lagged_regressors, self.n_lags)
         if self.max_lags == 0 and self.n_forecasts > 1:
             self.n_forecasts = 1
@@ -2761,10 +2765,14 @@ def _train(
             # Set continue_training flag in model to update scheduler correctly
             self.model.continue_training = True
 
+            previous_epoch = checkpoint["epoch"]
             # Adjust epochs
-            additional_epochs = 50
-            previous_epochs = self.config_train.epochs  # Get the number of epochs already trained
-            new_total_epochs = previous_epochs + additional_epochs
+            if self.config_train.epochs:
+                additional_epochs = self.config_train.epochs
+            else:
+                additional_epochs = previous_epoch
+            # Get the number of epochs already trained
+            new_total_epochs = previous_epoch + additional_epochs
             self.config_train.epochs = new_total_epochs
 
             # Reinitialize optimizer with loaded model parameters
diff --git a/tests/test_utils.py b/tests/test_utils.py
index f08968e99..c5d838240 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -101,17 +101,16 @@ def test_save_load_io():
     pd.testing.assert_frame_equal(forecast, forecast3)
 
 
-# TODO: add functionality to continue training
-# def test_continue_training():
-#     df = pd.read_csv(PEYTON_FILE, nrows=NROWS)
-#     m = NeuralProphet(
-#         epochs=EPOCHS,
-#         batch_size=BATCH_SIZE,
-#         learning_rate=LR,
-#         n_lags=6,
-#         n_forecasts=3,
-#         n_changepoints=0,
-#     )
-#     metrics = m.fit(df, freq="D")
-#     metrics2 = m.fit(df, freq="D", continue_training=True)
-#     assert metrics1["Loss"].sum() >= metrics2["Loss"].sum()
+def test_continue_training():
+    df = pd.read_csv(PEYTON_FILE, nrows=NROWS)
+    m = NeuralProphet(
+        epochs=EPOCHS,
+        batch_size=BATCH_SIZE,
+        learning_rate=LR,
+        n_lags=6,
+        n_forecasts=3,
+        n_changepoints=0,
+    )
+    metrics = m.fit(df, checkpointing=True, freq="D")
+    metrics2 = m.fit(df, freq="D", continue_training=True)
+    assert metrics["Loss"].min() >= metrics2["Loss"].min()
From f9969285c8c8b8f8f5cf22a6b8850b58323c576a Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Mon, 1 Jul 2024 21:04:07 +0200
Subject: [PATCH 05/12] fix metrics logging

---
 neuralprophet/forecaster.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index c972651e3..5ad4ad73f 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -2888,8 +2888,12 @@ def _train(
 
         if not metrics_enabled:
             return None
+
         # Return metrics collected in logger as dataframe
-        metrics_df = pd.DataFrame(self.metrics_logger.history)
+        if self.metrics_logger.history is not None:
+            metrics_df = pd.DataFrame(self.metrics_logger.history)
+        else:
+            metrics_df = pd.DataFrame()
         return metrics_df
 
     def restore_trainer(self, accelerator: Optional[str] = None):
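The guard added in patch 05 matters because a run with metrics enabled but an empty logger would otherwise hand `pd.DataFrame` a `None`. The two branches, condensed into a standalone sketch (the shape of the history dict is an assumption based on the code above):

    import pandas as pd

    history = {"Loss": [0.9, 0.5], "epoch": [0, 1]}  # or None when nothing was logged
    metrics_df = pd.DataFrame(history) if history is not None else pd.DataFrame()
    print(metrics_df)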
From f9a77f8da770298d285f7f41f532d897a102316a Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Fri, 5 Jul 2024 10:19:56 +0200
Subject: [PATCH 06/12] include feedback

---
 neuralprophet/forecaster.py | 35 +++++++++--------------------------
 1 file changed, 9 insertions(+), 26 deletions(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index 5ad4ad73f..7cf43d696 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -979,7 +979,12 @@ def fit(
             metrics with training and potentially evaluation metrics
         """
         if self.fitted and not continue_training:
-            raise RuntimeError("Model has been fitted already. Please initialize a new model to fit again.")
+            raise RuntimeError(
+                "Model has been fitted already. If you want to continue training please set the flag continue_training."
+            )
+
+        if continue_training and epochs is None:
+            raise ValueError("Continued training requires setting the number of epochs to train for.")
 
         # Configuration
         if epochs is not None:
@@ -1065,11 +1070,8 @@ def fit(
             or any(value != 1 for value in self.num_seasonalities_modelled_dict.values())
         )
 
-        if self.fitted is True and not continue_training:
-            log.error("Model has already been fitted. Re-fitting may break or produce different results.")
-
         if continue_training and self.metrics_logger.checkpoint_path is None:
-            log.error("Continued training requires checkpointing in model.")
+            log.error("Continued training requires checkpointing in model to continue from last epoch.")
 
         self.max_lags = df_utils.get_max_num_lags(
             n_lags=self.n_lags, config_lagged_regressors=self.config_lagged_regressors
         )
@@ -2777,34 +2779,15 @@ def _train(
         # Load model and optimizer state from checkpoint if continue_training is True
         if continue_training:
-            checkpoint_path = self.metrics_logger.checkpoint_path
-            checkpoint = torch.load(checkpoint_path)
-
-            # Load model state
-            self.model.load_state_dict(checkpoint["state_dict"])
+            previous_epoch = self.model.current_epoch
 
             # Set continue_training flag in model to update scheduler correctly
             self.model.continue_training = True
 
-            previous_epoch = checkpoint["epoch"]
             # Adjust epochs
-            if self.config_train.epochs:
-                additional_epochs = self.config_train.epochs
-            else:
-                additional_epochs = previous_epoch
-            # Get the number of epochs already trained
-            new_total_epochs = previous_epoch + additional_epochs
+            new_total_epochs = previous_epoch + self.config_train.epochs
             self.config_train.epochs = new_total_epochs
 
-            # Reinitialize optimizer with loaded model parameters
-            optimizer = torch.optim.AdamW(self.model.parameters())
-
-            # Load optimizer state
-            if "optimizer_states" in checkpoint and checkpoint["optimizer_states"]:
-                optimizer.load_state_dict(checkpoint["optimizer_states"][0])
-
-            self.config_train.optimizer = optimizer
-
         else:
             self.model = self._init_model()
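Patch 06 drops the ad-hoc optimizer reconstruction from _train; patch 07 below moves the state hand-off into the config instead. The reason the optimizer state is worth carrying over at all: AdamW keeps per-parameter moment estimates, and a fresh instance starts them at zero. Standalone sketch (toy model and LR; only the save/load pattern mirrors the patches):

    import torch
    from torch import nn

    model = nn.Linear(2, 1)
    opt = torch.optim.AdamW(model.parameters(), lr=1e-3)
    model(torch.ones(1, 2)).sum().backward()
    opt.step()
    saved = opt.state_dict()  # holds exp_avg / exp_avg_sq per parameter

    fresh = torch.optim.AdamW(model.parameters(), lr=1e-3)
    fresh.load_state_dict(saved)  # warm moment estimates and LR come back
    print(fresh.state_dict()["param_groups"][0]["lr"])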
From 7ad761d00a93f3b4a9ad54d41116ed0b3c2cb91a Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Fri, 5 Jul 2024 14:56:27 +0200
Subject: [PATCH 07/12] get correct optimizer states

---
 neuralprophet/configure.py  |  4 ++++
 neuralprophet/forecaster.py |  6 ++++++
 neuralprophet/time_net.py   | 17 ++++++++++++++---
 3 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py
index 5b54b202e..7a8dcf7c7 100644
--- a/neuralprophet/configure.py
+++ b/neuralprophet/configure.py
@@ -104,6 +104,7 @@ class Train:
     n_data: int = field(init=False)
     loss_func_name: str = field(init=False)
     lr_finder_args: dict = field(default_factory=dict)
+    optimizer_state: dict = field(default_factory=dict)
 
     def __post_init__(self):
         # assert the uncertainty estimation params and then finalize the quantiles
@@ -239,6 +240,9 @@ def get_reg_delay_weight(self, e, iter_progress, reg_start_pct: float = 0.66, re
             delay_weight = 1
         return delay_weight
 
+    def set_optimizer_state(self, optimizer_state: dict):
+        self.optimizer_state = optimizer_state
+
 
 @dataclass
 class Trend:
diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index 7cf43d696..ed35413aa 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -2779,15 +2779,21 @@ def _train(
         # Load model and optimizer state from checkpoint if continue_training is True
         if continue_training:
+            checkpoint_path = self.metrics_logger.checkpoint_path
+            checkpoint = torch.load(checkpoint_path)
+
             previous_epoch = self.model.current_epoch
 
             # Set continue_training flag in model to update scheduler correctly
             self.model.continue_training = True
+            self.model.start_epoch = previous_epoch
 
             # Adjust epochs
             new_total_epochs = previous_epoch + self.config_train.epochs
             self.config_train.epochs = new_total_epochs
 
+            self.config_train.set_optimizer_state(checkpoint["optimizer_states"][0])
+
         else:
             self.model = self._init_model()
 
         self.model.train_loader = train_loader
diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py
index 8413d8782..e635b9dda 100644
--- a/neuralprophet/time_net.py
+++ b/neuralprophet/time_net.py
@@ -64,6 +64,7 @@ def __init__(
         num_seasonalities_modelled_dict: dict = None,
         meta_used_in_model: bool = False,
         continue_training: bool = False,
+        start_epoch: int = 0,
     ):
         """
         Parameters
@@ -309,6 +310,7 @@ def __init__(
 
         # Continued training
         self.continue_training = continue_training
+        self.start_epoch = start_epoch
 
     @property
     def ar_weights(self) -> torch.Tensor:
@@ -870,11 +872,20 @@ def configure_optimizers(self):
 
         # Scheduler
         if self.continue_training:
+            optimizer.load_state_dict(self.config_train.optimizer_state)
+
             # Update initial learning rate to the last learning rate for continued training
-            last_lr = optimizer.param_groups[0]["lr"]
-            lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.95)
+            last_lr = float(optimizer.param_groups[0]["lr"])  # Ensure it's a float
+
+            batches_per_epoch = len(self.train_dataloader())
+            total_batches_processed = self.start_epoch * batches_per_epoch
+
             for param_group in optimizer.param_groups:
-                param_group["initial_lr"] = last_lr
+                param_group["initial_lr"] = (last_lr,)
+
+            lr_scheduler = lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(
+                optimizer, gamma=0.95, last_epoch=total_batches_processed - 1
+            )
         else:
             lr_scheduler = self._scheduler(
                 optimizer,
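On the `last_epoch` argument used in patch 07: constructing a scheduler with `last_epoch != -1` is PyTorch's resume path, and it requires every param group to already carry an `initial_lr` as a plain float (omitting it raises a KeyError), which is why the patch writes that key before building the scheduler. A minimal standalone sketch (gamma and step counts are illustrative):

    import torch
    from torch import nn

    model = nn.Linear(1, 1)
    opt = torch.optim.SGD(model.parameters(), lr=0.1)
    for group in opt.param_groups:
        group["initial_lr"] = 0.1  # must be a float; a trailing comma would store a tuple instead
    sched = torch.optim.lr_scheduler.ExponentialLR(opt, gamma=0.95, last_epoch=19)
    print(sched.last_epoch, sched.get_last_lr())  # bookkeeping resumes at step 20; LR continues decaying from 0.1

For ExponentialLR the absolute position is mostly bookkeeping, since the decay just keeps multiplying the restored LR by gamma; it is schedulers with a position-dependent shape, like OneCycleLR, where resuming needs the full step count.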
From b14d20b2fe9d357d10719b61081eda39132a0345 Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Fri, 5 Jul 2024 15:01:07 +0200
Subject: [PATCH 08/12] fix tests

---
 tests/test_utils.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/test_utils.py b/tests/test_utils.py
index de57fd5fd..a1f8c5874 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -21,6 +21,7 @@
 YOS_FILE = os.path.join(DATA_DIR, "yosemite_temps.csv")
 NROWS = 512
 EPOCHS = 10
+ADDITIONAL_EPOCHS = 5
 LR = 1.0
 BATCH_SIZE = 64
 
@@ -112,5 +113,5 @@ def test_continue_training():
         n_changepoints=0,
     )
     metrics = m.fit(df, checkpointing=True, freq="D")
-    metrics2 = m.fit(df, freq="D", continue_training=True)
+    metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS)
     assert metrics["Loss"].min() >= metrics2["Loss"].min()
From 9fe34012e214a9634c55387822bf26efb269b1a1 Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Mon, 8 Jul 2024 11:52:44 +0200
Subject: [PATCH 09/12] enable setting the scheduler

---
 neuralprophet/configure.py  | 65 ++++++++++++++++++++++++++++-------
 neuralprophet/forecaster.py | 16 ++++++++-
 neuralprophet/time_net.py   | 19 +++++++----
 3 files changed, 82 insertions(+), 18 deletions(-)

diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py
index 7a8dcf7c7..79be7fc5a 100644
--- a/neuralprophet/configure.py
+++ b/neuralprophet/configure.py
@@ -94,7 +94,7 @@ class Train:
     optimizer: Union[str, Type[torch.optim.Optimizer]]
     quantiles: List[float] = field(default_factory=list)
     optimizer_args: dict = field(default_factory=dict)
-    scheduler: Optional[Type[torch.optim.lr_scheduler.OneCycleLR]] = None
+    scheduler: Optional[Type[torch.optim.lr_scheduler._LRScheduler]] = None
    scheduler_args: dict = field(default_factory=dict)
     newer_samples_weight: float = 1.0
     newer_samples_start: float = 0.0
@@ -193,16 +193,59 @@ def set_scheduler(self):
         """
         Set the scheduler and scheduler args.
         The scheduler is not initialized yet as this is done in configure_optimizers in TimeNet.
         """
-        self.scheduler = torch.optim.lr_scheduler.OneCycleLR
-        self.scheduler_args.update(
-            {
-                "pct_start": 0.3,
-                "anneal_strategy": "cos",
-                "div_factor": 10.0,
-                "final_div_factor": 10.0,
-                "three_phase": True,
-            }
-        )
+        self.scheduler_args.clear()
+        if isinstance(self.scheduler, str):
+            if self.scheduler.lower() == "onecyclelr":
+                self.scheduler = torch.optim.lr_scheduler.OneCycleLR
+                self.scheduler_args.update(
+                    {
+                        "pct_start": 0.3,
+                        "anneal_strategy": "cos",
+                        "div_factor": 10.0,
+                        "final_div_factor": 10.0,
+                        "three_phase": True,
+                    }
+                )
+            elif self.scheduler.lower() == "steplr":
+                self.scheduler = torch.optim.lr_scheduler.StepLR
+                self.scheduler_args.update(
+                    {
+                        "step_size": 10,
+                        "gamma": 0.1,
+                    }
+                )
+            elif self.scheduler.lower() == "exponentiallr":
+                self.scheduler = torch.optim.lr_scheduler.ExponentialLR
+                self.scheduler_args.update(
+                    {
+                        "gamma": 0.95,
+                    }
+                )
+            elif self.scheduler.lower() == "reducelronplateau":
+                self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau
+                self.scheduler_args.update(
+                    {
+                        "mode": "min",
+                        "factor": 0.1,
+                        "patience": 10,
+                    }
+                )
+            elif self.scheduler.lower() == "cosineannealinglr":
+                self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR
+                self.scheduler_args.update(
+                    {
+                        "T_max": 50,
+                    }
+                )
+            else:
+                raise NotImplementedError(f"Scheduler {self.scheduler} is not supported.")
+        elif self.scheduler is None:
+            self.scheduler = torch.optim.lr_scheduler.ExponentialLR
+            self.scheduler_args.update(
+                {
+                    "gamma": 0.95,
+                }
+            )
 
     def set_lr_finder_args(self, dataset_size, num_batches):
         """
diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index ed35413aa..2e12d974e 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -451,6 +451,7 @@ def __init__(
         accelerator: Optional[str] = None,
         trainer_config: dict = {},
         prediction_frequency: Optional[dict] = None,
+        scheduler: Optional[str] = "onecyclelr",
     ):
         self.config = locals()
         self.config.pop("self")
@@ -509,6 +510,7 @@ def __init__(
         self.config_train = configure.Train(
             quantiles=quantiles,
             learning_rate=learning_rate,
+            scheduler=scheduler,
             epochs=epochs,
             batch_size=batch_size,
             loss_func=loss_func,
@@ -921,6 +923,7 @@ def fit(
         continue_training: bool = False,
         num_workers: int = 0,
         deterministic: bool = False,
+        scheduler: Optional[str] = None,
     ):
         """Train, and potentially evaluate model.
 
@@ -986,6 +989,18 @@ def fit(
         if continue_training and epochs is None:
             raise ValueError("Continued training requires setting the number of epochs to train for.")
 
+        if continue_training:
+            if scheduler is not None:
+                self.config_train.scheduler = scheduler
+            else:
+                self.config_train.scheduler = None
+            self.config_train.set_scheduler()
+
+        if scheduler is not None and not continue_training:
+            log.warning(
+                "Scheduler can only be set in fit when continuing training. Please set the scheduler when initializing the model."
+            )
+
         # Configuration
         if epochs is not None:
             self.config_train.epochs = epochs
@@ -2681,7 +2696,6 @@ def _init_train_loader(self, df, num_workers=0):
                 config_seasonality=self.config_seasonality,
             )
 
-        print("Changepoints:", self.config_trend.changepoints)
         df = _normalize(df=df, config_normalization=self.config_normalization)
         if not self.fitted:
             if self.config_trend.changepoints is not None:
diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py
index e635b9dda..3bbfbe66b 100644
--- a/neuralprophet/time_net.py
+++ b/neuralprophet/time_net.py
@@ -883,16 +883,23 @@ def configure_optimizers(self):
             for param_group in optimizer.param_groups:
                 param_group["initial_lr"] = (last_lr,)
 
-            lr_scheduler = lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(
-                optimizer, gamma=0.95, last_epoch=total_batches_processed - 1
-            )
-        else:
             lr_scheduler = self._scheduler(
                 optimizer,
-                max_lr=self.learning_rate,
-                total_steps=self.trainer.estimated_stepping_batches,
                 **self.config_train.scheduler_args,
             )
+        else:
+            if self._scheduler == torch.optim.lr_scheduler.OneCycleLR:
+                lr_scheduler = self._scheduler(
+                    optimizer,
+                    max_lr=self.learning_rate,
+                    total_steps=self.trainer.estimated_stepping_batches,
+                    **self.config_train.scheduler_args,
+                )
+            else:
+                lr_scheduler = self._scheduler(
+                    optimizer,
+                    **self.config_train.scheduler_args,
+                )
 
         return {"optimizer": optimizer, "lr_scheduler": lr_scheduler}
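The dispatch that patch 09 adds to set_scheduler is, in essence, a name-to-(class, default args) table. A condensed standalone sketch of the same idea, using the exact classes and defaults from the diff (the ReduceLROnPlateau entry is dropped again in patch 11):

    import torch

    SCHEDULERS = {
        "onecyclelr": (torch.optim.lr_scheduler.OneCycleLR,
                       {"pct_start": 0.3, "anneal_strategy": "cos", "div_factor": 10.0,
                        "final_div_factor": 10.0, "three_phase": True}),
        "steplr": (torch.optim.lr_scheduler.StepLR, {"step_size": 10, "gamma": 0.1}),
        "exponentiallr": (torch.optim.lr_scheduler.ExponentialLR, {"gamma": 0.95}),
        "cosineannealinglr": (torch.optim.lr_scheduler.CosineAnnealingLR, {"T_max": 50}),
    }

    def resolve(name: str):
        try:
            return SCHEDULERS[name.lower()]  # case-insensitive, as in set_scheduler
        except KeyError:
            raise NotImplementedError(f"Scheduler {name} is not supported.")

    cls, args = resolve("StepLR")
    print(cls.__name__, args)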
From 00f2e25e13aca00fb58d8951214fb27ad0da792a Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Mon, 8 Jul 2024 12:31:51 +0200
Subject: [PATCH 10/12] update for onecyclelr

---
 neuralprophet/forecaster.py |  2 +-
 neuralprophet/time_net.py   | 29 ++++++++++++++---------------
 2 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index 2e12d974e..0a1202c6c 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -436,6 +436,7 @@ def __init__(
         batch_size: Optional[int] = None,
         loss_func: Union[str, torch.nn.modules.loss._Loss, Callable] = "SmoothL1Loss",
         optimizer: Union[str, Type[torch.optim.Optimizer]] = "AdamW",
+        scheduler: Optional[str] = "onecyclelr",
         newer_samples_weight: float = 2,
         newer_samples_start: float = 0.0,
         quantiles: List[float] = [],
@@ -451,7 +452,6 @@ def __init__(
         accelerator: Optional[str] = None,
         trainer_config: dict = {},
         prediction_frequency: Optional[dict] = None,
-        scheduler: Optional[str] = "onecyclelr",
     ):
         self.config = locals()
         self.config.pop("self")
diff --git a/neuralprophet/time_net.py b/neuralprophet/time_net.py
index 3bbfbe66b..28f4058b5 100644
--- a/neuralprophet/time_net.py
+++ b/neuralprophet/time_net.py
@@ -871,35 +871,34 @@ def configure_optimizers(self):
         optimizer = self._optimizer(self.parameters(), lr=self.learning_rate, **self.config_train.optimizer_args)
 
         # Scheduler
+        self._scheduler = self.config_train.scheduler
+
         if self.continue_training:
             optimizer.load_state_dict(self.config_train.optimizer_state)
 
             # Update initial learning rate to the last learning rate for continued training
             last_lr = float(optimizer.param_groups[0]["lr"])  # Ensure it's a float
 
-            batches_per_epoch = len(self.train_dataloader())
-            total_batches_processed = self.start_epoch * batches_per_epoch
-
             for param_group in optimizer.param_groups:
                 param_group["initial_lr"] = (last_lr,)
 
+            if self._scheduler == torch.optim.lr_scheduler.OneCycleLR:
+                log.warning("OneCycleLR scheduler is not supported for continued training. Switching to ExponentialLR")
+                self._scheduler = torch.optim.lr_scheduler.ExponentialLR
+                self.config_train.scheduler_args = {"gamma": 0.95}
+
+        if self._scheduler == torch.optim.lr_scheduler.OneCycleLR:
             lr_scheduler = self._scheduler(
                 optimizer,
+                max_lr=self.learning_rate,
+                total_steps=self.trainer.estimated_stepping_batches,
                 **self.config_train.scheduler_args,
             )
         else:
-            if self._scheduler == torch.optim.lr_scheduler.OneCycleLR:
-                lr_scheduler = self._scheduler(
-                    optimizer,
-                    max_lr=self.learning_rate,
-                    total_steps=self.trainer.estimated_stepping_batches,
-                    **self.config_train.scheduler_args,
-                )
-            else:
-                lr_scheduler = self._scheduler(
-                    optimizer,
-                    **self.config_train.scheduler_args,
-                )
+            lr_scheduler = self._scheduler(
+                optimizer,
+                **self.config_train.scheduler_args,
+            )
 
         return {"optimizer": optimizer, "lr_scheduler": lr_scheduler}
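Why patch 10 refuses to resume OneCycleLR and falls back to ExponentialLR: the one-cycle policy is defined over a fixed total_steps chosen at construction, and stepping past that budget raises, so a continued run with a new step budget cannot simply extend the old schedule. Standalone sketch (tiny step count for illustration):

    import torch
    from torch import nn

    model = nn.Linear(1, 1)
    opt = torch.optim.SGD(model.parameters(), lr=0.1)
    sched = torch.optim.lr_scheduler.OneCycleLR(opt, max_lr=1.0, total_steps=3)
    for _ in range(3):
        sched.step()
    try:
        sched.step()  # one step past total_steps
    except ValueError as err:
        print(err)  # e.g. "Tried to step 5 times. The specified number of total steps is 3"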
From 5f103d8837fc53fd5639efd260900c8e171341ee Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Tue, 9 Jul 2024 14:01:36 +0200
Subject: [PATCH 11/12] add tests and adapt docstring

---
 neuralprophet/configure.py  | 11 +----------
 neuralprophet/forecaster.py | 24 +++++++++++++++++++++++-
 tests/test_utils.py         | 31 +++++++++++++++++++++++++++++++
 3 files changed, 55 insertions(+), 11 deletions(-)

diff --git a/neuralprophet/configure.py b/neuralprophet/configure.py
index 79be7fc5a..5cc5edca7 100644
--- a/neuralprophet/configure.py
+++ b/neuralprophet/configure.py
@@ -190,7 +190,7 @@ def set_optimizer(self):
 
     def set_scheduler(self):
         """
-        Set the scheduler and scheduler args.
+        Set the scheduler and scheduler arg depending on the user selection.
         The scheduler is not initialized yet as this is done in configure_optimizers in TimeNet.
         """
         self.scheduler_args.clear()
@@ -221,15 +221,6 @@ def set_scheduler(self):
                         "gamma": 0.95,
                     }
                 )
-            elif self.scheduler.lower() == "reducelronplateau":
-                self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau
-                self.scheduler_args.update(
-                    {
-                        "mode": "min",
-                        "factor": 0.1,
-                        "patience": 10,
-                    }
-                )
             elif self.scheduler.lower() == "cosineannealinglr":
                 self.scheduler = torch.optim.lr_scheduler.CosineAnnealingLR
                 self.scheduler_args.update(
diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index 0a1202c6c..05cdc3f75 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -301,6 +301,20 @@ class NeuralProphet:
         >>> m = NeuralProphet(collect_metrics=["MSE", "MAE", "RMSE"])
         >>> # use custorm torchmetrics names
         >>> m = NeuralProphet(collect_metrics={"MAPE": "MeanAbsolutePercentageError", "MSLE": "MeanSquaredLogError",
+    scheduler : str, torch.optim.lr_scheduler._LRScheduler
+        Type of learning rate scheduler to use.
+
+        Options
+            * (default) ``OneCycleLR``: One Cycle Learning Rate scheduler
+            * ``StepLR``: Step Learning Rate scheduler
+            * ``ExponentialLR``: Exponential Learning Rate scheduler
+            * ``CosineAnnealingLR``: Cosine Annealing Learning Rate scheduler
+
+        Examples
+        --------
+        >>> from neuralprophet import NeuralProphet
+        >>> # Step Learning Rate scheduler
+        >>> m = NeuralProphet(scheduler="StepLR")
 
     COMMENT
     Uncertainty Estimation
@@ -975,6 +989,13 @@ def fit(
             Note: using multiple workers and therefore distributed training might significantly increase
             the training time since each batch needs to be copied to each worker for each epoch. Keeping
             all data on the main process might be faster for most datasets.
+        scheduler : str
+            Type of learning rate scheduler to use for continued training. If None, uses ExponentialLR as
+            default as specified in the model config.
+
+            Options
+                * ``StepLR``: Step Learning Rate scheduler
+                * ``ExponentialLR``: Exponential Learning Rate scheduler
+                * ``CosineAnnealingLR``: Cosine Annealing Learning Rate scheduler
 
         Returns
         -------
@@ -2796,7 +2817,8 @@ def _train(
             checkpoint_path = self.metrics_logger.checkpoint_path
             checkpoint = torch.load(checkpoint_path)
 
-            previous_epoch = self.model.current_epoch
+            checkpoint_epoch = checkpoint["epoch"] if "epoch" in checkpoint else 0
+            previous_epoch = max(self.model.current_epoch, checkpoint_epoch)
 
             # Set continue_training flag in model to update scheduler correctly
             self.model.continue_training = True
diff --git a/tests/test_utils.py b/tests/test_utils.py
index a1f8c5874..3b93721bf 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -115,3 +115,34 @@ def test_continue_training():
     metrics = m.fit(df, checkpointing=True, freq="D")
     metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS)
     assert metrics["Loss"].min() >= metrics2["Loss"].min()
+
+
+def test_continue_training_with_scheduler_selection():
+    df = pd.read_csv(PEYTON_FILE, nrows=NROWS)
+    m = NeuralProphet(
+        epochs=EPOCHS,
+        batch_size=BATCH_SIZE,
+        learning_rate=LR,
+        n_lags=6,
+        n_forecasts=3,
+        n_changepoints=0,
+    )
+    metrics = m.fit(df, checkpointing=True, freq="D")
+    # Continue training with StepLR
+    metrics2 = m.fit(df, freq="D", continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR")
+    assert metrics["Loss"].min() >= metrics2["Loss"].min()
+
+
+def test_save_load_continue_training():
+    df = pd.read_csv(PEYTON_FILE, nrows=NROWS)
+    m = NeuralProphet(
+        epochs=EPOCHS,
+        n_lags=6,
+        n_forecasts=3,
+        n_changepoints=0,
+    )
+    metrics = m.fit(df, checkpointing=True, freq="D")
+    save(m, "test_model.pt")
+    m2 = load("test_model.pt")
+    metrics2 = m2.fit(df, continue_training=True, epochs=ADDITIONAL_EPOCHS, scheduler="StepLR")
+    assert metrics["Loss"].min() >= metrics2["Loss"].min()
From e04320157766efd517ac3149d62a41d62373b67c Mon Sep 17 00:00:00 2001
From: Constantin Weberpals
Date: Tue, 9 Jul 2024 14:22:17 +0200
Subject: [PATCH 12/12] fix array mismatch

---
 neuralprophet/forecaster.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/neuralprophet/forecaster.py b/neuralprophet/forecaster.py
index 05cdc3f75..1c99def2d 100644
--- a/neuralprophet/forecaster.py
+++ b/neuralprophet/forecaster.py
@@ -2916,7 +2916,13 @@ def _train(
 
         # Return metrics collected in logger as dataframe
         if self.metrics_logger.history is not None:
-            metrics_df = pd.DataFrame(self.metrics_logger.history)
+            # avoid array mismatch when continuing training
+            history = self.metrics_logger.history
+            max_length = max(len(lst) for lst in history.values())
+            for key in history:
+                while len(history[key]) < max_length:
+                    history[key].append(None)
+            metrics_df = pd.DataFrame(history)
         else:
             metrics_df = pd.DataFrame()
         return metrics_df
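For reference, the padding added in patch 12 addresses a concrete pandas behavior: a dict of unequal-length lists, which is what the logger history can look like after a resumed run logs extra epochs for some keys, makes the DataFrame constructor raise. Standalone sketch of the fix (the history contents are illustrative):

    import pandas as pd

    history = {"Loss": [0.9, 0.5, 0.4], "RegLoss": [0.0, 0.0]}  # ragged after a resume
    max_length = max(len(lst) for lst in history.values())
    for key in history:
        while len(history[key]) < max_length:
            history[key].append(None)  # pad short columns, as the patch does
    print(pd.DataFrame(history))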