diff --git a/changes.d/5090.feat.md b/changes.d/5090.feat.md new file mode 100644 index 00000000000..a9f4e947f27 --- /dev/null +++ b/changes.d/5090.feat.md @@ -0,0 +1,2 @@ +Distinct initial and final graphs, separated from the main cycling graph, +to make it easier to configure special behaviour at startup and shutdown. diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py index d0507ecd340..07339ed5d8c 100644 --- a/cylc/flow/command_validation.py +++ b/cylc/flow/command_validation.py @@ -27,6 +27,7 @@ ) from cylc.flow.cycling.loader import standardise_point_string +from cylc.flow.cycling.nocycle import NOCYCLE_POINTS from cylc.flow.exceptions import InputError, PointParsingError from cylc.flow.flow_mgr import ( FLOW_NEW, @@ -40,7 +41,6 @@ from cylc.flow.scripts.set import XTRIGGER_PREREQ_PREFIX from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED - if TYPE_CHECKING: from cylc.flow.id import TaskTokens @@ -359,7 +359,10 @@ def is_tasks(ids: Iterable[str]) -> 'Set[TaskTokens]': tokens = tokens.duplicate(task='root') # if the cycle is not a glob or reference, standardise it - if ( + if cast('str', tokens['cycle']) in NOCYCLE_POINTS: + # OK: startup or shutdown graph + pass + elif ( # cycle point is a glob not contains_fnmatch(cast('str', tokens['cycle'])) # cycle point is a reference to the ICP/FCP diff --git a/cylc/flow/config.py b/cylc/flow/config.py index 519dd49afcf..dbd74abc6d0 100644 --- a/cylc/flow/config.py +++ b/cylc/flow/config.py @@ -59,6 +59,13 @@ from cylc.flow.c3mro import C3 from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.cfgspec.workflow import RawWorkflowConfig + +from cylc.flow.cycling.nocycle import ( + NocycleSequence, + NOCYCLE_SEQ_STARTUP, + NOCYCLE_SEQ_SHUTDOWN +) +from cylc.flow.id import Tokens from cylc.flow.cycling.integer import IntegerInterval from cylc.flow.cycling.iso8601 import ( ISO8601Interval, @@ -87,7 +94,6 @@ import cylc.flow.flags from cylc.flow.graph_parser import GraphParser from cylc.flow.graphnode import GraphNodeParser -from cylc.flow.id import Tokens from cylc.flow.listify import listify from cylc.flow.log_level import verbosity_to_env from cylc.flow.param_expand import NameExpander @@ -311,6 +317,7 @@ def __init__( self.start_point: 'PointBase' self.stop_point: Optional['PointBase'] = None self.final_point: Optional['PointBase'] = None + self.nocycle_sequences: Set['NocycleSequence'] = set() self.sequences: List['SequenceBase'] = [] self.actual_first_point: Optional['PointBase'] = None self._start_point_for_actual_first_point: Optional['PointBase'] = None @@ -657,8 +664,13 @@ def _warn_if_queues_have_implicit_tasks( ) def prelim_process_graph(self) -> None: - """Ensure graph is not empty; set integer cycling mode and icp/fcp = 1 - for simplest "R1 = foo" type graphs. + """Error if graph empty; set integer cycling and icp/fcp = 1, + if those settings are omitted and the graph is acyclic graphs. + + Somewhat relevant notes: + - The default (if not set) cycling mode, gregorian, requires an ICP. + - cycling mode is not stored in the DB, so recompute for restarts. 
+        """
         graphdict = self.cfg['scheduling']['graph']
         if not any(graphdict.values()):
@@ -667,9 +679,21 @@
         if (
             'cycling mode' not in self.cfg['scheduling'] and
             self.cfg['scheduling'].get('initial cycle point', '1') == '1' and
-            all(item in ['graph', '1', 'R1'] for item in graphdict)
+            all(
+                seq in [
+                    'R1',
+                    str(NOCYCLE_SEQ_STARTUP),
+                    str(NOCYCLE_SEQ_SHUTDOWN),
+                    'graph',  # Cylc 7 back-compat
+                    '1'  # Cylc 7 back-compat?
+                ]
+                for seq in graphdict
+            )
         ):
-            # Pure acyclic graph, assume integer cycling mode with '1' cycle
+            # Non-cycling graph, assume integer cycling mode with '1' cycle.
+            # Typos in "startup", "shutdown", or "R1" will appear as cycling
+            # here, but will be fatal later during proper recurrence checking.
+
             self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE
             for key in ('initial cycle point', 'final cycle point'):
                 if key not in self.cfg['scheduling']:
@@ -2350,15 +2374,25 @@ def load_graph(self):
             try:
                 seq = get_sequence(section, icp, fcp)
             except (AttributeError, TypeError, ValueError, CylcError) as exc:
-                if cylc.flow.flags.verbosity > 1:
-                    traceback.print_exc()
-                msg = 'Cannot process recurrence %s' % section
-                msg += ' (initial cycle point=%s)' % icp
-                msg += ' (final cycle point=%s)' % fcp
-                if isinstance(exc, CylcError):
-                    msg += ' %s' % exc.args[0]
-                raise WorkflowConfigError(msg) from None
-            self.sequences.append(seq)
+                try:
+                    # is it a startup or shutdown graph?
+                    seq = NocycleSequence(section)
+                except ValueError:
+                    if cylc.flow.flags.verbosity > 1:
+                        traceback.print_exc()
+                    msg = (
+                        f"Cannot process recurrence {section}"
+                        f" (initial cycle point={icp})"
+                        f" (final cycle point={fcp})"
+                    )
+                    if isinstance(exc, CylcError):
+                        msg += ' %s' % exc.args[0]
+                    raise WorkflowConfigError(msg) from None
+                else:
+                    self.nocycle_sequences.add(seq)
+            else:
+                self.sequences.append(seq)
+
             parser = GraphParser(
                 family_map,
                 self.parameters,
diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py
index 4d2c8da11a7..92f75c7aea1 100644
--- a/cylc/flow/cycling/__init__.py
+++ b/cylc/flow/cycling/__init__.py
@@ -349,7 +349,11 @@ def TYPE_SORT_KEY(self) -> int:
     @classmethod
     @abstractmethod  # Note: stacked decorator not strictly enforced in Py2.x
     def get_async_expr(cls, start_point=0):
-        """Express a one-off sequence at the initial cycle point."""
+        """Express a one-off sequence at the initial cycle point.
+
+        Note "async" has nothing to do with asyncio. It was a (bad)
+        name for one-off (non-cycling) graphs in early Cylc versions.
+ """ pass @abstractmethod diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py index ce81ba55a94..4d1347723b9 100644 --- a/cylc/flow/cycling/integer.py +++ b/cylc/flow/cycling/integer.py @@ -21,7 +21,12 @@ import re from cylc.flow.cycling import ( - PointBase, IntervalBase, SequenceBase, ExclusionBase, parse_exclusion, cmp + PointBase, + IntervalBase, + SequenceBase, + ExclusionBase, + parse_exclusion, + cmp ) from cylc.flow.exceptions import ( CylcMissingContextPointError, diff --git a/cylc/flow/cycling/loader.py b/cylc/flow/cycling/loader.py index 11b86f0819c..8a9f39d91bc 100644 --- a/cylc/flow/cycling/loader.py +++ b/cylc/flow/cycling/loader.py @@ -21,12 +21,14 @@ from typing import Optional, Type, overload -from cylc.flow.cycling import PointBase, integer, iso8601 +from cylc.flow.cycling import PointBase, integer, iso8601, nocycle from metomi.isodatetime.data import Calendar ISO8601_CYCLING_TYPE = iso8601.CYCLER_TYPE_ISO8601 INTEGER_CYCLING_TYPE = integer.CYCLER_TYPE_INTEGER +NOCYCLE_CYCLING_TYPE = nocycle.CYCLER_TYPE_NOCYCLE + IS_OFFSET_ABSOLUTE_IMPLS = { INTEGER_CYCLING_TYPE: integer.is_offset_absolute, diff --git a/cylc/flow/cycling/nocycle.py b/cylc/flow/cycling/nocycle.py new file mode 100644 index 00000000000..4eb8a38e723 --- /dev/null +++ b/cylc/flow/cycling/nocycle.py @@ -0,0 +1,253 @@ +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Cycling logic for isolated non-cycling startup and shutdown graphs. +""" + +from cylc.flow.cycling import PointBase, SequenceBase + +# cycle point values +NOCYCLE_PT_STARTUP = "startup" +NOCYCLE_PT_SHUTDOWN = "shutdown" + +NOCYCLE_POINTS = ( + NOCYCLE_PT_STARTUP, + NOCYCLE_PT_SHUTDOWN +) + +CYCLER_TYPE_NOCYCLE = "nocycle" +CYCLER_TYPE_SORT_KEY_NOCYCLE = 1 + +# Unused abstract methods below left to raise NotImplementedError. + + +class NocyclePoint(PointBase): + """A non-advancing string-valued cycle point.""" + + TYPE = CYCLER_TYPE_NOCYCLE + TYPE_SORT_KEY = CYCLER_TYPE_SORT_KEY_NOCYCLE + + __slots__ = ('value') + + def __init__(self, value: str) -> None: + """Initialise a nocycle point. + + >>> NocyclePoint(NOCYCLE_PT_STARTUP) + startup + >>> NocyclePoint("beta") + Traceback (most recent call last): + ValueError: Illegal Nocycle value 'beta' + """ + if value not in [NOCYCLE_PT_STARTUP, NOCYCLE_PT_SHUTDOWN]: + raise ValueError(f"Illegal Nocycle value '{value}'") + self.value = value + + def __hash__(self): + """Hash it. + + >>> bool(hash(NocyclePoint(NOCYCLE_PT_STARTUP))) + True + """ + return hash(self.value) + + def __eq__(self, other): + """Equality. + + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) == + ... NocyclePoint(NOCYCLE_PT_STARTUP)) + True + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) == + ... NocyclePoint(NOCYCLE_PT_SHUTDOWN)) + False + """ + return str(other) == str(self.value) + + def __le__(self, other): + """Less than or equal (only if equal). 
+ + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) <= + ... NocyclePoint(NOCYCLE_PT_STARTUP)) + True + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) <= + ... NocyclePoint(NOCYCLE_PT_SHUTDOWN)) + False + """ + return str(other) == self.value + + def __lt__(self, other): + """Less than (never). + + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) < + ... NocyclePoint(NOCYCLE_PT_STARTUP)) + False + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) < + ... NocyclePoint(NOCYCLE_PT_SHUTDOWN)) + False + """ + return False + + def __gt__(self, other): + """Greater than (never). + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) > + ... NocyclePoint(NOCYCLE_PT_STARTUP)) + False + >>> (NocyclePoint(NOCYCLE_PT_STARTUP) > + ... NocyclePoint(NOCYCLE_PT_SHUTDOWN)) + False + """ + return False + + def __str__(self): + """ + >>> str(NocyclePoint(NOCYCLE_PT_STARTUP)) + 'startup' + >>> str(NocyclePoint(NOCYCLE_PT_SHUTDOWN)) + 'shutdown' + """ + return self.value + + def _cmp(self, other): + raise NotImplementedError + + def add(self, other): + # Not used. + raise NotImplementedError + + def sub(self, other): + # Not used. + raise NotImplementedError + + +class NocycleSequence(SequenceBase): + """A single point sequence.""" + + def __init__(self, dep_section, p_context_start=None, p_context_stop=None): + """Workflow cycling context is ignored. + + >>> NocycleSequence("startup").point + startup + """ + self.point = NocyclePoint(dep_section) + + def __hash__(self): + """Hash it. + + >>> bool(hash(NocycleSequence("startup"))) + True + """ + return hash(str(self.point)) + + def is_valid(self, point): + """Is point on-sequence and in-bounds? + + >>> NocycleSequence("startup").is_valid("startup") + True + >>> NocycleSequence("startup").is_valid("shutdown") + False + """ + return str(point) == str(self.point) + + def get_first_point(self, point): + """First point is the only point. + + >>> NocycleSequence("startup").get_first_point("shutdown") + startup + """ + return self.point + + def get_start_point(self, point): + """First point is the only point.""" + # Not used. + raise NotImplementedError + return self.point + + def get_next_point(self, point): + """There is no next point. + + >>> NocycleSequence("startup").get_next_point("startup") + """ + return None + + def get_next_point_on_sequence(self, point): + """There is no next point. + + >>> NocycleSequence("startup").get_next_point_on_sequence("startup") + """ + return None + + def __eq__(self, other): + """Equality. + + >>> NocycleSequence("startup") == NocycleSequence("startup") + True + >>> NocycleSequence("startup") == NocycleSequence("shutdown") + False + """ + try: + return str(other.point) == str(self.point) + except AttributeError: + # (other has not .point) + return False + + def __str__(self): + """String. 
+ + >>> str(NocycleSequence("startup")) + 'startup' + """ + return str(self.point) + + def TYPE(self): + raise NotImplementedError + + def TYPE_SORT_KEY(self): + raise NotImplementedError + + def get_async_expr(cls, start_point=0): + raise NotImplementedError + + def get_interval(self): + """Return the cycling interval of this sequence.""" + raise NotImplementedError + + def get_offset(self): + """Deprecated: return the offset used for this sequence.""" + raise NotImplementedError + + def set_offset(self, i_offset): + """Deprecated: alter state to offset the entire sequence.""" + raise NotImplementedError + + def is_on_sequence(self, point): + """Is point on-sequence, disregarding bounds?""" + raise NotImplementedError + + def get_prev_point(self, point): + """Return the previous point < point, or None if out of bounds.""" + raise NotImplementedError + + def get_nearest_prev_point(self, point): + """Return the largest point < some arbitrary point.""" + raise NotImplementedError + + def get_stop_point(self): + """Return the last point in this sequence, or None if unbounded.""" + raise NotImplementedError + + +NOCYCLE_SEQ_STARTUP = NocycleSequence(NOCYCLE_PT_STARTUP) +NOCYCLE_SEQ_SHUTDOWN = NocycleSequence(NOCYCLE_PT_SHUTDOWN) diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py index e40b1140354..520e949e281 100644 --- a/cylc/flow/data_store_mgr.py +++ b/cylc/flow/data_store_mgr.py @@ -79,6 +79,7 @@ __version__ as CYLC_VERSION, ) from cylc.flow.cycling.loader import get_point +from cylc.flow.cycling.nocycle import NOCYCLE_POINTS from cylc.flow.data_messages_pb2 import ( AllDeltas, EDeltas, @@ -1080,7 +1081,11 @@ def increment_graph_window( ) for items in graph_children.values(): for child_name, child_point, _ in items: - if final_point and child_point > final_point: + if ( + final_point + and str(child_point) not in NOCYCLE_POINTS + and child_point > final_point + ): continue child_tokens = self.id_.duplicate( cycle=str(child_point), @@ -1105,7 +1110,11 @@ def increment_graph_window( taskdefs ).values(): for parent_name, parent_point, _ in items: - if final_point and parent_point > final_point: + if ( + str(parent_point) not in NOCYCLE_POINTS + and final_point + and (parent_point > final_point) + ): continue parent_tokens = self.id_.duplicate( cycle=str(parent_point), @@ -2353,16 +2362,20 @@ def update_workflow(self, reloaded=False): delta_set = True if self.schd.pool.active_tasks: - pool_points = set(self.schd.pool.active_tasks) - - oldest_point = str(min(pool_points)) - if w_data.oldest_active_cycle_point != oldest_point: - w_delta.oldest_active_cycle_point = oldest_point - delta_set = True - newest_point = str(max(pool_points)) - if w_data.newest_active_cycle_point != newest_point: - w_delta.newest_active_cycle_point = newest_point - delta_set = True + pool_points = { + p for p in self.schd.pool.active_tasks + if str(p) not in NOCYCLE_POINTS + } + if pool_points: + # TODO is it OK if not set (due to nocycle points)? 
+ oldest_point = str(min(pool_points)) + if w_data.oldest_active_cycle_point != oldest_point: + w_delta.oldest_active_cycle_point = oldest_point + delta_set = True + newest_point = str(max(pool_points)) + if w_data.newest_active_cycle_point != newest_point: + w_delta.newest_active_cycle_point = newest_point + delta_set = True if delta_set: w_delta.id = self.workflow_id diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py index 5508005586b..35a1c4fb848 100644 --- a/cylc/flow/flow_mgr.py +++ b/cylc/flow/flow_mgr.py @@ -214,7 +214,7 @@ def get_flow( "start_time": now_sec } LOG.info( - f"New flow: {flow_num} ({meta}) {now_sec}" + f"New flow: {flow_num} ({meta})" ) self.db_mgr.put_insert_workflow_flows( flow_num, diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py index ea690ff9ea9..772a3495c44 100644 --- a/cylc/flow/scheduler.py +++ b/cylc/flow/scheduler.py @@ -17,6 +17,7 @@ import asyncio from collections import deque +from functools import partial from contextlib import suppress import logging import os @@ -46,6 +47,7 @@ TYPE_CHECKING, Any, AsyncGenerator, + Callable, Dict, Iterable, List, @@ -72,9 +74,17 @@ from cylc.flow.broadcast_mgr import BroadcastMgr from cylc.flow.cfgspec.glbl_cfg import glbl_cfg from cylc.flow.config import WorkflowConfig +from cylc.flow.cycling.nocycle import ( + NOCYCLE_POINTS, + NOCYCLE_PT_STARTUP, + NOCYCLE_PT_SHUTDOWN, + NOCYCLE_SEQ_STARTUP, + NOCYCLE_SEQ_SHUTDOWN +) from cylc.flow.data_store_mgr import DataStoreMgr from cylc.flow.exceptions import ( CommandFailedError, + CyclerTypeError, CylcError, InputError, ) @@ -339,6 +349,8 @@ def __init__(self, id_: str, options: 'Values') -> None: # load anything (e.g. template variables) from it os.unlink(self.workflow_db_mgr.pub_path) + self.graph_loaders: List[Callable] = [] + # Map used to track incomplete remote inits for restart # {install_target: platform} self.incomplete_ri_map: Dict[str, Dict] = {} @@ -442,7 +454,7 @@ async def configure(self, params): """Configure the scheduler. * Load the flow configuration. - * Load/write workflow parameters from the DB. + * Load/write workflow parameters from/to the DB. * Get the data store rolling. 
""" @@ -469,8 +481,10 @@ async def configure(self, params): self.profiler.log_memory("scheduler.py: before load_flow_file") try: - cfg = self.load_flow_file() - self.apply_new_config(cfg, is_reload=False) + self.apply_new_config( + self.load_flow_file(), + is_reload=False + ) except ParsecError as exc: # Mark this exc as expected (see docstring for .schd_expected): exc.schd_expected = True @@ -516,17 +530,6 @@ async def configure(self, params): self.data_store_mgr.initiate_data_model() - self.profiler.log_memory("scheduler.py: before load_tasks") - if self.is_restart: - self._load_pool_from_db() - if self.restored_stop_task_id is not None: - self.pool.set_stop_task(self.restored_stop_task_id) - elif self.options.starttask: - self._load_pool_from_tasks() - else: - self._load_pool_from_point() - self.profiler.log_memory("scheduler.py: after load_tasks") - self.workflow_db_mgr.put_workflow_params(self) self.workflow_db_mgr.put_workflow_template_vars(self.template_vars) self.workflow_db_mgr.put_runtime_inheritance(self.config) @@ -650,6 +653,11 @@ def log_start(self) -> None: f'Run mode: {self.get_run_mode().value}', extra=RotatingLogFileHandler.header_extra ) + LOG.info( + "Cycling mode:" + f" {self.config.cfg['scheduling']['cycling mode']}", + extra=RotatingLogFileHandler.header_extra + ) LOG.info( f'Initial point: {self.config.initial_point}', extra=RotatingLogFileHandler.header_extra @@ -669,6 +677,87 @@ def log_start(self) -> None: extra=RotatingLogFileHandler.header_extra ) + async def _get_graph_loaders(self) -> None: + """Tee up the loaders for configured graphs (startup, main, shutdown). + + A graph loader loads the task pool with the graph's parentless tasks + to get the graph started. + + In a restart the task pool is loaded from the DB before calling this + method so we can examine the pool to see what's still to run. + + NOTE: always spawn the first task of the next graph if shutting down + at the end of a graph, to ensure that restart works correctly. + + """ + points = [p.value for p in self.pool.get_points()] + if self.is_restart and not points: + return + + if ( + NOCYCLE_SEQ_SHUTDOWN in self.config.nocycle_sequences + and ( + not points or NOCYCLE_PT_SHUTDOWN not in points + ) + ): + # shutdown section exists and hasn't started yet. + self.graph_loaders.append( + partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_SHUTDOWN) + ) + + if ( + self.config.sequences and ( + not points + or ( + not any(p not in NOCYCLE_POINTS for p in points) + and NOCYCLE_PT_SHUTDOWN not in points + ) + ) + ): + # Main graph exists, and hasn't started yet (and hasn't + # already run, or the pool would contain shutdown tasks. + if self.options.starttask: + # Cold start from specified tasks. + self.graph_loaders.append(self._load_pool_from_tasks) + else: + # Cold start from cycle point. + self.graph_loaders.append(self._load_pool_from_point) + + if ( + NOCYCLE_SEQ_STARTUP in self.config.nocycle_sequences + and ( + not self.is_restart + ) + ): + # startup section exists and hasn't started yet. + # (Not in a restart - the pool would already be loaded from DB). + self.graph_loaders.append( + partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_STARTUP) + ) + + async def run_graphs(self): + self.graph_loaders = [] + + if self.pool.active_tasks: + # pool already loaded for integration test! 
+ while await self._main_loop(): + pass + return + + if self.is_restart: + await self._load_pool_from_db() + self.pool.compute_runahead() + self.pool.release_runahead_tasks() + + await self._get_graph_loaders() + + while await self._main_loop(): + pass + while self.graph_loaders: + (self.graph_loaders.pop())() + while await self._main_loop(): + pass + async def run_scheduler(self) -> None: """Start the scheduler main loop.""" try: @@ -702,13 +791,12 @@ async def run_scheduler(self) -> None: self ) ) - self.server.publish_queue.put( - self.data_store_mgr.publish_deltas) + self.server.publish_queue.put(self.data_store_mgr.publish_deltas) + # Non-async sleep - yield to other threads rather than event loop sleep(0) self.profiler.start() - while True: # MAIN LOOP - await self._main_loop() + await self.run_graphs() except SchedulerStop as exc: # deliberate stop @@ -750,7 +838,6 @@ async def run_scheduler(self) -> None: await self.handle_exception(exc) else: - # main loop ends (not used?) await self.shutdown(SchedulerStop(StopMode.AUTO.value)) finally: @@ -861,15 +948,21 @@ def _load_pool_from_point(self): released from runhead.) """ - start_type = ( - "Warm" if self.config.start_point > self.config.initial_point - else "Cold" - ) - LOG.info(f"{start_type} start from {self.config.start_point}") + LOG.info("Loading main graph") + msg = f"start from {self.config.start_point}" + if self.config.start_point == self.config.initial_point: + msg = "Cold " + msg + LOG.info(msg) self.pool.load_from_point() - def _load_pool_from_db(self): + async def _load_pool_from_db(self): """Load task pool from DB, for a restart.""" + LOG.info("Loading from DB for restart") + + self.task_job_mgr.task_remote_mgr.is_restart = True + self.task_job_mgr.task_remote_mgr.rsync_includes = ( + self.config.get_validated_rsync_includes()) + self.workflow_db_mgr.pri_dao.select_broadcast_states( self.broadcast_mgr.load_db_broadcast_states) self.broadcast_mgr.post_load_db_coerce() @@ -889,6 +982,50 @@ def _load_pool_from_db(self): self.pool.load_db_tasks_to_hold() self.pool.update_flow_mgr() + if self.restored_stop_task_id is not None: + self.pool.set_stop_task(self.restored_stop_task_id) + + self.log_start() + + all_tasks = self.pool.get_tasks() + if not all_tasks: + # Restart with empty pool: only unfinished event handlers. + # This workflow completed before restart; wait for intervention. + with suppress(KeyError): + self.timers[self.EVENT_RESTART_TIMEOUT].reset() + self.is_restart_timeout_wait = True + LOG.warning( + "This workflow already ran to completion." + "\nTo make it continue, trigger new tasks" + " before the restart timeout." + ) + return + + self.restart_remote_init() + # Poll all pollable tasks + await commands.run_cmd(commands.poll_tasks(self, ['*/*'])) + # Cycle point globs don't match startup and shutdown: + await commands.run_cmd( + commands.poll_tasks(self, [f"{NOCYCLE_PT_STARTUP}/*"])) + await commands.run_cmd( + commands.poll_tasks(self, [f"{NOCYCLE_PT_SHUTDOWN}/*"])) + + # If we shut down with manually triggered waiting tasks, + # submit them to run now. + # NOTE: this will run tasks that were triggered with + # the trigger "--on-resume" option, even if the workflow + # is restarted as paused. Option to be removed at 8.5.0. 
+        pre_prep_tasks = []
+        for itask in all_tasks:
+            if (
+                itask.is_manual_submit
+                and itask.state(TASK_STATUS_WAITING)
+            ):
+                itask.waiting_on_job_prep = True
+                pre_prep_tasks.append(itask)
+
+        self.start_job_submission(pre_prep_tasks)
+
     def restart_remote_init(self):
         """Remote init for all submitted/running tasks in the pool."""
         self.task_job_mgr.task_remote_mgr.is_restart = True
