diff --git a/changes.d/5090.feat.md b/changes.d/5090.feat.md
new file mode 100644
index 00000000000..a9f4e947f27
--- /dev/null
+++ b/changes.d/5090.feat.md
@@ -0,0 +1,2 @@
+Distinct startup and shutdown graphs, separate from the main cycling graph,
+to make it easier to configure special behaviour at startup and shutdown.
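For context (not part of the patch itself): the new startup and shutdown sections sit alongside the normal cycling graph under [scheduling][graph]. A minimal sketch, based on the functional test workflow added later in this diff:

    [scheduling]
        cycling mode = integer
        final cycle point = 2
        [[graph]]
            startup = "a => b"     # runs once, before the main graph
            R1 = "foo => bar"      # main graph, first cycle only
            P1 = "bar => baz"      # main graph, every cycle
            shutdown = "x => y"    # runs once, after the main graph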
diff --git a/cylc/flow/command_validation.py b/cylc/flow/command_validation.py
index d0507ecd340..07339ed5d8c 100644
--- a/cylc/flow/command_validation.py
+++ b/cylc/flow/command_validation.py
@@ -27,6 +27,7 @@
)
from cylc.flow.cycling.loader import standardise_point_string
+from cylc.flow.cycling.nocycle import NOCYCLE_POINTS
from cylc.flow.exceptions import InputError, PointParsingError
from cylc.flow.flow_mgr import (
FLOW_NEW,
@@ -40,7 +41,6 @@
from cylc.flow.scripts.set import XTRIGGER_PREREQ_PREFIX
from cylc.flow.task_outputs import TASK_OUTPUT_SUCCEEDED
-
if TYPE_CHECKING:
from cylc.flow.id import TaskTokens
@@ -359,7 +359,10 @@ def is_tasks(ids: Iterable[str]) -> 'Set[TaskTokens]':
tokens = tokens.duplicate(task='root')
# if the cycle is not a glob or reference, standardise it
- if (
+ if cast('str', tokens['cycle']) in NOCYCLE_POINTS:
+ # OK: startup or shutdown graph
+ pass
+ elif (
# cycle point is a glob
not contains_fnmatch(cast('str', tokens['cycle']))
# cycle point is a reference to the ICP/FCP
diff --git a/cylc/flow/config.py b/cylc/flow/config.py
index 519dd49afcf..dbd74abc6d0 100644
--- a/cylc/flow/config.py
+++ b/cylc/flow/config.py
@@ -59,6 +59,13 @@
from cylc.flow.c3mro import C3
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.cfgspec.workflow import RawWorkflowConfig
+from cylc.flow.cycling.nocycle import (
+    NocycleSequence,
+    NOCYCLE_SEQ_STARTUP,
+    NOCYCLE_SEQ_SHUTDOWN,
+)
+from cylc.flow.id import Tokens
from cylc.flow.cycling.integer import IntegerInterval
from cylc.flow.cycling.iso8601 import (
ISO8601Interval,
@@ -87,7 +94,6 @@
import cylc.flow.flags
from cylc.flow.graph_parser import GraphParser
from cylc.flow.graphnode import GraphNodeParser
-from cylc.flow.id import Tokens
from cylc.flow.listify import listify
from cylc.flow.log_level import verbosity_to_env
from cylc.flow.param_expand import NameExpander
@@ -311,6 +317,7 @@ def __init__(
self.start_point: 'PointBase'
self.stop_point: Optional['PointBase'] = None
self.final_point: Optional['PointBase'] = None
+ self.nocycle_sequences: Set['NocycleSequence'] = set()
self.sequences: List['SequenceBase'] = []
self.actual_first_point: Optional['PointBase'] = None
self._start_point_for_actual_first_point: Optional['PointBase'] = None
@@ -657,8 +664,13 @@ def _warn_if_queues_have_implicit_tasks(
)
def prelim_process_graph(self) -> None:
- """Ensure graph is not empty; set integer cycling mode and icp/fcp = 1
- for simplest "R1 = foo" type graphs.
+ """Error if graph empty; set integer cycling and icp/fcp = 1,
+ if those settings are omitted and the graph is acyclic graphs.
+
+ Somewhat relevant notes:
+ - The default (if not set) cycling mode, gregorian, requires an ICP.
+ - cycling mode is not stored in the DB, so recompute for restarts.
+
"""
graphdict = self.cfg['scheduling']['graph']
if not any(graphdict.values()):
@@ -667,9 +679,21 @@ def prelim_process_graph(self) -> None:
if (
'cycling mode' not in self.cfg['scheduling'] and
self.cfg['scheduling'].get('initial cycle point', '1') == '1' and
- all(item in ['graph', '1', 'R1'] for item in graphdict)
+ all(
+ seq in [
+ 'R1',
+ str(NOCYCLE_SEQ_STARTUP),
+ str(NOCYCLE_SEQ_SHUTDOWN),
+ 'graph', # Cylc 7 back-compat
+ '1' # Cylc 7 back-compat?
+ ]
+ for seq in graphdict
+ )
):
- # Pure acyclic graph, assume integer cycling mode with '1' cycle
+            # Non-cycling graph: assume integer cycling mode with a '1' cycle.
+            # Typos in "startup", "shutdown", or "R1" will look like cycling
+            # sections here, but will fail later during proper recurrence
+            # checking.
self.cfg['scheduling']['cycling mode'] = INTEGER_CYCLING_TYPE
for key in ('initial cycle point', 'final cycle point'):
if key not in self.cfg['scheduling']:
@@ -2350,15 +2374,25 @@ def load_graph(self):
try:
seq = get_sequence(section, icp, fcp)
except (AttributeError, TypeError, ValueError, CylcError) as exc:
- if cylc.flow.flags.verbosity > 1:
- traceback.print_exc()
- msg = 'Cannot process recurrence %s' % section
- msg += ' (initial cycle point=%s)' % icp
- msg += ' (final cycle point=%s)' % fcp
- if isinstance(exc, CylcError):
- msg += ' %s' % exc.args[0]
- raise WorkflowConfigError(msg) from None
- self.sequences.append(seq)
+ try:
+ # is it a startup or shutdown graph?
+ seq = NocycleSequence(section)
+ except ValueError:
+ if cylc.flow.flags.verbosity > 1:
+ traceback.print_exc()
+ msg = (
+ f"Cannot process recurrence {section}"
+ f" (initial cycle point={icp})"
+ f" (final cycle point={fcp})"
+ )
+ if isinstance(exc, CylcError):
+ msg += ' %s' % exc.args[0]
+ raise WorkflowConfigError(msg) from None
+ else:
+ self.nocycle_sequences.add(seq)
+ else:
+ self.sequences.append(seq)
+
parser = GraphParser(
family_map,
self.parameters,
diff --git a/cylc/flow/cycling/__init__.py b/cylc/flow/cycling/__init__.py
index 4d2c8da11a7..92f75c7aea1 100644
--- a/cylc/flow/cycling/__init__.py
+++ b/cylc/flow/cycling/__init__.py
@@ -349,7 +349,11 @@ def TYPE_SORT_KEY(self) -> int:
@classmethod
@abstractmethod # Note: stacked decorator not strictly enforced in Py2.x
def get_async_expr(cls, start_point=0):
- """Express a one-off sequence at the initial cycle point."""
+ """Express a one-off sequence at the initial cycle point.
+
+ Note "async" has nothing to do with asyncio. It was a (bad)
+ name for one-off (non-cycling) graphs in early Cylc versions.
+ """
pass
@abstractmethod
diff --git a/cylc/flow/cycling/integer.py b/cylc/flow/cycling/integer.py
index ce81ba55a94..4d1347723b9 100644
--- a/cylc/flow/cycling/integer.py
+++ b/cylc/flow/cycling/integer.py
@@ -21,7 +21,12 @@
import re
from cylc.flow.cycling import (
- PointBase, IntervalBase, SequenceBase, ExclusionBase, parse_exclusion, cmp
+ PointBase,
+ IntervalBase,
+ SequenceBase,
+ ExclusionBase,
+ parse_exclusion,
+ cmp
)
from cylc.flow.exceptions import (
CylcMissingContextPointError,
diff --git a/cylc/flow/cycling/loader.py b/cylc/flow/cycling/loader.py
index 11b86f0819c..8a9f39d91bc 100644
--- a/cylc/flow/cycling/loader.py
+++ b/cylc/flow/cycling/loader.py
@@ -21,12 +21,14 @@
from typing import Optional, Type, overload
-from cylc.flow.cycling import PointBase, integer, iso8601
+from cylc.flow.cycling import PointBase, integer, iso8601, nocycle
from metomi.isodatetime.data import Calendar
ISO8601_CYCLING_TYPE = iso8601.CYCLER_TYPE_ISO8601
INTEGER_CYCLING_TYPE = integer.CYCLER_TYPE_INTEGER
+NOCYCLE_CYCLING_TYPE = nocycle.CYCLER_TYPE_NOCYCLE
+
IS_OFFSET_ABSOLUTE_IMPLS = {
INTEGER_CYCLING_TYPE: integer.is_offset_absolute,
diff --git a/cylc/flow/cycling/nocycle.py b/cylc/flow/cycling/nocycle.py
new file mode 100644
index 00000000000..4eb8a38e723
--- /dev/null
+++ b/cylc/flow/cycling/nocycle.py
@@ -0,0 +1,253 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+Cycling logic for isolated non-cycling startup and shutdown graphs.
+"""
+
+from cylc.flow.cycling import PointBase, SequenceBase
+
+# cycle point values
+NOCYCLE_PT_STARTUP = "startup"
+NOCYCLE_PT_SHUTDOWN = "shutdown"
+
+NOCYCLE_POINTS = (
+ NOCYCLE_PT_STARTUP,
+ NOCYCLE_PT_SHUTDOWN
+)
+
+CYCLER_TYPE_NOCYCLE = "nocycle"
+CYCLER_TYPE_SORT_KEY_NOCYCLE = 1
+
+# Unused abstract methods below are left to raise NotImplementedError.
+
+
+class NocyclePoint(PointBase):
+ """A non-advancing string-valued cycle point."""
+
+ TYPE = CYCLER_TYPE_NOCYCLE
+ TYPE_SORT_KEY = CYCLER_TYPE_SORT_KEY_NOCYCLE
+
+    __slots__ = ('value',)
+
+ def __init__(self, value: str) -> None:
+ """Initialise a nocycle point.
+
+ >>> NocyclePoint(NOCYCLE_PT_STARTUP)
+ startup
+ >>> NocyclePoint("beta")
+ Traceback (most recent call last):
+ ValueError: Illegal Nocycle value 'beta'
+ """
+ if value not in [NOCYCLE_PT_STARTUP, NOCYCLE_PT_SHUTDOWN]:
+ raise ValueError(f"Illegal Nocycle value '{value}'")
+ self.value = value
+
+ def __hash__(self):
+ """Hash it.
+
+ >>> bool(hash(NocyclePoint(NOCYCLE_PT_STARTUP)))
+ True
+ """
+ return hash(self.value)
+
+ def __eq__(self, other):
+ """Equality.
+
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) ==
+ ... NocyclePoint(NOCYCLE_PT_STARTUP))
+ True
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) ==
+ ... NocyclePoint(NOCYCLE_PT_SHUTDOWN))
+ False
+ """
+ return str(other) == str(self.value)
+
+ def __le__(self, other):
+ """Less than or equal (only if equal).
+
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) <=
+ ... NocyclePoint(NOCYCLE_PT_STARTUP))
+ True
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) <=
+ ... NocyclePoint(NOCYCLE_PT_SHUTDOWN))
+ False
+ """
+ return str(other) == self.value
+
+ def __lt__(self, other):
+ """Less than (never).
+
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) <
+ ... NocyclePoint(NOCYCLE_PT_STARTUP))
+ False
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) <
+ ... NocyclePoint(NOCYCLE_PT_SHUTDOWN))
+ False
+ """
+ return False
+
+ def __gt__(self, other):
+ """Greater than (never).
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) >
+ ... NocyclePoint(NOCYCLE_PT_STARTUP))
+ False
+ >>> (NocyclePoint(NOCYCLE_PT_STARTUP) >
+ ... NocyclePoint(NOCYCLE_PT_SHUTDOWN))
+ False
+ """
+ return False
+
+ def __str__(self):
+ """
+ >>> str(NocyclePoint(NOCYCLE_PT_STARTUP))
+ 'startup'
+ >>> str(NocyclePoint(NOCYCLE_PT_SHUTDOWN))
+ 'shutdown'
+ """
+ return self.value
+
+ def _cmp(self, other):
+ raise NotImplementedError
+
+ def add(self, other):
+ # Not used.
+ raise NotImplementedError
+
+ def sub(self, other):
+ # Not used.
+ raise NotImplementedError
+
+
+class NocycleSequence(SequenceBase):
+ """A single point sequence."""
+
+ def __init__(self, dep_section, p_context_start=None, p_context_stop=None):
+ """Workflow cycling context is ignored.
+
+ >>> NocycleSequence("startup").point
+ startup
+ """
+ self.point = NocyclePoint(dep_section)
+
+ def __hash__(self):
+ """Hash it.
+
+ >>> bool(hash(NocycleSequence("startup")))
+ True
+ """
+ return hash(str(self.point))
+
+ def is_valid(self, point):
+ """Is point on-sequence and in-bounds?
+
+ >>> NocycleSequence("startup").is_valid("startup")
+ True
+ >>> NocycleSequence("startup").is_valid("shutdown")
+ False
+ """
+ return str(point) == str(self.point)
+
+ def get_first_point(self, point):
+ """First point is the only point.
+
+ >>> NocycleSequence("startup").get_first_point("shutdown")
+ startup
+ """
+ return self.point
+
+ def get_start_point(self, point):
+ """First point is the only point."""
+ # Not used.
+ raise NotImplementedError
+
+ def get_next_point(self, point):
+ """There is no next point.
+
+ >>> NocycleSequence("startup").get_next_point("startup")
+ """
+ return None
+
+ def get_next_point_on_sequence(self, point):
+ """There is no next point.
+
+ >>> NocycleSequence("startup").get_next_point_on_sequence("startup")
+ """
+ return None
+
+ def __eq__(self, other):
+ """Equality.
+
+ >>> NocycleSequence("startup") == NocycleSequence("startup")
+ True
+ >>> NocycleSequence("startup") == NocycleSequence("shutdown")
+ False
+ """
+ try:
+ return str(other.point) == str(self.point)
+ except AttributeError:
+            # (other has no .point attribute)
+ return False
+
+ def __str__(self):
+ """String.
+
+ >>> str(NocycleSequence("startup"))
+ 'startup'
+ """
+ return str(self.point)
+
+ def TYPE(self):
+ raise NotImplementedError
+
+ def TYPE_SORT_KEY(self):
+ raise NotImplementedError
+
+ def get_async_expr(cls, start_point=0):
+ raise NotImplementedError
+
+ def get_interval(self):
+ """Return the cycling interval of this sequence."""
+ raise NotImplementedError
+
+ def get_offset(self):
+ """Deprecated: return the offset used for this sequence."""
+ raise NotImplementedError
+
+ def set_offset(self, i_offset):
+ """Deprecated: alter state to offset the entire sequence."""
+ raise NotImplementedError
+
+ def is_on_sequence(self, point):
+ """Is point on-sequence, disregarding bounds?"""
+ raise NotImplementedError
+
+ def get_prev_point(self, point):
+ """Return the previous point < point, or None if out of bounds."""
+ raise NotImplementedError
+
+ def get_nearest_prev_point(self, point):
+ """Return the largest point < some arbitrary point."""
+ raise NotImplementedError
+
+ def get_stop_point(self):
+ """Return the last point in this sequence, or None if unbounded."""
+ raise NotImplementedError
+
+
+NOCYCLE_SEQ_STARTUP = NocycleSequence(NOCYCLE_PT_STARTUP)
+NOCYCLE_SEQ_SHUTDOWN = NocycleSequence(NOCYCLE_PT_SHUTDOWN)
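For reference (not part of the patch itself), a short sketch of the semantics defined above, assuming the module is importable as cylc.flow.cycling.nocycle (the import path used elsewhere in this diff):

    from cylc.flow.cycling.nocycle import (
        NOCYCLE_PT_SHUTDOWN,
        NOCYCLE_PT_STARTUP,
        NocyclePoint,
        NocycleSequence,
    )

    startup = NocyclePoint(NOCYCLE_PT_STARTUP)
    shutdown = NocyclePoint(NOCYCLE_PT_SHUTDOWN)

    # Nocycle points never advance: they compare equal only to themselves
    # and are never ordered before or after any other point.
    assert startup == NocyclePoint(NOCYCLE_PT_STARTUP)
    assert not (startup < shutdown)
    assert not (startup > shutdown)

    # A NocycleSequence holds exactly one point and has no next point.
    seq = NocycleSequence(NOCYCLE_PT_STARTUP)
    assert seq.is_valid(startup)
    assert seq.get_next_point(startup) is None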
diff --git a/cylc/flow/data_store_mgr.py b/cylc/flow/data_store_mgr.py
index e40b1140354..520e949e281 100644
--- a/cylc/flow/data_store_mgr.py
+++ b/cylc/flow/data_store_mgr.py
@@ -79,6 +79,7 @@
__version__ as CYLC_VERSION,
)
from cylc.flow.cycling.loader import get_point
+from cylc.flow.cycling.nocycle import NOCYCLE_POINTS
from cylc.flow.data_messages_pb2 import (
AllDeltas,
EDeltas,
@@ -1080,7 +1081,11 @@ def increment_graph_window(
)
for items in graph_children.values():
for child_name, child_point, _ in items:
- if final_point and child_point > final_point:
+ if (
+ final_point
+ and str(child_point) not in NOCYCLE_POINTS
+ and child_point > final_point
+ ):
continue
child_tokens = self.id_.duplicate(
cycle=str(child_point),
@@ -1105,7 +1110,11 @@ def increment_graph_window(
taskdefs
).values():
for parent_name, parent_point, _ in items:
- if final_point and parent_point > final_point:
+ if (
+                    final_point
+                    and str(parent_point) not in NOCYCLE_POINTS
+                    and parent_point > final_point
+ ):
continue
parent_tokens = self.id_.duplicate(
cycle=str(parent_point),
@@ -2353,16 +2362,20 @@ def update_workflow(self, reloaded=False):
delta_set = True
if self.schd.pool.active_tasks:
- pool_points = set(self.schd.pool.active_tasks)
-
- oldest_point = str(min(pool_points))
- if w_data.oldest_active_cycle_point != oldest_point:
- w_delta.oldest_active_cycle_point = oldest_point
- delta_set = True
- newest_point = str(max(pool_points))
- if w_data.newest_active_cycle_point != newest_point:
- w_delta.newest_active_cycle_point = newest_point
- delta_set = True
+ pool_points = {
+ p for p in self.schd.pool.active_tasks
+ if str(p) not in NOCYCLE_POINTS
+ }
+ if pool_points:
+ # TODO is it OK if not set (due to nocycle points)?
+ oldest_point = str(min(pool_points))
+ if w_data.oldest_active_cycle_point != oldest_point:
+ w_delta.oldest_active_cycle_point = oldest_point
+ delta_set = True
+ newest_point = str(max(pool_points))
+ if w_data.newest_active_cycle_point != newest_point:
+ w_delta.newest_active_cycle_point = newest_point
+ delta_set = True
if delta_set:
w_delta.id = self.workflow_id
diff --git a/cylc/flow/flow_mgr.py b/cylc/flow/flow_mgr.py
index 5508005586b..35a1c4fb848 100644
--- a/cylc/flow/flow_mgr.py
+++ b/cylc/flow/flow_mgr.py
@@ -214,7 +214,7 @@ def get_flow(
"start_time": now_sec
}
LOG.info(
- f"New flow: {flow_num} ({meta}) {now_sec}"
+ f"New flow: {flow_num} ({meta})"
)
self.db_mgr.put_insert_workflow_flows(
flow_num,
diff --git a/cylc/flow/scheduler.py b/cylc/flow/scheduler.py
index ea690ff9ea9..772a3495c44 100644
--- a/cylc/flow/scheduler.py
+++ b/cylc/flow/scheduler.py
@@ -17,6 +17,7 @@
import asyncio
from collections import deque
 from contextlib import suppress
+from functools import partial
import logging
import os
@@ -46,6 +47,7 @@
TYPE_CHECKING,
Any,
AsyncGenerator,
+ Callable,
Dict,
Iterable,
List,
@@ -72,9 +74,17 @@
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import WorkflowConfig
+from cylc.flow.cycling.nocycle import (
+ NOCYCLE_POINTS,
+ NOCYCLE_PT_STARTUP,
+ NOCYCLE_PT_SHUTDOWN,
+ NOCYCLE_SEQ_STARTUP,
+ NOCYCLE_SEQ_SHUTDOWN
+)
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.exceptions import (
CommandFailedError,
+ CyclerTypeError,
CylcError,
InputError,
)
@@ -339,6 +349,8 @@ def __init__(self, id_: str, options: 'Values') -> None:
# load anything (e.g. template variables) from it
os.unlink(self.workflow_db_mgr.pub_path)
+ self.graph_loaders: List[Callable] = []
+
# Map used to track incomplete remote inits for restart
# {install_target: platform}
self.incomplete_ri_map: Dict[str, Dict] = {}
@@ -442,7 +454,7 @@ async def configure(self, params):
"""Configure the scheduler.
* Load the flow configuration.
- * Load/write workflow parameters from the DB.
+ * Load/write workflow parameters from/to the DB.
* Get the data store rolling.
"""
@@ -469,8 +481,10 @@ async def configure(self, params):
self.profiler.log_memory("scheduler.py: before load_flow_file")
try:
- cfg = self.load_flow_file()
- self.apply_new_config(cfg, is_reload=False)
+ self.apply_new_config(
+ self.load_flow_file(),
+ is_reload=False
+ )
except ParsecError as exc:
# Mark this exc as expected (see docstring for .schd_expected):
exc.schd_expected = True
@@ -516,17 +530,6 @@ async def configure(self, params):
self.data_store_mgr.initiate_data_model()
- self.profiler.log_memory("scheduler.py: before load_tasks")
- if self.is_restart:
- self._load_pool_from_db()
- if self.restored_stop_task_id is not None:
- self.pool.set_stop_task(self.restored_stop_task_id)
- elif self.options.starttask:
- self._load_pool_from_tasks()
- else:
- self._load_pool_from_point()
- self.profiler.log_memory("scheduler.py: after load_tasks")
-
self.workflow_db_mgr.put_workflow_params(self)
self.workflow_db_mgr.put_workflow_template_vars(self.template_vars)
self.workflow_db_mgr.put_runtime_inheritance(self.config)
@@ -650,6 +653,11 @@ def log_start(self) -> None:
f'Run mode: {self.get_run_mode().value}',
extra=RotatingLogFileHandler.header_extra
)
+ LOG.info(
+ "Cycling mode:"
+ f" {self.config.cfg['scheduling']['cycling mode']}",
+ extra=RotatingLogFileHandler.header_extra
+ )
LOG.info(
f'Initial point: {self.config.initial_point}',
extra=RotatingLogFileHandler.header_extra
@@ -669,6 +677,87 @@ def log_start(self) -> None:
extra=RotatingLogFileHandler.header_extra
)
+ async def _get_graph_loaders(self) -> None:
+ """Tee up the loaders for configured graphs (startup, main, shutdown).
+
+ A graph loader loads the task pool with the graph's parentless tasks
+ to get the graph started.
+
+ In a restart the task pool is loaded from the DB before calling this
+ method so we can examine the pool to see what's still to run.
+
+ NOTE: always spawn the first task of the next graph if shutting down
+ at the end of a graph, to ensure that restart works correctly.
+
+ """
+ points = [p.value for p in self.pool.get_points()]
+ if self.is_restart and not points:
+ return
+
+ if (
+ NOCYCLE_SEQ_SHUTDOWN in self.config.nocycle_sequences
+ and (
+ not points or NOCYCLE_PT_SHUTDOWN not in points
+ )
+ ):
+ # shutdown section exists and hasn't started yet.
+ self.graph_loaders.append(
+ partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_SHUTDOWN)
+ )
+
+ if (
+ self.config.sequences and (
+ not points
+ or (
+ not any(p not in NOCYCLE_POINTS for p in points)
+ and NOCYCLE_PT_SHUTDOWN not in points
+ )
+ )
+ ):
+            # Main graph exists and hasn't started yet (and hasn't already
+            # run, or the pool would contain shutdown tasks).
+ if self.options.starttask:
+ # Cold start from specified tasks.
+ self.graph_loaders.append(self._load_pool_from_tasks)
+ else:
+ # Cold start from cycle point.
+ self.graph_loaders.append(self._load_pool_from_point)
+
+ if (
+ NOCYCLE_SEQ_STARTUP in self.config.nocycle_sequences
+ and (
+ not self.is_restart
+ )
+ ):
+ # startup section exists and hasn't started yet.
+ # (Not in a restart - the pool would already be loaded from DB).
+ self.graph_loaders.append(
+ partial(self.pool.load_nocycle_graph, NOCYCLE_SEQ_STARTUP)
+ )
+
+ async def run_graphs(self):
+ self.graph_loaders = []
+
+ if self.pool.active_tasks:
+ # pool already loaded for integration test!
+ while await self._main_loop():
+ pass
+ return
+
+ if self.is_restart:
+ await self._load_pool_from_db()
+ self.pool.compute_runahead()
+ self.pool.release_runahead_tasks()
+
+ await self._get_graph_loaders()
+
+ while await self._main_loop():
+ pass
+ while self.graph_loaders:
+ (self.graph_loaders.pop())()
+ while await self._main_loop():
+ pass
+
async def run_scheduler(self) -> None:
"""Start the scheduler main loop."""
try:
@@ -702,13 +791,12 @@ async def run_scheduler(self) -> None:
self
)
)
- self.server.publish_queue.put(
- self.data_store_mgr.publish_deltas)
+ self.server.publish_queue.put(self.data_store_mgr.publish_deltas)
+
# Non-async sleep - yield to other threads rather than event loop
sleep(0)
self.profiler.start()
- while True: # MAIN LOOP
- await self._main_loop()
+ await self.run_graphs()
except SchedulerStop as exc:
# deliberate stop
@@ -750,7 +838,6 @@ async def run_scheduler(self) -> None:
await self.handle_exception(exc)
else:
- # main loop ends (not used?)
await self.shutdown(SchedulerStop(StopMode.AUTO.value))
finally:
@@ -861,15 +948,21 @@ def _load_pool_from_point(self):
released from runhead.)
"""
- start_type = (
- "Warm" if self.config.start_point > self.config.initial_point
- else "Cold"
- )
- LOG.info(f"{start_type} start from {self.config.start_point}")
+ LOG.info("Loading main graph")
+ msg = f"start from {self.config.start_point}"
+ if self.config.start_point == self.config.initial_point:
+ msg = "Cold " + msg
+ LOG.info(msg)
self.pool.load_from_point()
- def _load_pool_from_db(self):
+ async def _load_pool_from_db(self):
"""Load task pool from DB, for a restart."""
+ LOG.info("Loading from DB for restart")
+
+ self.task_job_mgr.task_remote_mgr.is_restart = True
+ self.task_job_mgr.task_remote_mgr.rsync_includes = (
+ self.config.get_validated_rsync_includes())
+
self.workflow_db_mgr.pri_dao.select_broadcast_states(
self.broadcast_mgr.load_db_broadcast_states)
self.broadcast_mgr.post_load_db_coerce()
@@ -889,6 +982,50 @@ def _load_pool_from_db(self):
self.pool.load_db_tasks_to_hold()
self.pool.update_flow_mgr()
+ if self.restored_stop_task_id is not None:
+ self.pool.set_stop_task(self.restored_stop_task_id)
+
+ self.log_start()
+
+ all_tasks = self.pool.get_tasks()
+ if not all_tasks:
+ # Restart with empty pool: only unfinished event handlers.
+ # This workflow completed before restart; wait for intervention.
+ with suppress(KeyError):
+ self.timers[self.EVENT_RESTART_TIMEOUT].reset()
+ self.is_restart_timeout_wait = True
+ LOG.warning(
+ "This workflow already ran to completion."
+ "\nTo make it continue, trigger new tasks"
+ " before the restart timeout."
+ )
+ return
+
+ self.restart_remote_init()
+ # Poll all pollable tasks
+ await commands.run_cmd(commands.poll_tasks(self, ['*/*']))
+ # Cycle point globs don't match startup and shutdown:
+ await commands.run_cmd(
+ commands.poll_tasks(self, [f"{NOCYCLE_PT_STARTUP}/*"]))
+ await commands.run_cmd(
+ commands.poll_tasks(self, [f"{NOCYCLE_PT_SHUTDOWN}/*"]))
+
+ # If we shut down with manually triggered waiting tasks,
+ # submit them to run now.
+ # NOTE: this will run tasks that were triggered with
+ # the trigger "--on-resume" option, even if the workflow
+ # is restarted as paused. Option to be removed at 8.5.0.
+ pre_prep_tasks = []
+ for itask in all_tasks:
+ if (
+ itask.is_manual_submit
+ and itask.state(TASK_STATUS_WAITING)
+ ):
+ itask.waiting_on_job_prep = True
+ pre_prep_tasks.append(itask)
+
+ self.start_job_submission(pre_prep_tasks)
+
def restart_remote_init(self):
"""Remote init for all submitted/running tasks in the pool."""
self.task_job_mgr.task_remote_mgr.is_restart = True
@@ -934,11 +1071,11 @@ def manage_remote_init(self):
install_target]
if status == REMOTE_INIT_DONE:
self.task_job_mgr.task_remote_mgr.file_install(platform)
- if status in [REMOTE_FILE_INSTALL_DONE,
- REMOTE_INIT_255,
- REMOTE_FILE_INSTALL_255,
- REMOTE_INIT_FAILED,
- REMOTE_FILE_INSTALL_FAILED]:
+ elif status in [REMOTE_FILE_INSTALL_DONE,
+ REMOTE_INIT_255,
+ REMOTE_FILE_INSTALL_255,
+ REMOTE_INIT_FAILED,
+ REMOTE_FILE_INSTALL_FAILED]:
# Remove install target
self.incomplete_ri_map.pop(install_target)
@@ -1618,8 +1756,11 @@ def update_profiler_logs(self, tinit):
self.count, get_current_time_string()))
self.count += 1
- async def _main_loop(self) -> None:
- """A single iteration of the main loop."""
+ async def _main_loop(self) -> bool:
+ """A single iteration of the main loop.
+
+ Return False to stop iterating.
+ """
tinit = time()
# Useful for debugging core scheduler issues:
@@ -1682,10 +1823,11 @@ async def _main_loop(self) -> None:
# paused. This allows broadcast-and-trigger beyond the expiry
# limit, by pausing before doing it (after which the expiry
# limit moves back).
- with suppress(TimePointDumperBoundsError):
+ with suppress(TimePointDumperBoundsError, CyclerTypeError):
# NOTE: TimePointDumperBoundsError will be raised for negative
# cycle points, we skip broadcast expiry in this circumstance
# (pre-initial condition)
+ # NOTE: CyclerTypeError raised for startup and shutdown graphs
if min_point := self.pool.get_min_point():
# NOTE: the broadcast expire limit is the oldest active
# cycle MINUS the longest cycling interval
@@ -1733,6 +1875,11 @@ async def _main_loop(self) -> None:
with suppress(KeyError):
self.timers[self.EVENT_STALL_TIMEOUT].stop()
+ if self.graph_finished() and self.graph_loaders:
+ # Return control to load the next graph.
+ LOG.debug("Graph finished")
+ return False
+
self.process_workflow_db_queue()
# If public database is stuck, blast it away by copying the content
@@ -1742,7 +1889,6 @@ async def _main_loop(self) -> None:
# Shutdown workflow if timeouts have occurred
self.timeout_check()
- # Does the workflow need to shutdown on task failure?
await self.workflow_shutdown()
if self.options.profile_mode:
@@ -1778,6 +1924,8 @@ async def _main_loop(self) -> None:
await asyncio.sleep(duration)
# Record latest main loop interval
self.main_loop_intervals.append(time() - tinit)
+
+ return True
# END MAIN LOOP
def _update_workflow_state(self):
@@ -1992,6 +2140,21 @@ def set_stop_clock(self, unix_time):
self.workflow_db_mgr.put_workflow_stop_clock_time(self.stop_clock_time)
self.update_data_store()
+    def graph_finished(self):
+        """Return True if the current graph has nothing left to run."""
+ return not any(
+ itask for itask in self.pool.get_tasks()
+ if itask.state(
+ TASK_STATUS_PREPARING,
+ TASK_STATUS_SUBMITTED,
+ TASK_STATUS_RUNNING
+ )
+ or (
+ itask.state(TASK_STATUS_WAITING)
+ and not itask.state.is_runahead
+ )
+ )
+
def stop_clock_done(self):
"""Return True if wall clock stop time reached."""
if self.stop_clock_time is None:
@@ -2009,10 +2172,13 @@ def stop_clock_done(self):
def check_auto_shutdown(self):
"""Check if we should shut down now."""
if (
self.is_paused or
self.is_restart_timeout_wait or
self.check_workflow_stalled() or
+ not self.graph_finished() or
# if more tasks to run (if waiting and not
# runahead, then held, queued, or xtriggered).
any(
diff --git a/cylc/flow/scripts/graph.py b/cylc/flow/scripts/graph.py
index 6438e8de476..b148dc4e2ad 100644
--- a/cylc/flow/scripts/graph.py
+++ b/cylc/flow/scripts/graph.py
@@ -45,6 +45,7 @@
from typing import Dict, List, Optional, TYPE_CHECKING, Tuple, Callable
from cylc.flow.config import WorkflowConfig
+from cylc.flow.cycling.nocycle import NOCYCLE_PT_STARTUP
from cylc.flow.exceptions import InputError, CylcError
from cylc.flow.id import Tokens
from cylc.flow.id_cli import parse_id_async
@@ -68,10 +69,20 @@ def sort_integer_node(id_):
Example:
>>> sort_integer_node('11/foo')
('foo', 11)
-
+ >>> sort_integer_node('startup/foo')
+ ('foo', 0)
+ >>> sort_integer_node('shutdown/foo')
+ ('foo', 1)
"""
tokens = Tokens(id_, relative=True)
- return (tokens['task'], int(tokens['cycle']))
+ try:
+ return (tokens['task'], int(tokens['cycle']))
+ except ValueError:
+ # nocycle point
+ if tokens['cycle'] == NOCYCLE_PT_STARTUP:
+ return (tokens['task'], 0)
+ else:
+ return (tokens['task'], 1)
def sort_integer_edge(id_):
diff --git a/cylc/flow/task_pool.py b/cylc/flow/task_pool.py
index 00ca7f2974c..0369b189f28 100644
--- a/cylc/flow/task_pool.py
+++ b/cylc/flow/task_pool.py
@@ -42,7 +42,11 @@
from cylc.flow.exceptions import (
PlatformLookupError,
PointParsingError,
- WorkflowConfigError,
+ WorkflowConfigError
+)
+from cylc.flow.cycling.nocycle import (
+ NocyclePoint,
+ NOCYCLE_POINTS
)
import cylc.flow.flags
from cylc.flow.flow_mgr import (
@@ -242,13 +246,38 @@ def _swap_out(self, itask):
self.active_tasks[itask.point][itask.identity] = itask
self.active_tasks_changed = True
+ def load_nocycle_graph(self, seq):
+ """Load task pool for a no-cycle (startup or shutdown) graph."""
+
+ LOG.info(f"Loading {seq} graph")
+ # Always start flow 1 for automatic load from start of a graph.
+ flow_num = self.flow_mgr.get_flow(
+ 1, meta=f"original {seq} flow",
+ )
+ self.runahead_limit_point = None
+ for name in self.config.get_task_name_list():
+ tdef = self.config.get_taskdef(name)
+ if str(seq) not in [str(s) for s in tdef.sequences]:
+ continue
+ if tdef.is_parentless(seq.point, seq):
+ ntask, is_in_pool, _ = self.get_or_spawn_task(
+ seq.point, tdef, {flow_num}
+ )
+ if ntask is not None:
+ if not is_in_pool:
+ self.add_to_pool(ntask)
+ self.rh_release_and_queue(ntask)
+
def load_from_point(self):
"""Load the task pool for the workflow start point.
Add every parentless task out to the runahead limit.
"""
+ # Always start flow 1 for automatic load from start of a graph.
flow_num = self.flow_mgr.get_flow(
- meta=f"original flow from {self.config.start_point}")
+ 1, meta=f"original flow from {self.config.start_point}",
+ )
+
self.compute_runahead()
for name in self.task_name_list:
tdef = self.config.get_taskdef(name)
@@ -298,7 +327,7 @@ def release_runahead_tasks(self):
Return True if any tasks are released, else False.
Call when RH limit changes.
"""
- if not self.active_tasks or not self.runahead_limit_point:
+ if not self.active_tasks:
# (At start-up task pool might not exist yet)
return False
@@ -310,7 +339,12 @@ def release_runahead_tasks(self):
itask
for point, itask_id_map in self.active_tasks.items()
for itask in itask_id_map.values()
- if point <= self.runahead_limit_point
+            if (
+                str(point) in NOCYCLE_POINTS
+                or (
+                    self.runahead_limit_point
+                    and point <= self.runahead_limit_point
+                )
+            )
if itask.state.is_runahead
]
@@ -360,7 +394,7 @@ def compute_runahead(self, force=False) -> bool:
base_point: Optional['PointBase'] = None
# First get the runahead base point.
- if not self.active_tasks:
+ if not self.active_tasks or not self.runahead_limit_point:
# Find the earliest sequence point beyond the workflow start point.
base_point = min(
(
@@ -369,13 +403,18 @@ def compute_runahead(self, force=False) -> bool:
seq.get_first_point(self.config.start_point)
for seq in self.config.sequences
}
- if point is not None
+ if (
+ point is not None
+ and type(point) is not NocyclePoint # type: ignore
+ )
),
default=None,
)
else:
# Find the earliest point with incomplete tasks.
- for point, itasks in sorted(self.get_tasks_by_point().items()):
+ for point, itasks in sorted(
+ self.get_tasks_by_point(exclude_nocycle_pts=True).items()
+ ):
# All n=0 tasks are incomplete by definition, but Cylc 7
# ignores failed ones (it does not ignore submit-failed!).
if (
@@ -386,6 +425,7 @@ def compute_runahead(self, force=False) -> bool:
)
):
continue
+
base_point = point
break
@@ -439,6 +479,7 @@ def compute_runahead(self, force=False) -> bool:
count += 1
sequence_points.add(seq_point)
seq_point = sequence.get_next_point(seq_point)
+
self._prev_runahead_sequence_points = sequence_points
self._prev_runahead_base_point = base_point
@@ -542,11 +583,17 @@ def load_db_task_pool_for_restart(self, row_idx, row):
(cycle, name, flow_nums, flow_wait, is_manual_submit, is_late, status,
is_held, submit_num, _, platform_name, time_submit, time_run, timeout,
outputs_str) = row
+
+ if cycle in NOCYCLE_POINTS:
+ point = NocyclePoint(cycle)
+ else:
+ point = get_point(cycle)
+
try:
itask = TaskProxy(
self.tokens,
self.config.get_taskdef(name),
- get_point(cycle),
+ point,
deserialise_set(flow_nums),
status=status,
is_held=is_held,
@@ -957,11 +1004,14 @@ def get_task_ids(self) -> Set[str]:
"""Return a list of task IDs in the task pool."""
return {itask.identity for itask in self.get_tasks()}
- def get_tasks_by_point(self) -> 'Dict[PointBase, List[TaskProxy]]':
+ def get_tasks_by_point(
+ self, exclude_nocycle_pts=False
+ ) -> 'Dict[PointBase, List[TaskProxy]]':
"""Return a map of task proxies by cycle point."""
return {
point: list(itask_id_map.values())
for point, itask_id_map in self.active_tasks.items()
+ if not exclude_nocycle_pts or str(point) not in NOCYCLE_POINTS
}
def get_task(self, point: 'PointBase', name: str) -> Optional[TaskProxy]:
@@ -1074,9 +1124,15 @@ def release_queued_tasks(self):
# Note: released and pre_prep_tasks can overlap
return list(set(released + pre_prep_tasks))
+ def get_points(self):
+ """Return current list of cycle points in the pool."""
+ return list(self.active_tasks)
+
def get_min_point(self):
- """Return the minimum cycle point currently in the pool."""
- cycles = list(self.active_tasks)
+ """Return the minimum cycle point currently in the pool, or None."""
+ cycles = [
+ c for c in list(self.active_tasks) if str(c) not in NOCYCLE_POINTS
+ ]
minc = None
if cycles:
minc = min(cycles)
@@ -1549,7 +1605,9 @@ def spawn_on_output(self, itask: TaskProxy, output: str) -> None:
if (
-                self.runahead_limit_point is not None
-                and t.point <= self.runahead_limit_point
+                str(t.point) in NOCYCLE_POINTS
+                or (
+                    self.runahead_limit_point is not None
+                    and t.point <= self.runahead_limit_point
+                )
):
self.rh_release_and_queue(t)
@@ -1693,6 +1751,10 @@ def can_be_spawned(self, name: str, point: 'PointBase') -> bool:
LOG.debug('No task definition %s', name)
return False
+ if str(point) in NOCYCLE_POINTS:
+            # TODO - check that the task is in one of these graphs
+ return True
+
# Don't spawn outside of graph limits.
# TODO: is it possible for initial_point to not be defined??
# (see also the similar check + log message in scheduler.py)
@@ -1884,7 +1946,11 @@ def spawn_task(
# "foo; foo[+P1] & bar => baz"
# Here, in the final cycle bar wants to spawn baz, but that would
# stall because baz also depends on foo after the final point.
- if self.stop_point and itask.point <= self.stop_point:
+ if (
+ self.stop_point
+ and str(itask.point) not in NOCYCLE_POINTS
+ and itask.point <= self.stop_point
+ ):
for pct in itask.state.prerequisites_get_target_points():
if pct > self.stop_point:
LOG.warning(
@@ -2289,7 +2355,8 @@ def _set_prereqs_itask(
self.xtrigger_mgr.force_satisfy(itask, xtrigs)
if (
- itask.state.is_runahead
+ str(itask.point) not in NOCYCLE_POINTS
+ and itask.state.is_runahead
and self.runahead_limit_point is not None
and itask.point <= self.runahead_limit_point
):
diff --git a/cylc/flow/taskdef.py b/cylc/flow/taskdef.py
index 96a67d3b380..dfe987ae6b5 100644
--- a/cylc/flow/taskdef.py
+++ b/cylc/flow/taskdef.py
@@ -310,15 +310,19 @@ def check_for_explicit_cycling(self):
raise TaskDefError(
"No cycling sequences defined for %s" % self.name)
- def get_parent_points(self, point):
+ def get_parent_points(self, point, seq=None):
"""Return the cycle points of my parents, at point."""
parent_points = set()
- for seq in self.sequences:
- if not seq.is_valid(point):
+ if seq:
+ sequences = [seq]
+ else:
+ sequences = self.sequences
+ for sequence in sequences:
+ if not sequence.is_valid(point):
continue
- if seq in self.dependencies:
+ if sequence in self.dependencies:
# task has prereqs in this sequence
- for dep in self.dependencies[seq]:
+ for dep in self.dependencies[sequence]:
if dep.suicide:
continue
for trig in dep.task_triggers:
@@ -395,9 +399,12 @@ def is_valid_point(self, point: 'PointBase') -> bool:
def first_point(self, icp):
"""Return the first point for this task."""
+ from cylc.flow.cycling.nocycle import NocycleSequence
point = None
adjusted = []
for seq in self.sequences:
+ if type(seq) is NocycleSequence:
+ continue
pt = seq.get_first_point(icp)
if pt:
# may be None if beyond the sequence bounds
@@ -419,7 +426,7 @@ def next_point(self, point):
p_next = min(adjusted)
return p_next
- def is_parentless(self, point):
+ def is_parentless(self, point, seq=None):
"""Return True if task has no parents at point.
Tasks are considered parentless if they have:
@@ -438,7 +445,8 @@ def is_parentless(self, point):
if self.sequential:
# Implicit parents
return False
- parent_points = self.get_parent_points(point)
+
+ parent_points = self.get_parent_points(point, seq)
return (
not parent_points
or all(x < self.start_point for x in parent_points)
diff --git a/tests/functional/alpha-omega/00-basic.t b/tests/functional/alpha-omega/00-basic.t
new file mode 100644
index 00000000000..bd7d6f25d01
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic.t
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#-------------------------------------------------------------------------------
+
+# Check basic separate triggering of startup, shutdown, and main graphs.
+. "$(dirname "$0")/test_header"
+set_test_number 3
+
+install_and_validate
+
+reftest_run
+
+graph_workflow "${WORKFLOW_NAME}" "${WORKFLOW_NAME}.graph"
+cmp_ok "${WORKFLOW_NAME}.graph" "$TEST_SOURCE_DIR/${TEST_NAME_BASE}/reference.graph"
+
+purge
diff --git a/tests/functional/alpha-omega/00-basic/flow.cylc b/tests/functional/alpha-omega/00-basic/flow.cylc
new file mode 100644
index 00000000000..28564c78480
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic/flow.cylc
@@ -0,0 +1,12 @@
+
+[scheduling]
+ cycling mode = integer
+ final cycle point = 2
+ [[graph]]
+ startup = "a => b"
+ shutdown = "x => y"
+ R1 = "foo => bar"
+ P1 = "bar => baz"
+[runtime]
+ [[a, b, x, y]]
+ [[foo, bar, baz]]
diff --git a/tests/functional/alpha-omega/00-basic/reference.graph b/tests/functional/alpha-omega/00-basic/reference.graph
new file mode 100644
index 00000000000..018cde27a08
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic/reference.graph
@@ -0,0 +1,13 @@
+edge "startup/a" "startup/b"
+edge "1/bar" "1/baz"
+edge "2/bar" "2/baz"
+edge "1/foo" "1/bar"
+graph
+node "startup/a" "a\nstartup"
+node "startup/b" "b\nstartup"
+node "1/bar" "bar\n1"
+node "2/bar" "bar\n2"
+node "1/baz" "baz\n1"
+node "2/baz" "baz\n2"
+node "1/foo" "foo\n1"
+stop
diff --git a/tests/functional/alpha-omega/00-basic/reference.log b/tests/functional/alpha-omega/00-basic/reference.log
new file mode 100644
index 00000000000..77146d2c21b
--- /dev/null
+++ b/tests/functional/alpha-omega/00-basic/reference.log
@@ -0,0 +1,9 @@
+startup/a -triggered off [] in flow 1
+startup/b -triggered off ['startup/a'] in flow 1
+1/foo -triggered off [] in flow 1
+2/bar -triggered off [] in flow 1
+1/bar -triggered off ['1/foo'] in flow 1
+2/baz -triggered off ['2/bar'] in flow 1
+1/baz -triggered off ['1/bar'] in flow 1
+shutdown/x -triggered off [] in flow 1
+shutdown/y -triggered off ['shutdown/x'] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart.t b/tests/functional/alpha-omega/01-restart.t
new file mode 100644
index 00000000000..deef53fae13
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart.t
@@ -0,0 +1,34 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#-------------------------------------------------------------------------------
+
+# Check restart in and between startup, R1, P1, and shutdown graphs.
+
+. "$(dirname "$0")/test_header"
+set_test_number 8
+
+install_and_validate
+
+SRC_DIR="$TEST_SOURCE_DIR/${TEST_NAME_BASE}"
+RUN_DIR="$WORKFLOW_RUN_DIR"
+
+for RUN in $(seq 0 6); do
+ cp "${SRC_DIR}/reference.log.$RUN" "${RUN_DIR}/reference.log"
+ reftest_run "${TEST_NAME_BASE}-${RUN}"
+done
+
+purge
diff --git a/tests/functional/alpha-omega/01-restart/flow.cylc b/tests/functional/alpha-omega/01-restart/flow.cylc
new file mode 100644
index 00000000000..82175f1cde7
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/flow.cylc
@@ -0,0 +1,26 @@
+# Test stop and restart after every task in every type of graph.
+[scheduling]
+ cycling mode = integer
+ final cycle point = 2
+ [[graph]]
+ startup = "a => b"
+ R1 = "foo => bar"
+ P1 = "bar[-P1] => bar => baz"
+ shutdown = "x => y"
+[runtime]
+ [[STOP]]
+ script = cylc stop $CYLC_WORKFLOW_ID
+ [[a, b, x, y, foo]]
+ inherit = STOP
+ [[bar]]
+ script = """
+ if ((CYLC_TASK_CYCLE_POINT == 1)); then
+ cylc stop $CYLC_WORKFLOW_ID
+ fi
+ """
+ [[baz]]
+ script = """
+ if ((CYLC_TASK_CYCLE_POINT == 2)); then
+ cylc stop $CYLC_WORKFLOW_ID
+ fi
+ """
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.0 b/tests/functional/alpha-omega/01-restart/reference.log.0
new file mode 100644
index 00000000000..a27bdd7b37c
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.0
@@ -0,0 +1 @@
+startup/a -triggered off [] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.1 b/tests/functional/alpha-omega/01-restart/reference.log.1
new file mode 100644
index 00000000000..1afce31918c
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.1
@@ -0,0 +1,3 @@
+Initial point: 1
+Final point: 2
+startup/b -triggered off ['startup/a'] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.2 b/tests/functional/alpha-omega/01-restart/reference.log.2
new file mode 100644
index 00000000000..edf876337c5
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.2
@@ -0,0 +1,3 @@
+Initial point: 1
+Final point: 2
+1/foo -triggered off [] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.3 b/tests/functional/alpha-omega/01-restart/reference.log.3
new file mode 100644
index 00000000000..a2e1f9bfbcf
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.3
@@ -0,0 +1,3 @@
+Initial point: 1
+Final point: 2
+1/bar -triggered off ['0/bar', '1/foo'] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.4 b/tests/functional/alpha-omega/01-restart/reference.log.4
new file mode 100644
index 00000000000..f986452dc79
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.4
@@ -0,0 +1,5 @@
+Initial point: 1
+Final point: 2
+2/bar -triggered off ['1/bar'] in flow 1
+1/baz -triggered off ['1/bar'] in flow 1
+2/baz -triggered off ['2/bar'] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.5 b/tests/functional/alpha-omega/01-restart/reference.log.5
new file mode 100644
index 00000000000..ced181660e9
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.5
@@ -0,0 +1,3 @@
+Initial point: 1
+Final point: 2
+shutdown/x -triggered off [] in flow 1
diff --git a/tests/functional/alpha-omega/01-restart/reference.log.6 b/tests/functional/alpha-omega/01-restart/reference.log.6
new file mode 100644
index 00000000000..dee29b15170
--- /dev/null
+++ b/tests/functional/alpha-omega/01-restart/reference.log.6
@@ -0,0 +1,3 @@
+Initial point: 1
+Final point: 2
+shutdown/y -triggered off ['shutdown/x'] in flow 1
diff --git a/tests/functional/alpha-omega/02-retrigger.t b/tests/functional/alpha-omega/02-retrigger.t
new file mode 100644
index 00000000000..1fe0b091a17
--- /dev/null
+++ b/tests/functional/alpha-omega/02-retrigger.t
@@ -0,0 +1,24 @@
+#!/usr/bin/env bash
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+#-------------------------------------------------------------------------------
+
+# Check manual triggering of startup and shutdown graph tasks.
+
+. "$(dirname "$0")/test_header"
+set_test_number 2
+reftest
+exit
diff --git a/tests/functional/alpha-omega/02-retrigger/flow.cylc b/tests/functional/alpha-omega/02-retrigger/flow.cylc
new file mode 100644
index 00000000000..ba27c5f381a
--- /dev/null
+++ b/tests/functional/alpha-omega/02-retrigger/flow.cylc
@@ -0,0 +1,22 @@
+
+# 1. startup graph completes
+# 2. then main graph starts, and triggers startup/a and shutdown/x with --flow=9
+# 3. then when all tasks finish, shutdown graph runs
+
+[scheduling]
+ [[graph]]
+ startup = "a => b"
+ shutdown = "x => y"
+ R1 = "foo => bar => baz"
+[runtime]
+ [[a]]
+ [[b]]
+ [[x]]
+ [[y]]
+ [[foo]]
+ [[bar]]
+ script = """
+ cylc trigger --flow=9 $CYLC_WORKFLOW_ID//startup/a
+ cylc trigger --flow=9 $CYLC_WORKFLOW_ID//shutdown/x
+ """
+ [[baz]]
diff --git a/tests/functional/alpha-omega/02-retrigger/reference.log b/tests/functional/alpha-omega/02-retrigger/reference.log
new file mode 100644
index 00000000000..222e7c96d56
--- /dev/null
+++ b/tests/functional/alpha-omega/02-retrigger/reference.log
@@ -0,0 +1,11 @@
+startup/a -triggered off [] in flow 1
+startup/b -triggered off ['startup/a'] in flow 1
+1/foo -triggered off [] in flow 1
+1/bar -triggered off ['1/foo'] in flow 1
+startup/a -triggered off [] in flow 9
+startup/b -triggered off ['startup/a'] in flow 9
+shutdown/x -triggered off [] in flow 9
+shutdown/y -triggered off ['shutdown/x'] in flow 9
+1/baz -triggered off ['1/bar'] in flow 1
+shutdown/x -triggered off [] in flow 1
+shutdown/y -triggered off ['shutdown/x'] in flow 1
diff --git a/tests/functional/alpha-omega/test_header b/tests/functional/alpha-omega/test_header
new file mode 120000
index 00000000000..90bd5a36f92
--- /dev/null
+++ b/tests/functional/alpha-omega/test_header
@@ -0,0 +1 @@
+../lib/bash/test_header
\ No newline at end of file
diff --git a/tests/integration/utils/flow_tools.py b/tests/integration/utils/flow_tools.py
index 73c187b9a74..5ce0b755c2d 100644
--- a/tests/integration/utils/flow_tools.py
+++ b/tests/integration/utils/flow_tools.py
@@ -112,6 +112,16 @@ def _make_flow(
return workflow_id
+async def _load_graph(sched):
+ """Get scheduler to load the main graph."""
+ if sched.is_restart:
+ await sched._load_pool_from_db()
+ elif sched.options.starttask:
+ sched._load_pool_from_tasks()
+ else:
+ sched._load_pool_from_point()
+
+
@contextmanager
def _make_scheduler():
"""Return a scheduler object for a flow registration."""
@@ -153,6 +163,7 @@ async def _start_flow(
# exception occurs in Scheduler
try:
await schd.start()
+ await _load_graph(schd)
finally:
# After this `yield`, the `with` block of the context manager
# is executed:
@@ -184,6 +195,7 @@ async def _run_flow(
# exception occurs in Scheduler
try:
await schd.start()
+ await _load_graph(schd)
# Do not await as we need to yield control to the main loop:
task = asyncio.create_task(schd.run_scheduler())
finally:
diff --git a/tests/unit/test_flow_mgr.py b/tests/unit/test_flow_mgr.py
index 00a6a26eb12..bddf9a1613d 100644
--- a/tests/unit/test_flow_mgr.py
+++ b/tests/unit/test_flow_mgr.py
@@ -52,48 +52,45 @@ def test_all(
meta = "the quick brown fox"
assert flow_mgr.get_flow(None, meta) == 1
- msg1 = f"flow: 1 ({meta}) {FAKE_NOW_ISO}"
+ msg1 = f"flow: 1 ({meta})"
assert f"New {msg1}" in caplog.messages
# automatic: expect 2
meta = "jumped over the lazy dog"
assert flow_mgr.get_flow(None, meta) == 2
- msg2 = f"flow: 2 ({meta}) {FAKE_NOW_ISO}"
+ msg2 = f"flow: 2 ({meta})"
assert f"New {msg2}" in caplog.messages
# give flow 2: not a new flow
meta = "jumped over the moon"
assert flow_mgr.get_flow(2, meta) == 2
- msg3 = f"flow: 2 ({meta}) {FAKE_NOW_ISO}"
- assert f"New {msg3}" not in caplog.messages
- assert (
- f"Ignoring flow metadata \"{meta}\": 2 is not a new flow"
- in caplog.messages
- )
+ msg2a = f'Ignoring flow metadata "{meta}": 2 is not a new flow'
+ assert f"{msg2a}" in caplog.messages
# give flow 4: new flow
meta = "jumped over the cheese"
assert flow_mgr.get_flow(4, meta) == 4
- msg4 = f"flow: 4 ({meta}) {FAKE_NOW_ISO}"
+ msg4 = f"flow: 4 ({meta})"
assert f"New {msg4}" in caplog.messages
# automatic: expect 3
meta = "jumped over the log"
assert flow_mgr.get_flow(None, meta) == 3
- msg5 = f"flow: 3 ({meta}) {FAKE_NOW_ISO}"
- assert f"New {msg5}" in caplog.messages
+ msg3 = f"flow: 3 ({meta})"
+ assert f"New {msg3}" in caplog.messages
# automatic: expect 5 (skip over 4)
meta = "crawled under the log"
assert flow_mgr.get_flow(None, meta) == 5
- msg6 = f"flow: 5 ({meta}) {FAKE_NOW_ISO}"
- assert f"New {msg6}" in caplog.messages
+ msg5 = f"flow: 5 ({meta})"
+ assert f"New {msg5}" in caplog.messages
+
flow_mgr._log()
assert (
"Flows:\n"
- f"{msg1}\n"
- f"{msg2}\n"
- f"{msg4}\n"
- f"{msg5}\n"
- f"{msg6}"
+ f"{msg1} {FAKE_NOW_ISO}\n"
+ f"{msg2} {FAKE_NOW_ISO}\n"
+ f"{msg4} {FAKE_NOW_ISO}\n"
+ f"{msg3} {FAKE_NOW_ISO}\n"
+ f"{msg5} {FAKE_NOW_ISO}"
) in caplog.messages