Skip to content

Commit ae49af6

Browse files
committed
RemoteStashCompressedData new data class, and it's deployment to execmanager.py to support compressed file formats while stashing.
It's implemetation followed the instruction given in class:`RemoteStashData`. It's deployment in `execmanager` follows the same logic as of `RemoteStashFolderData`, meaning it directly calls on Transport methods. This commit is a part of enhancements for stashing feature: #6764
1 parent d7c382a commit ae49af6

File tree

18 files changed

+552
-50
lines changed

18 files changed

+552
-50
lines changed

.github/system_tests/test_daemon.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,14 +437,20 @@ def launch_all():
437437
run_multiply_add_workchain()
438438

439439
# Testing the stashing functionality
440+
# To speedup, here we only check with StashMode.COPY.
441+
# All stash_modes are tested individually in `test_execmanager.py`
440442
print('Testing the stashing functionality')
441443
process, inputs, expected_result = create_calculation_process(code=code_doubler, inputval=1)
442444
with tempfile.TemporaryDirectory() as tmpdir:
443445
# Delete the temporary directory to test that the stashing functionality will create it if necessary
444446
shutil.rmtree(tmpdir, ignore_errors=True)
445447

446448
source_list = ['output.txt', 'triple_value.*']
447-
inputs['metadata']['options']['stash'] = {'target_base': tmpdir, 'source_list': source_list}
449+
inputs['metadata']['options']['stash'] = {
450+
'stash_mode': StashMode.COPY.value,
451+
'target_base': tmpdir,
452+
'source_list': source_list,
453+
}
448454
_, node = run.get_node(process, **inputs)
449455
assert node.is_finished_ok
450456
assert 'remote_stash' in node.outputs

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ requires-python = '>=3.9'
115115
'core.orbital' = 'aiida.orm.nodes.data.orbital:OrbitalData'
116116
'core.remote' = 'aiida.orm.nodes.data.remote.base:RemoteData'
117117
'core.remote.stash' = 'aiida.orm.nodes.data.remote.stash.base:RemoteStashData'
118+
'core.remote.stash.compress' = 'aiida.orm.nodes.data.remote.stash.compress:RemoteStashCompressedData'
118119
'core.remote.stash.folder' = 'aiida.orm.nodes.data.remote.stash.folder:RemoteStashFolderData'
119120
'core.singlefile' = 'aiida.orm.nodes.data.singlefile:SinglefileData'
120121
'core.str' = 'aiida.orm.nodes.data.str:Str'

src/aiida/common/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
# fmt: off
1818

19+
from .asserts import *
1920
from .datastructures import *
2021
from .exceptions import *
2122
from .extendeddicts import *
@@ -80,6 +81,7 @@
8081
'UniquenessError',
8182
'UnsupportedSpeciesError',
8283
'ValidationError',
84+
'assert_never',
8385
'create_callback',
8486
'get_progress_reporter',
8587
'override_log_level',

src/aiida/common/datastructures.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ class StashMode(Enum):
2222
"""Mode to use when stashing files from the working directory of a completed calculation job for safekeeping."""
2323

2424
COPY = 'copy'
25+
COMPRESS_TAR = 'tar'
26+
COMPRESS_TARBZ2 = 'tar.bz2'
27+
COMPRESS_TARGZ = 'tar.gz'
28+
COMPRESS_TARXZ = 'tar.xz'
2529

2630

2731
class CalcJobState(Enum):

src/aiida/engine/daemon/execmanager.py

Lines changed: 88 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from typing import Mapping as MappingType
2525