@@ -934,11 +1071,11 @@ def manage_remote_init(self):
                     install_target]
             if status == REMOTE_INIT_DONE:
                 self.task_job_mgr.task_remote_mgr.file_install(platform)
-            if status in [REMOTE_FILE_INSTALL_DONE,
-                          REMOTE_INIT_255,
-                          REMOTE_FILE_INSTALL_255,
-                          REMOTE_INIT_FAILED,
-                          REMOTE_FILE_INSTALL_FAILED]:
+            elif status in [REMOTE_FILE_INSTALL_DONE,
+                            REMOTE_INIT_255,
+                            REMOTE_FILE_INSTALL_255,
+                            REMOTE_INIT_FAILED,
+                            REMOTE_FILE_INSTALL_FAILED]:
                 # Remove install target
                 self.incomplete_ri_map.pop(install_target)

@@ -1471,6 +1608,6 @@ def timeout_check(self):

     async def workflow_shutdown(self):
         """Determines if the workflow can be shutdown yet."""
         if self.pool.check_abort_on_task_fails():
             self._set_stop(StopMode.AUTO_ON_TASK_FAILURE)

@@ -1618,8 +1756,11 @@ def update_profiler_logs(self, tinit):
                 self.count, get_current_time_string()))
         self.count += 1

-    async def _main_loop(self) -> None:
-        """A single iteration of the main loop."""
+    async def _main_loop(self) -> bool:
+        """A single iteration of the main loop.
+
+        Return False to stop iterating.
+        """
         tinit = time()

         # Useful for debugging core scheduler issues:
