Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ max-line-length = 88
ignore = E203, E266, E501, W503
select = B,C,E,F,W,T4,B9
per-file-ignores =
versioneer.py:T001
unidist/_version.py:T001
versioneer.py:T201
unidist/_version.py:T201

[versioneer]
VCS = git
Expand Down
4 changes: 1 addition & 3 deletions unidist/core/backends/multiprocessing/backend.py
Original file line number Diff line number Diff line change
def shutdown():
    """
    Shutdown the MultiProcessing execution backend.

    Notes
    -----
    Delegates to the MultiProcessing core ``shutdown``, which stops the
    backend's worker processes.
    """
    mp.shutdown()
4 changes: 2 additions & 2 deletions unidist/core/backends/multiprocessing/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
"""MultiProcessing backend core functionality."""

from .actor import Actor
from .api import put, wait, get, submit, init
from .api import put, wait, get, submit, init, shutdown

__all__ = ["Actor", "put", "wait", "get", "submit", "init"]
__all__ = ["Actor", "put", "wait", "get", "submit", "init", "shutdown"]
35 changes: 17 additions & 18 deletions unidist/core/backends/multiprocessing/core/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
"""Actor specific functionality implemented using Python multiprocessing."""

import cloudpickle as pkl
from multiprocessing.managers import BaseManager

from unidist.core.backends.multiprocessing.core.object_store import ObjectStore, Delayed
from unidist.core.backends.multiprocessing.core.process_manager import (
ProcessManager,
Task,
Operation,
)


Expand All @@ -31,8 +31,7 @@ class ActorMethod:
Object storage to share data between workers.
"""

def __init__(self, actor, method_name, obj_store):
    # Actor proxy that owns this wrapper; calls are routed through it to
    # the worker process hosting the actor instance.
    self._actor = actor
    # Name of the method to invoke on the remote actor instance.
    self._method_name = method_name
    self._obj_store = obj_store
Expand Down Expand Up @@ -66,9 +65,14 @@ def submit(self, *args, num_returns=1, **kwargs):
else:
data_ids = self._obj_store.put(Delayed())

cls_method = getattr(self._cls_obj, self._method_name)

task = Task(cls_method, data_ids, self._obj_store, *args, **kwargs)
task = Task(
self._method_name,
Operation.EXECUTE_ACTOR_METHOD,
self._obj_store,
*args,
data_ids=data_ids,
**kwargs,
)
self._actor.submit(task)

return data_ids
Expand Down Expand Up @@ -97,17 +101,10 @@ class Actor:
"""

def __init__(self, cls, *args, **kwargs):
    self._worker_id = None
    self._obj_store = ObjectStore.get_instance()

    # Ship actor construction to a dedicated worker process: the worker
    # instantiates `cls` there and stays reserved for this actor until
    # the actor is removed.
    task = Task(cls, Operation.CREATE_ACTOR, self._obj_store, *args, **kwargs)
    self._worker_id = ProcessManager.get_instance().create_actor(task)

def __getattr__(self, name):
    """
    Wrap the attribute `name` as a remotely callable actor method.

    Parameters
    ----------
    name : str
        Name of the method to call on the actor instance.

    Returns
    -------
    ActorMethod
        Callable wrapper that submits the method call to the worker
        hosting this actor.
    """
    return ActorMethod(self, name, self._obj_store)

def submit(self, task):
    """
    Add `task` to the task queue of the worker hosting this actor.

    Parameters
    ----------
    task : unidist.core.backends.multiprocessing.core.process_manager.Task
        Task object holding callable function.
    """
    # Target the actor's dedicated worker so the call sees the actor state.
    ProcessManager.get_instance().submit(
        pkl.dumps(task), target_worker_id=self._worker_id
    )

def __del__(self):
    """
    Destructor of the actor.

    Free worker, grabbed from the workers pool.
    """
    if self._worker_id is not None:
        ProcessManager.get_instance().remove_actor(self._worker_id)
14 changes: 13 additions & 1 deletion unidist/core/backends/multiprocessing/core/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@

"""High-level API of MultiProcessing backend."""

import atexit
import signal
import cloudpickle as pkl

from unidist.config import CpuCount
from unidist.core.backends.multiprocessing.core.object_store import ObjectStore, Delayed
from unidist.core.backends.multiprocessing.core.process_manager import (
ProcessManager,
Task,
Operation,
)


Expand All @@ -31,6 +34,10 @@ def init(num_workers=CpuCount.get()):
ObjectStore.get_instance()
ProcessManager.get_instance(num_workers=num_workers)

atexit.register(shutdown)
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)