2626
from aiida.common import AIIDA_LOGGER, exceptions
27+
from aiida.common.asserts import assert_never
2728
from aiida.common.datastructures import CalcInfo, FileCopyOperation
2829
from aiida.common.folders import Folder, SandboxFolder
2930
from aiida.common.links import LinkType
@@ -435,56 +436,108 @@ async def stash_calculation(calculation: CalcJobNode, transport: Transport) -> N
435436
:param transport: an already opened transport.
436437
"""
437438
from aiida.common.datastructures import StashMode
438-
from aiida.orm import RemoteStashFolderData
439+
from aiida.orm import RemoteStashCompressedData, RemoteStashFolderData
439440

440441
logger_extra = get_dblogger_extra(calculation)
441442

442443
stash_options = calculation.get_option('stash')
443-
stash_mode = stash_options.get('mode', StashMode.COPY.value)
444+
stash_mode = stash_options.get('stash_mode')
444445
source_list = stash_options.get('source_list', [])
446+
uuid = calculation.uuid
447+
source_basepath = Path(calculation.get_remote_workdir())
445448

446449
if not source_list:
447450
return
448451

449-
if stash_mode != StashMode.COPY.value:
450-
EXEC_LOGGER.warning(f'stashing mode {stash_mode} is not implemented yet.')
451-
return
452+
EXEC_LOGGER.debug(
453+
f'stashing files with mode {stash_mode} for calculation<{calculation.pk}>: {source_list}', extra=logger_extra
454+
)
452455

453-
cls = RemoteStashFolderData
456+
if stash_mode == StashMode.COPY.value:
457+
target_basepath = Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]
454458

455-
EXEC_LOGGER.debug(f'stashing files for calculation<{calculation.pk}>: {source_list}', extra=logger_extra)
459+
for source_filename in source_list:
460+
if transport.has_magic(source_filename):
461+
copy_instructions = []
462+
for globbed_filename in await transport.glob_async(source_basepath / source_filename):
463+
target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
464+
copy_instructions.append((globbed_filename, target_filepath))
465+
else:
466+
copy_instructions = [(source_basepath / source_filename, target_basepath / source_filename)]
467+
468+
for source_filepath, target_filepath in copy_instructions:
469+
# If the source file is in a (nested) directory, create those directories first in the target directory
470+
target_dirname = target_filepath.parent
471+
await transport.makedirs_async(target_dirname, ignore_existing=True)
472+
473+
try:
474+
await transport.copy_async(source_filepath, target_filepath)
475+
except (OSError, ValueError) as exception:
476+
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
477+
# try to clean up in case of a failure
478+
await transport.rmtree_async(Path(stash_options['target_base']) / uuid[:2])
479+
else:
480+
EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')
481+
482+
remote_stash = RemoteStashFolderData(
483+
computer=calculation.computer,
484+
target_basepath=str(target_basepath),
485+
stash_mode=StashMode(stash_mode),
486+
source_list=source_list,
487+
).store()
488+
489+
elif stash_mode in [
490+
StashMode.COMPRESS_TAR.value,
491+
StashMode.COMPRESS_TARBZ2.value,
492+
StashMode.COMPRESS_TARGZ.value,
493+
StashMode.COMPRESS_TARXZ.value,
494+
]:
495+
# stash_mode values are identical with compression_format in transport plugin:
496+
# 'tar', 'tar.gz', 'tar.bz2', or 'tar.xz'
497+
compression_format = stash_mode
498+
file_name = uuid
499+
dereference = stash_options.get('dereference', False)
500+
target_basepath = Path(stash_options['target_base'])
501+
authinfo = calculation.get_authinfo()
502+
aiida_remote_base = authinfo.get_workdir().format(username=transport.whoami())
503+
504+
target_destination = str(target_basepath / file_name) + '.' + compression_format
505+
506+
remote_stash = RemoteStashCompressedData(
507+
computer=calculation.computer,
508+
target_basepath=target_destination,
509+
stash_mode=StashMode(stash_mode),
510+
source_list=source_list,
511+
dereference=dereference,
512+
)
456513

457-
uuid = calculation.uuid
458-
source_basepath = Path(calculation.get_remote_workdir())
459-
target_basepath = Path(stash_options['target_base']) / uuid[:2] / uuid[2:4] / uuid[4:]
460-
461-
for source_filename in source_list:
462-
if transport.has_magic(source_filename):
463-
copy_instructions = []
464-
for globbed_filename in await transport.glob_async(source_basepath / source_filename):
465-
target_filepath = target_basepath / Path(globbed_filename).relative_to(source_basepath)
466-
copy_instructions.append((globbed_filename, target_filepath))
514+
source_list_abs = [source_basepath / source for source in source_list]
515+
516+
try:
517+
await transport.compress_async(
518+
format=compression_format,
519+
remotesources=source_list_abs,
520+
remotedestination=target_destination,
521+
root_dir=aiida_remote_base,
522+
overwrite=False,
523+
dereference=dereference,
524+
)
525+
except (OSError, ValueError) as exception:
526+
EXEC_LOGGER.warning(f'failed to stash {source_list} to {target_destination}: {exception}')
527+
return
528+
# note: if you raise here, you triger the exponential backoff
529+
# and if you don't raise, it appears as successful in verdi process list: Finished [0]
530+
# An issue opened to investigate and fix this https://github.com/aiidateam/aiida-core/issues/6789
531+
# raise exceptions.RemoteOperationError(f'failed '
532+
# 'to compress {source_list} to {target_destination}: {exception}')
467533
else:
468-
copy_instructions = [(source_basepath / source_filename, target_basepath / source_filename)]
534+
EXEC_LOGGER.debug(f'stashed {source_list} to {target_destination}')
469535

470-
for source_filepath, target_filepath in copy_instructions:
471-
# If the source file is in a (nested) directory, create those directories first in the target directory
472-
target_dirname = target_filepath.parent
473-
await transport.makedirs_async(target_dirname, ignore_existing=True)
536+
remote_stash.store()
537+
538+
else:
539+
assert_never(stash_mode)
474540

475-
try:
476-
await transport.copy_async(source_filepath, target_filepath)
477-
except (OSError, ValueError) as exception:
478-
EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
479-
else:
480-
EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')
481-
482-
remote_stash = cls(
483-
computer=calculation.computer,
484-
target_basepath=str(target_basepath),
485-
stash_mode=StashMode(stash_mode),
486-
source_list=source_list,
487-
).store()
488541
remote_stash.base.links.add_incoming(calculation, link_type=LinkType.CREATE, link_label='remote_stash')
489542

490543

src/aiida/engine/processes/calcjobs/calcjob.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ def validate_stash_options(stash_options: Any, _: Any) -> Optional[str]:
116116

117117
target_base = stash_options.get('target_base', None)
118118
source_list = stash_options.get('source_list', None)
119-
stash_mode = stash_options.get('mode', StashMode.COPY.value)
119+
stash_mode = stash_options.get('stash_mode', None)
120120

121121
if not isinstance(target_base, str) or not os.path.isabs(target_base):
122122
return f'`metadata.options.stash.target_base` should be an absolute filepath, got: {target_base}'
@@ -130,9 +130,23 @@ def validate_stash_options(stash_options: Any, _: Any) -> Optional[str]:
130130
try:
131131
StashMode(stash_mode)
132132
except ValueError:
133-
port = 'metadata.options.stash.mode'
133+
port = 'metadata.options.stash.stash_mode'
134134
return f'`{port}` should be a member of aiida.common.datastructures.StashMode, got: {stash_mode}'
135135

136+
dereference = stash_options.get('dereference', None)
137+
138+
if stash_mode in [
139+
StashMode.COMPRESS_TAR.value,
140+
StashMode.COMPRESS_TARBZ2.value,
141+
StashMode.COMPRESS_TARGZ.value,
142+
StashMode.COMPRESS_TARXZ.value,
143+
]:
144+
if not isinstance(dereference, bool):
145+
return f'`metadata.options.stash.dereference` should be a boolean, got: {dereference}'
146+
147+
elif dereference is not None:
148+
return '`metadata.options.stash.dereference` is only valid for compression stashing modes'
149+
136150
return None
137151

138152

@@ -415,7 +429,12 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
415429
required=False,
416430
help='Mode with which to perform the stashing, should be value of `aiida.common.datastructures.StashMode`.',
417431
)
418-
432+
spec.input(
433+
'metadata.options.stash.dereference',
434+
valid_type=bool,
435+
required=False,
436+
help='Whether to follow symlinks while stashing or not, specific to StashMode.COMPRESS_* enums',
437+
)
419438
spec.output(
420439
'remote_folder',
421440
valid_type=orm.RemoteData,
@@ -434,7 +453,6 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
434453
help='Files that are retrieved by the daemon will be stored in this node. By default the stdout and stderr '
435454
'of the scheduler will be added, but one can add more by specifying them in `CalcInfo.retrieve_list`.',
436455
)
437-
438456
spec.exit_code(
439457
100,
440458
'ERROR_NO_RETRIEVED_FOLDER',

src/aiida/orm/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
'QbFields',
8989
'QueryBuilder',
9090
'RemoteData',
91+
'RemoteStashCompressedData',
9192
'RemoteStashData',
9293
'RemoteStashFolderData',
9394
'SinglefileData',

src/aiida/orm/nodes/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
'ProcessNode',
5151
'ProjectionData',
5252
'RemoteData',
53+
'RemoteStashCompressedData',
5354
'RemoteStashData',
5455
'RemoteStashFolderData',
5556
'SinglefileData',

src/aiida/orm/nodes/data/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
'PortableCode',
5959
'ProjectionData',
6060
'RemoteData',
61+
'RemoteStashCompressedData',
6162
'RemoteStashData',
6263
'RemoteStashFolderData',
6364
'SinglefileData',

src/aiida/orm/nodes/data/remote/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99

1010
__all__ = (
1111
'RemoteData',
12+
'RemoteStashCompressedData',
1213
'RemoteStashData',
13-
'RemoteStashFolderData',
14+
'RemoteStashFolderData'
1415
)
1516

1617
# fmt: on

src/aiida/orm/nodes/data/remote/stash/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
# fmt: off
66

77
from .base import *
8+
from .compress import *
89
from .folder import *
910

1011
__all__ = (
12+
'RemoteStashCompressedData',
1113
'RemoteStashData',
12-
'RemoteStashFolderData',
14+
'RemoteStashFolderData'
1315
)
1416

1517
# fmt: on

src/aiida/orm/nodes/data/remote/stash/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
###########################################################################
2+
# Copyright (c), The AiiDA team. All rights reserved. #
3+
# This file is part of the AiiDA code. #
4+
# #
5+
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
6+
# For further information on the license, see the LICENSE.txt file #
7+
# For further information please visit http://www.aiida.net #
8+
###########################################################################
19
"""Data plugin that models an archived folder on a remote computer."""
210

311
from aiida.common.datastructures import StashMode

0 commit comments

Comments
 (0)