@@ -1682,10 +1823,11 @@ async def _main_loop(self) -> None:
            # paused. This allows broadcast-and-trigger beyond the expiry
            # limit, by pausing before doing it (after which the expiry
            # limit moves back).
-            with suppress(TimePointDumperBoundsError):
+            with suppress(TimePointDumperBoundsError, CyclerTypeError):
                # NOTE: TimePointDumperBoundsError will be raised for negative
                # cycle points, we skip broadcast expiry in this circumstance
                # (pre-initial condition)
+                # NOTE: CyclerTypeError raised for startup and shutdown graphs
                if min_point := self.pool.get_min_point():
                    # NOTE: the broadcast expire limit is the oldest active
                    # cycle MINUS the longest cycling interval
@@ -1733,6 +1875,11 @@ async def _main_loop(self) -> None:
            with suppress(KeyError):
                self.timers[self.EVENT_STALL_TIMEOUT].stop()

+        if self.graph_finished() and self.graph_loaders:
+            # Return control to load the next graph.
+            LOG.debug("Graph finished")
+            return False
+
         self.process_workflow_db_queue()

         # If public database is stuck, blast it away by copying the content
@@ -1742,7 +1889,6 @@ async def _main_loop(self) -> None:
         # Shutdown workflow if timeouts have occurred
         self.timeout_check()