def put(data):
"""
Expand Down Expand Up @@ -122,8 +129,13 @@ def submit(func, *args, num_returns=1, **kwargs):
else:
data_ids = obj_store.put(Delayed())

task = Task(func, data_ids, obj_store, *args, **kwargs)
task = Task(func, Operation.EXECUTE, obj_store, *args, data_ids=data_ids, **kwargs)

ProcessManager.get_instance().submit(pkl.dumps(task))

return data_ids


def shutdown(*args):
    """
    Shutdown MultiProcessing execution backend.

    Parameters
    ----------
    *args : iterable
        Ignored. Accepted so this function can serve both as an
        ``atexit`` callback (called with no arguments) and as a
        ``signal`` handler (called with ``(signum, frame)``) — ``init``
        registers it for SIGTERM/SIGINT, which previously raised
        ``TypeError`` on signal delivery because the handler took no
        parameters.
    """
    ProcessManager.get_instance().shutdown()
149 changes: 94 additions & 55 deletions unidist/core/backends/multiprocessing/core/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
from unidist.core.backends.multiprocessing.core.object_store import ObjectStore


class Operation:
    """Integer codes identifying what a worker should do with a received task."""

    # Codes are plain ints so they pickle cheaply inside Task objects
    # that cross process boundaries; their values must stay stable.
    (
        EXECUTE,
        EXECUTE_ACTOR_METHOD,
        CREATE_ACTOR,
        REMOVE_ACTOR,
        CANCEL,
    ) = range(5)


class Worker(Process):
"""
Class-process that executes tasks from `self.task_queue`.
Expand All @@ -28,34 +36,57 @@ class Worker(Process):
"""

def __init__(self, task_queue, obj_store):
    # NOTE(review): deliberately NOT a daemon process — shutdown is driven
    # by an explicit CANCEL task, presumably so workers can drain their
    # queues before exiting; confirm against the shutdown path.
    Process.__init__(self)
    self.task_queue = task_queue
    self._obj_store = obj_store
    # Instantiated actor object when this worker hosts an actor, else None.
    self._actor_handle = None
    self.is_actor = False

def run(self):
    """
    Run main infinite loop of process to execute tasks from `self.task_queue`.

    Each queued item is a pickled ``Task``; dispatch is on its
    ``operation_type`` and the loop exits on ``Operation.CANCEL``.
    """
    while 1:
        task = self.task_queue.get()
        task = pkl.loads(task)

        operation = task.operation_type
        func = task.func
        if operation in (Operation.EXECUTE, Operation.EXECUTE_ACTOR_METHOD):
            data_ids = task.data_ids
            args, kwargs = task.get_parameters()
            try:
                if operation == Operation.EXECUTE:
                    value = func(*args, **kwargs)
                else:
                    # For actor methods `func` is the method *name* to be
                    # looked up on the actor instance hosted by this worker.
                    value = getattr(self._actor_handle, func)(*args, **kwargs)
            except Exception as e:
                # Store the exception under every output ID so a later
                # `get` surfaces it instead of hanging on a Delayed value.
                if isinstance(data_ids, list) and len(data_ids) > 1:
                    for data_id in data_ids:
                        self._obj_store.store_delayed[data_id] = e
                else:
                    self._obj_store.store_delayed[data_ids] = e
            else:
                if data_ids is not None:
                    if isinstance(data_ids, list) and len(data_ids) > 1:
                        for data_id, val in zip(data_ids, value):
                            self._obj_store.store_delayed[data_id] = val
                    else:
                        self._obj_store.store_delayed[data_ids] = value
        elif operation == Operation.CREATE_ACTOR:
            args, kwargs = task.get_parameters()
            # `func` is the actor class; instantiate it in this process.
            self._actor_handle = func(*args, **kwargs)
            self.is_actor = True
        elif operation == Operation.REMOVE_ACTOR:
            self._actor_handle = None
            self.is_actor = False
        elif operation == Operation.CANCEL:
            self.task_queue.task_done()
            break
        else:
            raise ValueError("Unsupported operation")

        self.task_queue.task_done()

    return

def add_task(self, task):
def __init__(self, num_workers=None):
    """
    Start the pool of worker processes.

    Parameters
    ----------
    num_workers : int, optional
        Number of workers to start; defaults to ``CpuCount.get()``.
    """
    if num_workers is None:
        num_workers = CpuCount.get()
    self.workers = [None] * num_workers
    # Per-worker flag: True while the worker is reserved by an actor.
    self.is_actors = [False] * num_workers
    self.__class__._worker_id = 0

    obj_store = ObjectStore.get_instance()
    for idx in range(num_workers):
        self.workers[idx] = Worker(JoinableQueue(), obj_store)
        self.workers[idx].start()

    self._is_alive = True

@classmethod
def get_instance(cls, num_workers=None):
Expand Down Expand Up @@ -128,7 +160,7 @@ def _next(self):
self.__class__._worker_id = 0
return idx

def create_actor(self, task):
    """
    Reserve a free worker for an actor and submit its creation task to it.

    Parameters
    ----------
    task : unidist.core.backends.multiprocessing.core.process_manager.Task
        Task with ``Operation.CREATE_ACTOR`` that instantiates the actor
        class on the chosen worker.

    Returns
    -------
    int
        Index of the worker grabbed for the actor.

    Raises
    ------
    RuntimeError
        If every worker is already occupied by an actor.
    """
    worker_id = None
    for idx, is_actor in enumerate(self.is_actors):
        if not is_actor:
            worker_id = idx
            self.is_actors[idx] = True
            break
    if worker_id is None:
        raise RuntimeError("Actor can`t be run, no available workers.")

    self.workers[worker_id].add_task(pkl.dumps(task))
    return worker_id

def remove_actor(self, idx):
    """
    Free worker by index `idx`.

    Sends a REMOVE_ACTOR task so the worker drops its actor handle, then
    marks the worker as available again.

    Parameters
    ----------
    idx : int
        Index of worker to be freed.
    """
    task = Task(None, Operation.REMOVE_ACTOR, None)
    # Don't queue to workers that were already cancelled by shutdown.
    if self._is_alive:
        self.workers[idx].add_task(pkl.dumps(task))
    self.is_actors[idx] = False

def submit(self, task):
def submit(self, task, target_worker_id=None):
"""
Add `task` to task queue of one of workers using round-robin.

