Commit a9c6aed

Merge pull request #1 from mathworks/merge-submission-modes
Merge submission modes
2 parents: 79a9a4a + ca60500 · commit a9c6aed

69 files changed: +510 -4003 lines
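
The shared and nonshared plugin scripts are merged into a single top-level set that branches at runtime on the cluster profile, rather than living in separate shared/ and nonshared/ folders. A rough configuration sketch follows; the profile name and paths are placeholders, and the property names are the ones the updated scripts check for (HasSharedFilesystem, AdditionalProperties.ClusterHost, AdditionalProperties.RemoteJobStorageLocation), so treat this as illustrative rather than a verbatim setup recipe.

% Illustrative only - profile name and paths are placeholders.
c = parcluster('GridEngineProfile');
c.PluginScriptsLocation = '/path/to/these/scripts';

% Shared filesystem between client and cluster: the scripts see
% cluster.HasSharedFilesystem == true and skip the mirroring code paths.
c.HasSharedFilesystem = true;

% Nonshared filesystem with a remote submission host: the scripts check
% isprop(cluster.AdditionalProperties, 'ClusterHost') to decide whether to
% open a remote connection and mirror job files.
c.HasSharedFilesystem = false;
c.AdditionalProperties.ClusterHost = 'gridengine-headnode.example.com';
c.AdditionalProperties.RemoteJobStorageLocation = '/remote/matlab/jobs';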

README.md

Lines changed: 126 additions & 120 deletions (large diff not rendered)

shared/cancelJobFcn.m renamed to cancelJobFcn.m

Lines changed: 17 additions & 6 deletions
@@ -13,10 +13,7 @@
     error('parallelexamples:GenericGridEngine:SubmitFcnError', ...
         'The function %s is for use with clusters created using the parcluster command.', currFilename)
 end
-if ~cluster.HasSharedFilesystem
-    error('parallelexamples:GenericGridEngine:NotSharedFileSystem', ...
-        'The function %s is for use with shared filesystems.', currFilename)
-end
+
 % Get the information about the actual cluster used
 data = cluster.getJobClusterData(job);
 if isempty(data)
@@ -36,8 +33,7 @@
     commandToRun = sprintf('qdel "%s"', schedulerID);
     dctSchedulerMessage(4, '%s: Canceling job on cluster using command:\n\t%s.', currFilename, commandToRun);
     try
-        % Make the shelled out call to run the command.
-        [cmdFailed, cmdOut] = runSchedulerCommand(commandToRun);
+        [cmdFailed, cmdOut] = runSchedulerCommand(cluster, commandToRun);
     catch err
         cmdFailed = true;
         cmdOut = err.message;
@@ -55,6 +51,21 @@
     end
 end

+if ~cluster.HasSharedFilesystem
+    % Only stop mirroring if we are actually mirroring
+    remoteConnection = getRemoteConnection(cluster);
+    if remoteConnection.isJobUsingConnection(job.ID)
+        dctSchedulerMessage(5, '%s: Stopping the mirror for job %d.', currFilename, job.ID);
+        try
+            remoteConnection.stopMirrorForJob(job);
+        catch err
+            warning('parallelexamples:GenericGridEngine:FailedToStopMirrorForJob', ...
+                'Failed to stop the file mirroring for job %d.\nReason: %s', ...
+                job.ID, err.getReport);
+        end
+    end
+end
+
 % Now warn about those jobs that we failed to cancel.
 erroredJobAndCauseStrings = erroredJobAndCauseStrings(~cellfun(@isempty, erroredJobAndCauseStrings));
 if ~isempty(erroredJobAndCauseStrings)

shared/cancelTaskFcn.m renamed to cancelTaskFcn.m

Lines changed: 2 additions & 6 deletions
@@ -13,10 +13,7 @@
     error('parallelexamples:GenericGridEngine:SubmitFcnError', ...
         'The function %s is for use with clusters created using the parcluster command.', currFilename)
 end
-if ~cluster.HasSharedFilesystem
-    error('parallelexamples:GenericGridEngine:NotSharedFileSystem', ...
-        'The function %s is for use with shared filesystems.', currFilename)
-end
+
 % Get the information about the actual cluster used
 data = cluster.getJobClusterData(task.Parent);
 if isempty(data)
@@ -45,8 +42,7 @@
 commandToRun = sprintf('qdel "%s"', schedulerID);
 dctSchedulerMessage(4, '%s: Canceling task on cluster using command:\n\t%s.', currFilename, commandToRun);
 try
-    % Make the shelled out call to run the command.
-    [cmdFailed, cmdOut] = runSchedulerCommand(commandToRun);
+    [cmdFailed, cmdOut] = runSchedulerCommand(cluster, commandToRun);
 catch err
     cmdFailed = true;
     cmdOut = err.message;
File renamed without changes.

nonshared/communicatingSubmitFcn.m renamed to communicatingSubmitFcn.m

Lines changed: 85 additions & 53 deletions
@@ -17,17 +17,14 @@ function communicatingSubmitFcn(cluster, job, environmentProperties)

 decodeFunction = 'parallel.cluster.generic.communicatingDecodeFcn';

-if cluster.HasSharedFilesystem
-    error('parallelexamples:GenericGridEngine:NotNonSharedFileSystem', ...
-        'The function %s is for use with nonshared filesystems.', currFilename)
-end
-
 if ~strcmpi(cluster.OperatingSystem, 'unix')
     error('parallelexamples:GenericGridEngine:UnsupportedOS', ...
         'The function %s only supports clusters with unix OS.', currFilename)
 end

-remoteConnection = getRemoteConnection(cluster);
+if isprop(cluster.AdditionalProperties, 'ClusterHost')
+    remoteConnection = getRemoteConnection(cluster);
+end

 % Determine the debug setting. Setting to true makes the MATLAB workers
 % output additional logging. If EnableDebug is set in the cluster object's
@@ -65,11 +62,16 @@ function communicatingSubmitFcn(cluster, job, environmentProperties)
 % The job specific environment variables
 % Remove leading and trailing whitespace from the MATLAB arguments
 matlabArguments = strtrim(environmentProperties.MatlabArguments);
-% Where on the remote filesystem to store job output
-storageLocation = remoteConnection.JobStorageLocation;
-% If the RemoteJobStorageLocation ends with a space, add a slash to ensure it is respected
-if endsWith(storageLocation, ' ')
-    storageLocation = [storageLocation, fileSeparator];
+
+% Where the workers store job output
+if cluster.HasSharedFilesystem
+    storageLocation = environmentProperties.StorageLocation;
+else
+    storageLocation = remoteConnection.JobStorageLocation;
+    % If the RemoteJobStorageLocation ends with a space, add a slash to ensure it is respected
+    if endsWith(storageLocation, ' ')
+        storageLocation = [storageLocation, fileSeparator];
+    end
 end
 variables = {'PARALLEL_SERVER_DECODE_FUNCTION', decodeFunction; ...
     'PARALLEL_SERVER_STORAGE_CONSTRUCTOR', environmentProperties.StorageConstructor; ...
@@ -93,33 +95,37 @@ function communicatingSubmitFcn(cluster, job, environmentProperties)
 nonEmptyValues = cellfun(@(x) ~isempty(strtrim(x)), variables(:,2));
 variables = variables(nonEmptyValues, :);

-% The local job directory
+% The job directory as accessed by this machine
 localJobDirectory = cluster.getJobFolder(job);
-% How we refer to the job directory on the cluster
-remoteJobDirectory = remoteConnection.getRemoteJobLocation(job.ID, cluster.OperatingSystem);
+
+% The job directory as accessed by workers on the cluster
+if cluster.HasSharedFilesystem
+    jobDirectoryOnCluster = cluster.getJobFolderOnCluster(job);
+else
+    jobDirectoryOnCluster = remoteConnection.getRemoteJobLocation(job.ID, cluster.OperatingSystem);
+end
 % Specify the job wrapper script to use.
 % Prior to R2019a, only the SMPD process manager is supported.
 if verLessThan('matlab', '9.6') || ...
         isprop(cluster.AdditionalProperties, 'UseSmpd') && cluster.AdditionalProperties.UseSmpd
-    scriptName = 'communicatingJobWrapperSmpd.sh';
+    jobWrapperName = 'communicatingJobWrapperSmpd.sh';
     parallelEnvironment = 'matlabSmpd';
 else
-    scriptName = 'communicatingJobWrapper.sh';
+    jobWrapperName = 'communicatingJobWrapper.sh';
     parallelEnvironment = 'matlab';
 end
 % The wrapper script is in the same directory as this file
 dirpart = fileparts(mfilename('fullpath'));
-localScript = fullfile(dirpart, scriptName);
+localScript = fullfile(dirpart, jobWrapperName);
 % Copy the local wrapper script to the job directory
 copyfile(localScript, localJobDirectory);

-% The command that will be executed on the remote host to run the job.
-remoteScriptName = sprintf('%s%s%s', remoteJobDirectory, fileSeparator, scriptName);
-quotedScriptName = sprintf('%s%s%s', quote, remoteScriptName, quote);
+% The script to execute on the cluster to run the job
+wrapperPath = sprintf('%s%s%s', jobDirectoryOnCluster, fileSeparator, jobWrapperName);
+quotedWrapperPath = sprintf('%s%s%s', quote, wrapperPath, quote);

-% Choose a file for the output. Please note that currently, JobStorageLocation refers
-% to a directory on disk, but this may change in the future.
-logFile = sprintf('%s%s%s', remoteJobDirectory, fileSeparator, sprintf('Job%d.log', job.ID));
+% Choose a file for the output
+logFile = sprintf('%s%s%s', jobDirectoryOnCluster, fileSeparator, sprintf('Job%d.log', job.ID));
 quotedLogFile = sprintf('%s%s%s', quote, logFile, quote);
 dctSchedulerMessage(5, '%s: Using %s as log file', currFilename, quotedLogFile);

@@ -131,43 +137,64 @@ function communicatingSubmitFcn(cluster, job, environmentProperties)
 numSlots = environmentProperties.NumberOfTasks;
 additionalSubmitArgs = sprintf('-pe %s %d', parallelEnvironment, numSlots);
 dctSchedulerMessage(4, '%s: Requesting %d slots', currFilename, numSlots);
+if cluster.NumThreads > 1
+    additionalSubmitArgs = sprintf('-binding pe linear:%d %s', cluster.NumThreads, additionalSubmitArgs);
+end
 commonSubmitArgs = getCommonSubmitArgs(cluster);
 additionalSubmitArgs = strtrim(sprintf('%s %s', additionalSubmitArgs, commonSubmitArgs));

 % Create a script to submit a Grid Engine job - this will be created in the job directory
 dctSchedulerMessage(5, '%s: Generating script for job.', currFilename);
-localScriptName = tempname(localJobDirectory);
-[~, scriptName] = fileparts(localScriptName);
-remoteScriptLocation = sprintf('%s%s%s%s%s', quote, remoteJobDirectory, fileSeparator, scriptName, quote);
-createSubmitScript(localScriptName, jobName, quotedLogFile, quotedScriptName, ...
+localSubmitScriptPath = tempname(localJobDirectory);
+createSubmitScript(localSubmitScriptPath, jobName, quotedLogFile, quotedWrapperPath, ...
     variables, additionalSubmitArgs);
-% Create the command to run on the remote host.
-commandToRun = sprintf('sh %s', remoteScriptLocation);

-% Start the mirror to copy all the job files over to the cluster
-dctSchedulerMessage(4, '%s: Starting mirror for job %d.', currFilename, job.ID);
-remoteConnection.startMirrorForJob(job);
+% Path to the submit script as seen by the cluster
+[~, submitScriptName] = fileparts(localSubmitScriptPath);
+submitScriptPathOnCluster = sprintf('%s%s%s', jobDirectoryOnCluster, fileSeparator, submitScriptName);
+quotedSubmitScriptPathOnCluster = sprintf('%s%s%s', quote, submitScriptPathOnCluster, quote);
+
+% Create the command to run on the cluster
+commandToRun = sprintf('sh %s', quotedSubmitScriptPathOnCluster);
+
+if ~cluster.HasSharedFilesystem
+    % Start the mirror to copy all the job files over to the cluster
+    dctSchedulerMessage(4, '%s: Starting mirror for job %d.', currFilename, job.ID);
+    remoteConnection.startMirrorForJob(job);
+end

-% Add execute permissions to shell scripts
-remoteConnection.runCommand(sprintf( ...
-    'chmod u+x %s%s*.sh', remoteJobDirectory, fileSeparator));
+if isprop(cluster.AdditionalProperties, 'ClusterHost')
+    % Add execute permissions to shell scripts
+    runSchedulerCommand(cluster, sprintf( ...
+        'chmod u+x %s%s*.sh', jobDirectoryOnCluster, fileSeparator));
+    % Convert line endings to Unix
+    runSchedulerCommand(cluster, sprintf( ...
+        'dos2unix %s%s*.sh', jobDirectoryOnCluster, fileSeparator));
+end

 % Now ask the cluster to run the submission command
 dctSchedulerMessage(4, '%s: Submitting job using command:\n\t%s', currFilename, commandToRun);
-% Execute the command on the remote host.
-[cmdFailed, cmdOut] = remoteConnection.runCommand(commandToRun);
+try
+    [cmdFailed, cmdOut] = runSchedulerCommand(cluster, commandToRun);
+catch err
+    cmdFailed = true;
+    cmdOut = err.message;
+end
 if cmdFailed
-    % Stop the mirroring if we failed to submit the job - this will also
-    % remove the job files from the remote location
-    % Only stop mirroring if we are actually mirroring
-    if remoteConnection.isJobUsingConnection(job.ID)
-        dctSchedulerMessage(5, '%s: Stopping the mirror for job %d.', currFilename, job.ID);
-        try
-            remoteConnection.stopMirrorForJob(job);
-        catch err
-            warning('parallelexamples:GenericGridEngine:FailedToStopMirrorForJob', ...
-                'Failed to stop the file mirroring for job %d.\nReason: %s', ...
-                job.ID, err.getReport);
+    if ~cluster.HasSharedFilesystem
+        % Stop the mirroring if we failed to submit the job - this will also
+        % remove the job files from the remote location
+        remoteConnection = getRemoteConnection(cluster);
+        % Only stop mirroring if we are actually mirroring
+        if remoteConnection.isJobUsingConnection(job.ID)
+            dctSchedulerMessage(5, '%s: Stopping the mirror for job %d.', currFilename, job.ID);
+            try
+                remoteConnection.stopMirrorForJob(job);
+            catch err
+                warning('parallelexamples:GenericGridEngine:FailedToStopMirrorForJob', ...
+                    'Failed to stop the file mirroring for job %d.\nReason: %s', ...
+                    job.ID, err.getReport);
+            end
         end
     end
 end
 error('parallelexamples:GenericGridEngine:FailedToSubmitJob', ...
@@ -188,11 +215,16 @@ function communicatingSubmitFcn(cluster, job, environmentProperties)
 end

 % Store the scheduler ID for each task and the job cluster data
-% Set the cluster host and remote job storage location on the job cluster data
-jobData = struct('type', 'generic', ...
-    'RemoteHost', remoteConnection.Hostname, ...
-    'RemoteJobStorageLocation', remoteConnection.JobStorageLocation, ...
-    'HasDoneLastMirror', false);
+jobData = struct('type', 'generic');
+if isprop(cluster.AdditionalProperties, 'ClusterHost')
+    % Store the cluster host
+    jobData.RemoteHost = remoteConnection.Hostname;
+end
+if ~cluster.HasSharedFilesystem
+    % Store the remote job storage location
+    jobData.RemoteJobStorageLocation = remoteConnection.JobStorageLocation;
+    jobData.HasDoneLastMirror = false;
+end
 if verLessThan('matlab', '9.7') % schedulerID stored in job data
     jobData.ClusterJobIDs = jobIDs;
 else % schedulerID on task since 19b
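
As a worked example of the new NumThreads handling above, with assumed values (not captured from a real submission): a communicating job requesting 8 tasks on a cluster whose NumThreads is 2 builds its qsub arguments roughly like this.

% Assumed values for illustration.
parallelEnvironment = 'matlab';
numSlots = 8;        % environmentProperties.NumberOfTasks
numThreads = 2;      % cluster.NumThreads

additionalSubmitArgs = sprintf('-pe %s %d', parallelEnvironment, numSlots);
if numThreads > 1
    additionalSubmitArgs = sprintf('-binding pe linear:%d %s', numThreads, additionalSubmitArgs);
end
disp(additionalSubmitArgs)   % prints: -binding pe linear:2 -pe matlab 8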

nonshared/createSubmitScript.m renamed to createSubmitScript.m

Lines changed: 2 additions & 2 deletions
@@ -1,4 +1,4 @@
-function createSubmitScript(outputFilename, jobName, quotedLogFile, quotedScriptName, ...
+function createSubmitScript(outputFilename, jobName, quotedLogFile, quotedWrapperPath, ...
     environmentVariables, additionalSubmitArgs, jobArrayString)
 % Create a script that sets the correct environment variables and then
 % executes the Grid Engine qsub command.
@@ -31,7 +31,7 @@ function createSubmitScript(outputFilename, jobName, quotedLogFile, quotedScript
 % We will forward all environment variables with this job in the call
 % to qsub
 variablesToForward = environmentVariables(:,1);
-commandToRun = getSubmitString(jobName, quotedLogFile, quotedScriptName, ...
+commandToRun = getSubmitString(jobName, quotedLogFile, quotedWrapperPath, ...
     variablesToForward, additionalSubmitArgs, jobArrayString);
 fprintf(fid, '%s\n', commandToRun);

deleteJobFcn.m

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+function deleteJobFcn(cluster, job)
+%DELETEJOBFCN Deletes a job on Grid Engine
+%
+% Set your cluster's PluginScriptsLocation to the parent folder of this
+% function to run it when you delete a job.
+
+% Copyright 2017-2022 The MathWorks, Inc.
+
+cancelJobFcn(cluster, job);
+
+if cluster.HasSharedFilesystem
+    % If we delete the job files before Grid Engine has actually finished
+    % processing the job, the job may be left in an 'Eqw' state and appear in
+    % the output of qstat until it is deleted with qdel. To try to prevent this
+    % from happening, wait a maximum of 20 seconds for the job to not exist
+    % according to qstat before returning.
+    % CONFIGURATION OF THIS TIME MAY BE REQUIRED.
+    maxTimeToWait = seconds(20);
+    jobCancelTime = datetime('now', 'TimeZone', 'local');
+
+    % Store the current filename for the errors, warnings and dctSchedulerMessages
+    currFilename = mfilename;
+
+    % Get the information about the actual cluster used
+    data = cluster.getJobClusterData(job);
+    if isempty(data)
+        % This indicates that the job has not been submitted, so just return
+        dctSchedulerMessage(1, '%s: Job cluster data was empty for job with ID %d.', currFilename, job.ID);
+        return
+    end
+    schedulerIDs = getSimplifiedSchedulerIDsForJob(job);
+    schedulerIDsString = strjoin(schedulerIDs, ',');
+    commandToRun = sprintf('qstat -j %s', schedulerIDsString);
+
+    while (datetime('now', 'TimeZone', 'local') - jobCancelTime) < maxTimeToWait
+        dctSchedulerMessage(4, '%s: Checking job does not exist on scheduler using command:\n\t%s.', currFilename, commandToRun);
+        try
+            [cmdFailed, ~] = runSchedulerCommand(cluster, commandToRun);
+        catch err %#ok<NASGU>
+            cmdFailed = true;
+        end
+
+        % qstat returns a non-zero error code if the jobs do not exist. This is
+        % desired behaviour.
+        if cmdFailed
+            dctSchedulerMessage(4, '%s: qstat failed. Assuming job does not exist on scheduler.', currFilename);
+            return
+        end
+        pause(1);
+    end
+    dctSchedulerMessage(4, '%s: Job still exists on scheduler despite waiting %s.', currFilename, char(maxTimeToWait));
+end
+
+end
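
A usage sketch for the new deleteJobFcn, with a placeholder profile name and path: once PluginScriptsLocation points at the folder containing these scripts, deleting a job from MATLAB invokes deleteJobFcn, which cancels the job and, on shared filesystems, waits up to 20 seconds for it to leave the qstat output before the job files are removed.

% Placeholder profile name and path - adjust for your own cluster.
c = parcluster('GridEngineProfile');
c.PluginScriptsLocation = '/path/to/these/scripts';

j = batch(c, @rand, 1, {1000});   % submit through the plugin scripts
wait(j);
delete(j);                        % runs deleteJobFcn(cluster, job)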
File renamed without changes.
File renamed without changes.
File renamed without changes.