-        # Does the workflow need to shutdown on task failure?
await self.workflow_shutdown() if self.options.profile_mode: @@ -1778,6 +1924,8 @@ async def _main_loop(self) -> None: await asyncio.sleep(duration) # Record latest main loop interval self.main_loop_intervals.append(time() - tinit) + + return True # END MAIN LOOP def _update_workflow_state(self): @@ -1992,6 +2140,21 @@ def set_stop_clock(self, unix_time): self.workflow_db_mgr.put_workflow_stop_clock_time(self.stop_clock_time) self.update_data_store() + def graph_finished(self): + """Nothing left to run.""" + return not any( + itask for itask in self.pool.get_tasks() + if itask.state( + TASK_STATUS_PREPARING, + TASK_STATUS_SUBMITTED, + TASK_STATUS_RUNNING + ) + or ( + itask.state(TASK_STATUS_WAITING) + and not itask.state.is_runahead + ) + ) + def stop_clock_done(self): """Return True if wall clock stop time reached.""" if self.stop_clock_time is None: @@ -2009,10 +2172,13 @@ def stop_clock_done(self): def check_auto_shutdown(self): """Check if we should shut down now.""" + # if more tasks to run (if waiting and not + # runahead, then held, queued, or xtriggered). if ( self.is_paused or self.is_restart_timeout_wait or self.check_workflow_stalled() or + not self.graph_finished() or # if more tasks to run (if waiting and not # runahead, then held, queued, or xtriggered). any( diff --git a/cylc/flow/scripts/graph.py b/cylc/flow/scripts/graph.py index 6438e8de476..b148dc4e2ad 100644 --- a/cylc/flow/scripts/graph.py +++ b/cylc/flow/scripts/graph.py @@ -45,6 +45,7 @@ from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Callable from cylc.flow.config import WorkflowConfig +from cylc.flow.cycling.nocycle import NOCYCLE_PT_STARTUP from cylc.flow.exceptions import InputError, CylcError from cylc.flow.id import Tokens from cylc.flow.id_cli import parse_id_async @@ -68,10 +69,20 @@ def sort_integer_node(id_): Example: >>> sort_integer_node('11/foo') ('foo', 11) - + >>> sort_integer_node('startup/foo') + ('foo', 0) + >>> sort_integer_node('shutdown/foo') + ('foo', 1) """ tokens = Tokens(id_, relative=True) - return (tokens['task'], int(tokens['cycle'])) + try: + return (tokens['task'], int(tokens['cycle'])) + except ValueError: + # nocycle point + if tokens['cycle'] == NOCYCLE_PT_STARTUP: + return (tokens['task'], 0) + else: + return (tokens['task'], 1) def sort_integer_edge(id_): diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py index 00ca7f2974c..0369b189f28 100644 --- a/cylc/flow/task_pool.py +++ b/cylc/flow/task_pool.py @@ -42,7 +42,11 @@ from cylc.flow.exceptions import ( PlatformLookupError, PointParsingError, - WorkflowConfigError, + WorkflowConfigError +) +from cylc.flow.cycling.nocycle import ( + NocyclePoint, + NOCYCLE_POINTS ) import cylc.flow.flags from cylc.flow.flow_mgr import ( @@ -242,13 +246,38 @@ def _swap_out(self, itask): self.active_tasks[itask.point][itask.identity] = itask self.active_tasks_changed = True + def load_nocycle_graph(self, seq): + """Load task pool for a no-cycle (startup or shutdown) graph.""" + + LOG.info(f"Loading {seq} graph") + # Always start flow 1 for automatic load from start of a graph. 
+ flow_num = self.flow_mgr.get_flow( + 1, meta=f"original {seq} flow", + ) + self.runahead_limit_point = None + for name in self.config.get_task_name_list(): + tdef = self.config.get_taskdef(name) + if str(seq) not in [str(s) for s in tdef.sequences]: + continue + if tdef.is_parentless(seq.point, seq): + ntask, is_in_pool, _ = self.get_or_spawn_task( + seq.point, tdef, {flow_num} + ) + if ntask is not None: + if not is_in_pool: + self.add_to_pool(ntask) + self.rh_release_and_queue(ntask) + def load_from_point(self): """Load the task pool for the workflow start point. Add every parentless task out to the runahead limit. """ + # Always start flow 1 for automatic load from start of a graph. flow_num = self.flow_mgr.get_flow( - meta=f"original flow from {self.config.start_point}") + 1, meta=f"original flow from {self.config.start_point}", + ) + self.compute_runahead() for name in self.task_name_list: tdef = self.config.get_taskdef(name) @@ -298,7 +327,7 @@ def release_runahead_tasks(self): Return True if any tasks are released, else False. Call when RH limit changes. """ - if not self.active_tasks or not self.runahead_limit_point: + if not self.active_tasks: # (At start-up task pool might not exist yet) return False @@ -310,7 +339,12 @@ def release_runahead_tasks(self): itask for point, itask_id_map in self.active_tasks.items() for itask in itask_id_map.values() - if point <= self.runahead_limit_point + if ( + str(point) in NOCYCLE_POINTS + or + (self.runahead_limit_point and + point <= self.runahead_limit_point) + ) if itask.state.is_runahead ] @@ -360,7 +394,7 @@ def compute_runahead(self, force=False) -> bool: base_point: Optional['PointBase'] = None # First get the runahead base point. - if not self.active_tasks: + if not self.active_tasks or not self.runahead_limit_point: # Find the earliest sequence point beyond the workflow start point. base_point = min( ( @@ -369,13 +403,18 @@ def compute_runahead(self, force=False) -> bool: seq.get_first_point(self.config.start_point) for seq in self.config.sequences } - if point is not None + if ( + point is not None + and type(point) is not NocyclePoint # type: ignore + ) ), default=None, ) else: # Find the earliest point with incomplete tasks. - for point, itasks in sorted(self.get_tasks_by_point().items()): + for point, itasks in sorted( + self.get_tasks_by_point(exclude_nocycle_pts=True).items() + ): # All n=0 tasks are incomplete by definition, but Cylc 7 # ignores failed ones (it does not ignore submit-failed!). 
if ( @@ -386,6 +425,7 @@ def compute_runahead(self, force=False) -> bool: ) ): continue + base_point = point break @@ -439,6 +479,7 @@ def compute_runahead(self, force=False) -> bool: count += 1 sequence_points.add(seq_point) seq_point = sequence.get_next_point(seq_point) + self._prev_runahead_sequence_points = sequence_points self._prev_runahead_base_point = base_point @@ -542,11 +583,17 @@ def load_db_task_pool_for_restart(self, row_idx, row): (cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status, is_held, submit_num, _, platform_name, time_submit, time_run, timeout, outputs_str) = row + + if cycle in NOCYCLE_POINTS: + point = NocyclePoint(cycle) + else: + point = get_point(cycle) + try: itask = TaskProxy( self.tokens, self.config.get_taskdef(name), - get_point(cycle), + point, deserialise_set(flow_nums), status=status, is_held=is_held, @@ -957,11 +1004,14 @@ def get_task_ids(self) -> Set[str]: """Return a list of task IDs in the task pool.""" return {itask.identity for itask in self.get_tasks()} - def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]': + def get_tasks_by_point( + self, exclude_nocycle_pts=False + ) -> 'Dict[PointBase, List[TaskProxy]]': """Return a map of task proxies by cycle point.""" return { point: list(itask_id_map.values()) for point, itask_id_map in self.active_tasks.items() + if not exclude_nocycle_pts or str(point) not in NOCYCLE_POINTS } def get_task(self, point: 'PointBase', name: str) -> Optional[TaskProxy]: @@ -1074,9 +1124,15 @@ def release_queued_tasks(self): # Note: released and pre_prep_tasks can overlap return list(set(released + pre_prep_tasks)) + def get_points(self): + """Return current list of cycle points in the pool.""" + return list(self.active_tasks) + def get_min_point(self): - """Return the minimum cycle point currently in the pool.""" - cycles = list(self.active_tasks) + """Return the minimum cycle point currently in the pool, or None.""" + cycles = [ + c for c in list(self.active_tasks) if str(c) not in NOCYCLE_POINTS + ] minc = None if cycles: minc = min(cycles) @@ -1549,7 +1605,9 @@ def spawn_on_output(self, itask: TaskProxy, output: str) -> None: if ( self.runahead_limit_point is not None + and str(t.point) not in NOCYCLE_POINTS and t.point <= self.runahead_limit_point + or str(t.point) in NOCYCLE_POINTS ): self.rh_release_and_queue(t) @@ -1693,6 +1751,10 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool: LOG.debug('No task definition %s', name) return False + if str(point) in NOCYCLE_POINTS: + # TODO - check that task in these graphs + return True + # Don't spawn outside of graph limits. # TODO: is it possible for initial_point to not be defined?? # (see also the similar check + log message in scheduler.py) @@ -1884,7 +1946,11 @@ def spawn_task( # "foo; foo[+P1] & bar => baz" # Here, in the final cycle bar wants to spawn baz, but that would # stall because baz also depends on foo after the final point. 
- if self.stop_point and itask.point <= self.stop_point: + if ( + self.stop_point + and str(itask.point) not in NOCYCLE_POINTS + and itask.point <= self.stop_point + ): for pct in itask.state.prerequisites_get_target_points(): if pct > self.stop_point: LOG.warning( @@ -2289,7 +2355,8 @@ def _set_prereqs_itask( self.xtrigger_mgr.force_satisfy(itask, xtrigs) if ( - itask.state.is_runahead + str(itask.point) not in NOCYCLE_POINTS + and itask.state.is_runahead and self.runahead_limit_point is not None and itask.point <= self.runahead_limit_point ): diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py index 96a67d3b380..dfe987ae6b5 100644 --- a/cylc/flow/taskdef.py +++ b/cylc/flow/taskdef.py @@ -310,15 +310,19 @@ def check_for_explicit_cycling(self): raise TaskDefError( "No cycling sequences defined for %s" % self.name) - def get_parent_points(self, point): + def get_parent_points(self, point, seq=None): """Return the cycle points of my parents, at point.""" parent_points = set() - for seq in self.sequences: - if not seq.is_valid(point): + if seq: + sequences = [seq] + else: + sequences = self.sequences + for sequence in sequences: + if not sequence.is_valid(point): continue - if seq in self.dependencies: + if sequence in self.dependencies: # task has prereqs in this sequence - for dep in self.dependencies[seq]: + for dep in self.dependencies[sequence]: if dep.suicide: continue for trig in dep.task_triggers: @@ -395,9 +399,12 @@ def is_valid_point(self, point: 'PointBase') -> bool: def first_point(self, icp): """Return the first point for this task.""" + from cylc.flow.cycling.nocycle import NocycleSequence point = None adjusted = [] for seq in self.sequences: + if type(seq) is NocycleSequence: + continue pt = seq.get_first_point(icp) if pt: # may be None if beyond the sequence bounds @@ -419,7 +426,7 @@ def next_point(self, point): p_next = min(adjusted) return p_next - def is_parentless(self, point): + def is_parentless(self, point, seq=None): """Return True if task has no parents at point. Tasks are considered parentless if they have: @@ -438,7 +445,8 @@ def is_parentless(self, point): if self.sequential: # Implicit parents return False - parent_points = self.get_parent_points(point) + + parent_points = self.get_parent_points(point, seq) return ( not parent_points or all(x < self.start_point for x in parent_points) diff --git a/tests/functional/alpha-omega/00-basic.t b/tests/functional/alpha-omega/00-basic.t new file mode 100644 index 00000000000..bd7d6f25d01 --- /dev/null +++ b/tests/functional/alpha-omega/00-basic.t @@ -0,0 +1,30 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Check basic separate triggering of startup, shutdown, and main graphs. +. 
"$(dirname "$0")/test_header" +set_test_number 3 + +install_and_validate + +reftest_run + +graph_workflow "${WORKFLOW_NAME}" "${WORKFLOW_NAME}.graph" +cmp_ok "${WORKFLOW_NAME}.graph" "$TEST_SOURCE_DIR/${TEST_NAME_BASE}/reference.graph" + +purge diff --git a/tests/functional/alpha-omega/00-basic/flow.cylc b/tests/functional/alpha-omega/00-basic/flow.cylc new file mode 100644 index 00000000000..28564c78480 --- /dev/null +++ b/tests/functional/alpha-omega/00-basic/flow.cylc @@ -0,0 +1,12 @@ + +[scheduling] + cycling mode = integer + final cycle point = 2 + [[graph]] + startup = "a => b" + shutdown = "x => y" + R1 = "foo => bar" + P1 = "bar => baz" +[runtime] + [[a, b, x, y]] + [[foo, bar, baz]] diff --git a/tests/functional/alpha-omega/00-basic/reference.graph b/tests/functional/alpha-omega/00-basic/reference.graph new file mode 100644 index 00000000000..018cde27a08 --- /dev/null +++ b/tests/functional/alpha-omega/00-basic/reference.graph @@ -0,0 +1,13 @@ +edge "startup/a" "startup/b" +edge "1/bar" "1/baz" +edge "2/bar" "2/baz" +edge "1/foo" "1/bar" +graph +node "startup/a" "a\nstartup" +node "startup/b" "b\nstartup" +node "1/bar" "bar\n1" +node "2/bar" "bar\n2" +node "1/baz" "baz\n1" +node "2/baz" "baz\n2" +node "1/foo" "foo\n1" +stop diff --git a/tests/functional/alpha-omega/00-basic/reference.log b/tests/functional/alpha-omega/00-basic/reference.log new file mode 100644 index 00000000000..77146d2c21b --- /dev/null +++ b/tests/functional/alpha-omega/00-basic/reference.log @@ -0,0 +1,9 @@ +startup/a -triggered off [] in flow 1 +startup/b -triggered off ['startup/a'] in flow 1 +1/foo -triggered off [] in flow 1 +2/bar -triggered off [] in flow 1 +1/bar -triggered off ['1/foo'] in flow 1 +2/baz -triggered off ['2/bar'] in flow 1 +1/baz -triggered off ['1/bar'] in flow 1 +shutdown/x -triggered off [] in flow 1 +shutdown/y -triggered off ['shutdown/x'] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart.t b/tests/functional/alpha-omega/01-restart.t new file mode 100644 index 00000000000..deef53fae13 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart.t @@ -0,0 +1,34 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Check restart in and between startup, R1, P1, and shutdown graphs. + +. 
"$(dirname "$0")/test_header" +set_test_number 8 + +install_and_validate + +SRC_DIR="$TEST_SOURCE_DIR/${TEST_NAME_BASE}" +RUN_DIR="$WORKFLOW_RUN_DIR" + +for RUN in $(seq 0 6); do + cp "${SRC_DIR}/reference.log.$RUN" "${RUN_DIR}/reference.log" + reftest_run "${TEST_NAME_BASE}-${RUN}" +done + +purge diff --git a/tests/functional/alpha-omega/01-restart/flow.cylc b/tests/functional/alpha-omega/01-restart/flow.cylc new file mode 100644 index 00000000000..82175f1cde7 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/flow.cylc @@ -0,0 +1,26 @@ +# Test stop and restart after every task in every type of graph. +[scheduling] + cycling mode = integer + final cycle point = 2 + [[graph]] + startup = "a => b" + R1 = "foo => bar" + P1 = "bar[-P1] => bar => baz" + shutdown = "x => y" +[runtime] + [[STOP]] + script = cylc stop $CYLC_WORKFLOW_ID + [[a, b, x, y, foo]] + inherit = STOP + [[bar]] + script = """ + if ((CYLC_TASK_CYCLE_POINT == 1)); then + cylc stop $CYLC_WORKFLOW_ID + fi + """ + [[baz]] + script = """ + if ((CYLC_TASK_CYCLE_POINT == 2)); then + cylc stop $CYLC_WORKFLOW_ID + fi + """ diff --git a/tests/functional/alpha-omega/01-restart/reference.log.0 b/tests/functional/alpha-omega/01-restart/reference.log.0 new file mode 100644 index 00000000000..a27bdd7b37c --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.0 @@ -0,0 +1 @@ +startup/a -triggered off [] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.1 b/tests/functional/alpha-omega/01-restart/reference.log.1 new file mode 100644 index 00000000000..1afce31918c --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.1 @@ -0,0 +1,3 @@ +Initial point: 1 +Final point: 2 +startup/b -triggered off ['startup/a'] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.2 b/tests/functional/alpha-omega/01-restart/reference.log.2 new file mode 100644 index 00000000000..edf876337c5 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.2 @@ -0,0 +1,3 @@ +Initial point: 1 +Final point: 2 +1/foo -triggered off [] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.3 b/tests/functional/alpha-omega/01-restart/reference.log.3 new file mode 100644 index 00000000000..a2e1f9bfbcf --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.3 @@ -0,0 +1,3 @@ +Initial point: 1 +Final point: 2 +1/bar -triggered off ['0/bar', '1/foo'] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.4 b/tests/functional/alpha-omega/01-restart/reference.log.4 new file mode 100644 index 00000000000..f986452dc79 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.4 @@ -0,0 +1,5 @@ +Initial point: 1 +Final point: 2 +2/bar -triggered off ['1/bar'] in flow 1 +1/baz -triggered off ['1/bar'] in flow 1 +2/baz -triggered off ['2/bar'] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.5 b/tests/functional/alpha-omega/01-restart/reference.log.5 new file mode 100644 index 00000000000..ced181660e9 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.5 @@ -0,0 +1,3 @@ +Initial point: 1 +Final point: 2 +shutdown/x -triggered off [] in flow 1 diff --git a/tests/functional/alpha-omega/01-restart/reference.log.6 b/tests/functional/alpha-omega/01-restart/reference.log.6 new file mode 100644 index 00000000000..dee29b15170 --- /dev/null +++ b/tests/functional/alpha-omega/01-restart/reference.log.6 @@ -0,0 +1,3 @@ +Initial point: 1 +Final point: 2 
+shutdown/y -triggered off ['shutdown/x'] in flow 1 diff --git a/tests/functional/alpha-omega/02-retrigger.t b/tests/functional/alpha-omega/02-retrigger.t new file mode 100644 index 00000000000..1fe0b091a17 --- /dev/null +++ b/tests/functional/alpha-omega/02-retrigger.t @@ -0,0 +1,24 @@ +#!/usr/bin/env bash +# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE. +# Copyright (C) NIWA & British Crown (Met Office) & Contributors. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +#------------------------------------------------------------------------------- + +# Check manual triggering of startup and shutdown graph tasks. + +. "$(dirname "$0")/test_header" +set_test_number 2 +reftest +exit diff --git a/tests/functional/alpha-omega/02-retrigger/flow.cylc b/tests/functional/alpha-omega/02-retrigger/flow.cylc new file mode 100644 index 00000000000..ba27c5f381a --- /dev/null +++ b/tests/functional/alpha-omega/02-retrigger/flow.cylc @@ -0,0 +1,22 @@ + +# 1. startup graph completes +# 2. then main graph starts, and triggers startup/a and shutdown/x with --flow=9 +# 3. then when all tasks finish, shutdown graph runs + +[scheduling] + [[graph]] + startup = "a => b" + shutdown = "x => y" + R1 = "foo => bar => baz" +[runtime] + [[a]] + [[b]] + [[x]] + [[y]] + [[foo]] + [[bar]] + script = """ + cylc trigger --flow=9 $CYLC_WORKFLOW_ID//startup/a + cylc trigger --flow=9 $CYLC_WORKFLOW_ID//shutdown/x + """ + [[baz]] diff --git a/tests/functional/alpha-omega/02-retrigger/reference.log b/tests/functional/alpha-omega/02-retrigger/reference.log new file mode 100644 index 00000000000..222e7c96d56 --- /dev/null +++ b/tests/functional/alpha-omega/02-retrigger/reference.log @@ -0,0 +1,11 @@ +startup/a -triggered off [] in flow 1 +startup/b -triggered off ['startup/a'] in flow 1 +1/foo -triggered off [] in flow 1 +1/bar -triggered off ['1/foo'] in flow 1 +startup/a -triggered off [] in flow 9 +startup/b -triggered off ['startup/a'] in flow 9 +shutdown/x -triggered off [] in flow 9 +shutdown/y -triggered off ['shutdown/x'] in flow 9 +1/baz -triggered off ['1/bar'] in flow 1 +shutdown/x -triggered off [] in flow 1 +shutdown/y -triggered off ['shutdown/x'] in flow 1 diff --git a/tests/functional/alpha-omega/test_header b/tests/functional/alpha-omega/test_header new file mode 120000 index 00000000000..90bd5a36f92 --- /dev/null +++ b/tests/functional/alpha-omega/test_header @@ -0,0 +1 @@ +../lib/bash/test_header \ No newline at end of file diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py index 73c187b9a74..5ce0b755c2d 100644 --- a/tests/integration/utils/flow_tools.py +++ b/tests/integration/utils/flow_tools.py @@ -112,6 +112,16 @@ def _make_flow( return workflow_id +async def _load_graph(sched): + """Get scheduler to load the main graph.""" + if sched.is_restart: + await sched._load_pool_from_db() + elif sched.options.starttask: + sched._load_pool_from_tasks() + else: + sched._load_pool_from_point() + + @contextmanager def 
_make_scheduler(): """Return a scheduler object for a flow registration.""" @@ -153,6 +163,7 @@ async def _start_flow( # exception occurs in Scheduler try: await schd.start() + await _load_graph(schd) finally: # After this `yield`, the `with` block of the context manager # is executed: @@ -184,6 +195,7 @@ async def _run_flow( # exception occurs in Scheduler try: await schd.start() + await _load_graph(schd) # Do not await as we need to yield control to the main loop: task = asyncio.create_task(schd.run_scheduler()) finally: diff --git a/tests/unit/test_flow_mgr.py b/tests/unit/test_flow_mgr.py index 00a6a26eb12..bddf9a1613d 100644 --- a/tests/unit/test_flow_mgr.py +++ b/tests/unit/test_flow_mgr.py @@ -52,48 +52,45 @@ def test_all( meta = "the quick brown fox" assert flow_mgr.get_flow(None, meta) == 1 - msg1 = f"flow: 1 ({meta}) {FAKE_NOW_ISO}" + msg1 = f"flow: 1 ({meta})" assert f"New {msg1}" in caplog.messages # automatic: expect 2 meta = "jumped over the lazy dog" assert flow_mgr.get_flow(None, meta) == 2 - msg2 = f"flow: 2 ({meta}) {FAKE_NOW_ISO}" + msg2 = f"flow: 2 ({meta})" assert f"New {msg2}" in caplog.messages # give flow 2: not a new flow meta = "jumped over the moon" assert flow_mgr.get_flow(2, meta) == 2 - msg3 = f"flow: 2 ({meta}) {FAKE_NOW_ISO}" - assert f"New {msg3}" not in caplog.messages - assert ( - f"Ignoring flow metadata \"{meta}\": 2 is not a new flow" - in caplog.messages - ) + msg2a = f'Ignoring flow metadata "{meta}": 2 is not a new flow' + assert f"{msg2a}" in caplog.messages # give flow 4: new flow meta = "jumped over the cheese" assert flow_mgr.get_flow(4, meta) == 4 - msg4 = f"flow: 4 ({meta}) {FAKE_NOW_ISO}" + msg4 = f"flow: 4 ({meta})" assert f"New {msg4}" in caplog.messages # automatic: expect 3 meta = "jumped over the log" assert flow_mgr.get_flow(None, meta) == 3 - msg5 = f"flow: 3 ({meta}) {FAKE_NOW_ISO}" - assert f"New {msg5}" in caplog.messages + msg3 = f"flow: 3 ({meta})" + assert f"New {msg3}" in caplog.messages # automatic: expect 5 (skip over 4) meta = "crawled under the log" assert flow_mgr.get_flow(None, meta) == 5 - msg6 = f"flow: 5 ({meta}) {FAKE_NOW_ISO}" - assert f"New {msg6}" in caplog.messages + msg5 = f"flow: 5 ({meta})" + assert f"New {msg5}" in caplog.messages + flow_mgr._log() assert ( "Flows:\n" - f"{msg1}\n" - f"{msg2}\n" - f"{msg4}\n" - f"{msg5}\n" - f"{msg6}" + f"{msg1} {FAKE_NOW_ISO}\n" + f"{msg2} {FAKE_NOW_ISO}\n" + f"{msg4} {FAKE_NOW_ISO}\n" + f"{msg3} {FAKE_NOW_ISO}\n" + f"{msg5} {FAKE_NOW_ISO}" ) in caplog.messages