Expand All @@ -168,17 +209,25 @@ def submit(self, task):
task : unidist.core.backends.multiprocessing.core.process_manager.Task
Task to be added in task queue.
"""
num_skipped = 0
if target_worker_id is None:
num_skipped = 0

while num_skipped < len(self.workers):
idx = self._next()
if not self.is_actors[idx]:
self.workers[idx].add_task(task)
return
else:
num_skipped += 1

while num_skipped < len(self.workers):
idx = self._next()
if not self.grabbed_workers[idx]:
self.workers[idx].add_task(task)
return
else:
num_skipped += 1
raise RuntimeError("Task can`t be run, no available workers.")
else:
self.workers[target_worker_id].add_task(task)

raise RuntimeError("Task can`t be run, no available workers.")
def shutdown(self):
    """
    Shutdown all workers.

    Marks the manager as not alive and sends every worker a CANCEL task,
    which makes its run-loop exit.
    """
    self._is_alive = False
    # Pickle once and reuse the payload; the original pickled the same
    # task per worker and used a list comprehension purely for side
    # effects — a plain loop states the intent.
    cancel_task = pkl.dumps(Task(None, Operation.CANCEL, None))
    for worker in self.workers:
        worker.add_task(cancel_task)


class Task:
Expand All @@ -199,31 +248,21 @@ class Task:
Keyword arguments to be passed in the `func`.
"""

def __init__(self, func, operation_type, obj_store, *args, data_ids=None, **kwargs):
    """
    Parameters
    ----------
    func : callable or str or None
        Callable to execute (EXECUTE), actor-method name (EXECUTE_ACTOR_METHOD),
        actor class (CREATE_ACTOR), or None for control operations.
    operation_type : int
        One of the ``Operation`` codes.
    obj_store : ObjectStore
        Object storage used to materialize ``DataID`` arguments.
    *args : iterable
        Positional arguments to be passed in the `func`.
    data_ids : DataID or list of DataID, optional
        ID(s) under which the result(s) will be stored.
    **kwargs : dict
        Keyword arguments to be passed in the `func`.
    """
    self._args = args
    self._kwargs = kwargs
    self._obj_store = obj_store
    self.func = func
    self.operation_type = operation_type
    self.data_ids = data_ids

def get_parameters(self):
    """
    Materialize the task's arguments.

    Any positional or keyword argument that is a ``DataID`` is replaced
    by its value fetched from the object store.

    Returns
    -------
    tuple
        A pair ``(args, kwargs)`` of the list of materialized positional
        arguments and the dict of materialized keyword arguments.
    """
    materialized_args = [
        self._obj_store.get(arg) if isinstance(arg, DataID) else arg
        for arg in self._args
    ]
    materialized_kwargs = {
        key: self._obj_store.get(value) if isinstance(value, DataID) else value
        for key, value in self._kwargs.items()
    }
    return materialized_args, materialized_kwargs
Loading