Commit e66341b

Simplify API & fix case
1 parent c415d93 commit e66341b

File tree

12 files changed: +62 -58 lines changed


mars/deploy/oscar/tests/fault_injection_config_with_rerun.yml

Lines changed: 2 additions & 0 deletions

@@ -7,3 +7,5 @@ scheduling:
 storage:
   # shared-memory may lose object if the process crash after put success.
   backends: [plasma]
+  plasma:
+    store_memory: 32M
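
As context for the new keys, a minimal sketch (not part of the commit; assumes PyYAML) showing how the plasma section nests under storage:

```python
# Illustrative only: parse a config equivalent to the file above
# and read back the newly added plasma settings.
import yaml

config = yaml.safe_load(
    """
storage:
  backends: [plasma]
  plasma:
    store_memory: 32M
"""
)
assert config["storage"]["backends"] == ["plasma"]
assert config["storage"]["plasma"]["store_memory"] == "32M"
```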

mars/deploy/oscar/tests/test_local.py

Lines changed: 3 additions & 7 deletions

@@ -42,7 +42,7 @@
 from ....storage import StorageLevel
 from ....services.storage import StorageAPI
 from ....tensor.arithmetic.add import TensorAdd
-from ....tests.core import mock, check_dict_structure_same, DICT_NOT_EMPTY
+from ....tests.core import mock, DICT_NOT_EMPTY
 from ..local import new_cluster, _load_config
 from ..session import (
     get_default_async_session,
@@ -93,8 +93,8 @@
         "serialization": {},
         "most_calls": DICT_NOT_EMPTY,
         "slow_calls": DICT_NOT_EMPTY,
-        # "band_subtasks": DICT_NOT_EMPTY,
-        # "slow_subtasks": DICT_NOT_EMPTY,
+        "band_subtasks": {},
+        "slow_subtasks": {},
     }
 }
 EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE)
@@ -263,10 +263,6 @@ async def test_execute(create_cluster, config):
 
     info = await session.execute(b, extra_config=extra_config)
     await info
-    if extra_config:
-        check_dict_structure_same(info.profiling_result(), expect_profiling_structure)
-    else:
-        assert not info.profiling_result()
     assert info.result() is None
     assert info.exception() is None
     assert info.progress() == 1
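
The switch from commented-out DICT_NOT_EMPTY entries to literal empty dicts tightens the expectation: band_subtasks and slow_subtasks must now be present and exactly empty. A rough sketch of how such a structure check can work; check_structure below is a hypothetical stand-in for mars.tests.core.check_dict_structure_same:

```python
# Sentinel that matches any non-empty dict; a literal {} only matches
# an actually empty dict. Stand-in for the helper used in the tests.
DICT_NOT_EMPTY = object()


def check_structure(actual, expected):
    if expected is DICT_NOT_EMPTY:
        assert isinstance(actual, dict) and actual
        return
    assert set(actual) == set(expected)
    for key, sub in expected.items():
        if isinstance(sub, dict) or sub is DICT_NOT_EMPTY:
            check_structure(actual[key], sub)  # leaf values are not type-checked


check_structure(
    {"most_calls": {"f": 0.1}, "band_subtasks": {}},
    {"most_calls": DICT_NOT_EMPTY, "band_subtasks": {}},
)
```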

mars/deploy/oscar/tests/test_ray.py

Lines changed: 2 additions & 2 deletions

@@ -66,8 +66,8 @@
         },
         "most_calls": DICT_NOT_EMPTY,
         "slow_calls": DICT_NOT_EMPTY,
-        "band_subtasks": DICT_NOT_EMPTY,
-        "slow_subtasks": DICT_NOT_EMPTY,
+        "band_subtasks": {},
+        "slow_subtasks": {},
     }
 }
 EXPECT_PROFILING_STRUCTURE_NO_SLOW = copy.deepcopy(EXPECT_PROFILING_STRUCTURE)

mars/deploy/oscar/tests/test_ray_scheduling.py

Lines changed: 7 additions & 2 deletions

@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import asyncio
 import logging
 import os
@@ -28,6 +29,7 @@
     process_placement_to_address,
     kill_and_wait,
 )
+from ....oscar.backends.router import Router
 from ....services.cluster import ClusterAPI
 from ....services.scheduling.supervisor.autoscale import AutoscalerActor
 from ....tests.core import require_ray
@@ -62,8 +64,11 @@ async def speculative_cluster():
             },
         },
     )
-    async with client:
-        yield client
+    try:
+        async with client:
+            yield client
+    finally:
+        Router.set_instance(None)
 
 
 @pytest.mark.parametrize("ray_large_cluster", [{"num_nodes": 2}], indirect=True)
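
The fixture now resets process-global state in a finally block, so a failed cluster shutdown cannot leak a stale router into later tests. A self-contained toy version of the pattern; Router and the client here are stand-ins, not the Mars classes:

```python
import asyncio


class Router:
    # Stand-in for mars.oscar.backends.router.Router
    _instance = None

    @classmethod
    def set_instance(cls, inst):
        cls._instance = inst


class Client:
    async def __aenter__(self):
        Router.set_instance(self)  # entering the cluster installs a router
        return self

    async def __aexit__(self, exc_type, exc, tb):
        raise RuntimeError("simulated shutdown failure")


async def use_cluster():
    client = Client()
    try:
        async with client:
            pass
    except RuntimeError:
        pass  # shutdown failed...
    finally:
        Router.set_instance(None)  # ...but the global router is still reset


asyncio.run(use_cluster())
assert Router._instance is None
```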

mars/oscar/backends/context.py

Lines changed: 8 additions & 3 deletions

@@ -123,9 +123,14 @@ async def destroy_actor(self, actor_ref: ActorRef):
         message = DestroyActorMessage(
             new_message_id(), actor_ref, protocol=DEFAULT_PROTOCOL
         )
-        future = await self._call(actor_ref.address, message, wait=False)
-        result = await self._wait(future, actor_ref.address, message)
-        return self._process_result_message(result)
+        try:
+            future = await self._call(actor_ref.address, message, wait=False)
+            result = await self._wait(future, actor_ref.address, message)
+            return self._process_result_message(result)
+        except ConnectionRefusedError:
+            # when remote server already destroyed,
+            # we assume all actors destroyed already
+            pass
 
     async def kill_actor(self, actor_ref: ActorRef, force: bool = True):
         # get main_pool_address
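
This guard makes actor destruction idempotent when the remote pool is already gone: a refused connection is treated as "already destroyed" rather than an error. A minimal sketch of the same control flow with a fake transport:

```python
import asyncio


async def _call(address, message):
    # Fake transport: the remote pool has already shut down.
    raise ConnectionRefusedError(address)


async def destroy_actor(address, message):
    try:
        return await _call(address, message)
    except ConnectionRefusedError:
        # When the remote server is already destroyed, assume all of
        # its actors were destroyed with it: swallow and return None.
        pass


asyncio.run(destroy_actor("127.0.0.1:12345", "DestroyActorMessage"))
```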

mars/services/scheduling/api/oscar.py

Lines changed: 5 additions & 13 deletions

@@ -16,7 +16,7 @@
 
 from .... import oscar as mo
 from ....lib.aio import alru_cache
-from ...subtask import Subtask, SubtaskResult
+from ...subtask import Subtask
 from ..core import SubtaskScheduleSummary
 from .core import AbstractSchedulingAPI
 
@@ -99,7 +99,6 @@ async def cancel_subtasks(
         self,
         subtask_ids: List[str],
         kill_timeout: Union[float, int] = None,
-        wait: bool = False,
     ):
         """
         Cancel pending and running subtasks.
@@ -111,18 +110,11 @@
         kill_timeout
             timeout seconds to kill actor process forcibly
         """
-        if wait:
-            await self._manager_ref.cancel_subtasks(
-                subtask_ids, kill_timeout=kill_timeout
-            )
-        else:
-            await self._manager_ref.cancel_subtasks.tell(
-                subtask_ids, kill_timeout=kill_timeout
-            )
+        await self._manager_ref.cancel_subtasks(subtask_ids, kill_timeout=kill_timeout)
 
     async def finish_subtasks(
         self,
-        subtask_results: List[SubtaskResult],
+        subtask_ids: List[str],
         bands: List[Tuple] = None,
         schedule_next: bool = True,
     ):
@@ -132,14 +124,14 @@
 
         Parameters
         ----------
-        subtask_results
+        subtask_ids
            results of subtasks, must in finished states
         bands
            bands of subtasks to mark as finished
         schedule_next
            whether to schedule succeeding subtasks
        """
-        await self._manager_ref.finish_subtasks(subtask_results, bands, schedule_next)
+        await self._manager_ref.finish_subtasks.tell(subtask_ids, bands, schedule_next)
 
 
 class MockSchedulingAPI(SchedulingAPI):
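
The simplification settles the call/tell split: cancel_subtasks now always awaits the manager actor (a call), while finish_subtasks becomes fire-and-forget (a tell). A toy model of the two message styles; this is not the Mars actor API:

```python
import asyncio


class ManagerRef:
    async def cancel_subtasks(self, subtask_ids):
        # "call": the caller awaits until cancellation is processed
        await asyncio.sleep(0.01)
        return f"cancelled {subtask_ids}"

    def finish_subtasks_tell(self, subtask_ids):
        # "tell": enqueue the message and return immediately
        asyncio.get_running_loop().create_task(self._finish(subtask_ids))

    async def _finish(self, subtask_ids):
        await asyncio.sleep(0.01)


async def main():
    ref = ManagerRef()
    print(await ref.cancel_subtasks(["s1"]))  # waits for the result
    ref.finish_subtasks_tell(["s1"])          # does not wait
    await asyncio.sleep(0.05)                 # let the background task finish


asyncio.run(main())
```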

mars/services/scheduling/supervisor/manager.py

Lines changed: 24 additions & 18 deletions

@@ -175,14 +175,15 @@ async def _get_execution_ref(self, band: BandType):
     async def set_subtask_result(self, result: SubtaskResult, band: BandType):
         info = self._subtask_infos[result.subtask_id]
         subtask_id = info.subtask.subtask_id
-        notify_task_service = True
+        notify_task_service = False
 
         async with redirect_subtask_errors(self, [info.subtask], reraise=False):
             try:
                 info.band_futures[band].set_result(result)
                 if result.error is not None:
                     raise result.error.with_traceback(result.traceback)
                 logger.debug("Finished subtask %s with result %s.", subtask_id, result)
+                notify_task_service = True
             except (OSError, MarsError) as ex:
                 # TODO: We should handle ServerClosed Error.
                 if (
@@ -200,7 +201,6 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType):
                         [info.subtask.priority or tuple()],
                         exclude_bands=set(info.band_futures.keys()),
                     )
-                    notify_task_service = False
                 else:
                     raise ex
             except asyncio.CancelledError:
@@ -244,16 +244,14 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType):
 
     async def finish_subtasks(
         self,
-        subtask_results: List[SubtaskResult],
+        subtask_ids: List[str],
         bands: List[BandType] = None,
         schedule_next: bool = True,
     ):
-        subtask_ids = [result.subtask_id for result in subtask_results]
         logger.debug("Finished subtasks %s.", subtask_ids)
         band_tasks = defaultdict(lambda: 0)
         bands = bands or [None] * len(subtask_ids)
-        for result, subtask_band in zip(subtask_results, bands):
-            subtask_id = result.subtask_id
+        for subtask_id, subtask_band in zip(subtask_ids, bands):
             subtask_info = self._subtask_infos.get(subtask_id, None)
 
             if subtask_info is not None:
@@ -265,13 +263,16 @@
                         "stage_id": subtask_info.subtask.stage_id,
                     },
                 )
-                self._subtask_summaries[subtask_id] = subtask_info.to_summary(
-                    is_finished=True,
-                    is_cancelled=result.status == SubtaskStatus.cancelled,
-                )
+                if subtask_id not in self._subtask_summaries:
+                    summary_kw = dict(is_finished=True)
+                    if subtask_info.cancel_pending:
+                        summary_kw["is_cancelled"] = True
+                    self._subtask_summaries[subtask_id] = subtask_info.to_summary(
+                        **summary_kw
+                    )
                 subtask_info.end_time = time.time()
                 self._speculation_execution_scheduler.finish_subtask(subtask_info)
-            # Cancel subtask on other bands.
+                # Cancel subtask on other bands.
                 aio_task = subtask_info.band_futures.pop(subtask_band, None)
                 if aio_task:
                     yield aio_task
@@ -321,7 +322,7 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list):
             if info.cancel_pending:
                 res_release_delays.append(
                     self._global_resource_ref.release_subtask_resource.delay(
-                        band, info.subtask.session_id, info.subtask.subtask_id
+                        band, self._session_id, subtask_id
                     )
                 )
                 continue
@@ -330,6 +331,12 @@ async def batch_submit_subtask_to_band(self, args_list, kwargs_list):
                     "Subtask %s is not in added subtasks set, it may be finished or canceled, skip it.",
                     subtask_id,
                 )
+                # in case resource already allocated, do deallocate
+                res_release_delays.append(
+                    self._global_resource_ref.release_subtask_resource.delay(
+                        band, self._session_id, subtask_id
+                    )
+                )
                 continue
             band_to_subtask_ids[band].append(subtask_id)
 
@@ -414,9 +421,8 @@ async def cancel_task_in_band(band):
 
             info = self._subtask_infos[subtask_id]
             info.cancel_pending = True
-            raw_tasks_to_cancel = list(info.band_futures.values())
 
-            if not raw_tasks_to_cancel:
+            if not info.band_futures:
                 # not submitted yet: mark subtasks as cancelled
                 result = SubtaskResult(
                     subtask_id=info.subtask.subtask_id,
@@ -435,13 +441,13 @@ async def cancel_task_in_band(band):
                 )
                 band_to_futures[band].append(future)
 
-        for band in band_to_futures:
-            cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band)))
-
+        # Dequeue first as it is possible to leak subtasks from queues
         if queued_subtask_ids:
-            # Don't use `finish_subtasks` because it may remove queued
            await self._queueing_ref.remove_queued_subtasks(queued_subtask_ids)
 
+        for band in band_to_futures:
+            cancel_tasks.append(asyncio.create_task(cancel_task_in_band(band)))
+
         if cancel_tasks:
             yield asyncio.gather(*cancel_tasks)
mars/services/scheduling/tests/test_service.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ async def set_subtask_result(self, subtask_result: SubtaskResult):
5050
for event in self._events[subtask_result.subtask_id]:
5151
event.set()
5252
self._events.pop(subtask_result.subtask_id, None)
53-
await scheduling_api.finish_subtasks([subtask_result], subtask_result.bands)
53+
await scheduling_api.finish_subtasks(
54+
[subtask_result.subtask_id], subtask_result.bands
55+
)
5456

5557
def _return_result(self, subtask_id: str):
5658
result = self._results[subtask_id]

mars/services/scheduling/utils.py

Lines changed: 1 addition & 7 deletions

@@ -18,16 +18,10 @@
 from typing import Iterable
 
 from ... import oscar as mo
-from ...lib.aio import alru_cache
 from ..subtask import Subtask, SubtaskResult, SubtaskStatus
 from ..task import TaskAPI
 
 
-@alru_cache
-async def _get_task_api(actor: mo.Actor):
-    return await TaskAPI.create(getattr(actor, "_session_id"), actor.address)
-
-
 @contextlib.asynccontextmanager
 async def redirect_subtask_errors(
     actor: mo.Actor, subtasks: Iterable[Subtask], reraise: bool = True
@@ -41,7 +35,7 @@ async def redirect_subtask_errors(
             if isinstance(error, asyncio.CancelledError)
             else SubtaskStatus.errored
         )
-        task_api = await _get_task_api(actor)
+        task_api = await TaskAPI.create(getattr(actor, "_session_id"), actor.address)
         coros = []
         for subtask in subtasks:
             if subtask is None:  # pragma: no cover
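
Creating the TaskAPI inline instead of through an alru_cache-decorated helper avoids caching on the actor instance. One plausible hazard is sketched below with functools.lru_cache as a synchronous stand-in; the classes are illustrative, not Mars code:

```python
from functools import lru_cache


class TaskAPI:
    def __init__(self, session_id):
        self.session_id = session_id


class Actor:
    def __init__(self, session_id):
        self._session_id = session_id


@lru_cache(maxsize=None)  # also pins the actor object alive in the cache
def get_task_api(actor):
    return TaskAPI(actor._session_id)


a = Actor("session-1")
get_task_api(a)
a._session_id = "session-2"                       # actor re-bound later
assert get_task_api(a).session_id == "session-1"  # stale cached API
```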

mars/services/task/execution/mars/stage.py

Lines changed: 5 additions & 4 deletions

@@ -149,7 +149,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
         if all_done or error_or_cancelled:
             # tell scheduling to finish subtasks
             await self._scheduling_api.finish_subtasks(
-                [result], bands=[band], schedule_next=not error_or_cancelled
+                [result.subtask_id], bands=[band], schedule_next=not error_or_cancelled
             )
             if self.result.status != TaskStatus.terminated:
                 self.result = TaskResult(
@@ -184,8 +184,7 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
                 )
                 # if error or cancel, cancel all submitted subtasks
                 await self._scheduling_api.cancel_subtasks(
-                    list(self._submitted_subtask_ids),
-                    wait=False,
+                    list(self._submitted_subtask_ids)
                 )
             self._schedule_done()
             cost_time_secs = self.result.end_time - self.result.start_time
@@ -219,7 +218,9 @@ async def set_subtask_result(self, result: SubtaskResult, band: BandType = None)
                     # all predecessors finished
                     to_schedule_subtasks.append(succ_subtask)
             await self._schedule_subtasks(to_schedule_subtasks)
-            await self._scheduling_api.finish_subtasks([result], bands=[band])
+            await self._scheduling_api.finish_subtasks(
+                [result.subtask_id], bands=[band]
+            )
 
     async def run(self):
         if len(self.subtask_graph) == 0:
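
At the stage level the executor now reports only subtask ids, and successors are scheduled unless the stage ended in error or cancellation. A simplified model of that control flow; all names are illustrative:

```python
class FakeSchedulingAPI:
    def __init__(self):
        self.calls = []

    def finish_subtasks(self, subtask_ids, schedule_next=True):
        self.calls.append((subtask_ids, schedule_next))


def on_subtask_done(subtask_id, all_done, error_or_cancelled, scheduling):
    if all_done or error_or_cancelled:
        # successors are only scheduled when the stage ended normally
        scheduling.finish_subtasks(
            [subtask_id], schedule_next=not error_or_cancelled
        )


api = FakeSchedulingAPI()
on_subtask_done("s1", all_done=True, error_or_cancelled=False, scheduling=api)
on_subtask_done("s2", all_done=False, error_or_cancelled=True, scheduling=api)
assert api.calls == [(["s1"], True), (["s2"], False)]
```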
