Skip to content

Commit 715e700

Browse files
author
Ritvik Rao
committed
workaround for mac library path
2 parents 3695cd0 + 666cd10 commit 715e700

24 files changed

+1543
-88
lines changed

.github/workflows/charm4py.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ jobs:
1717
- name: build-charm4py
1818
run: |
1919
git fetch --unshallow # Need full repo for 'git describe' used by setup.py
20-
pip3 install setuptools 'cython<3' cffi greenlet numpy
20+
pip3 install setuptools cython cffi greenlet numpy torch torchvision
2121
git clone https://github.com/UIUC-PPL/charm charm_src/charm
2222
export CHARM_EXTRA_BUILD_OPTS="--enable-error-checking"
2323
export CHARM_BUILD_PROCESSES=2

auto_test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def searchForPython(python_implementations):
3030

3131

3232
# ----------------------------------------------------------------------------------
33-
TIMEOUT = 60 # timeout for each test (in seconds)
33+
TIMEOUT = 120 # timeout for each test (in seconds)
3434
CHARM_QUIET_AFTER_NUM_TESTS = 5
3535

3636
commonArgs = ['++local']

charm4py/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
from .chare import Chare, Group, Array, ArrayMap
3232
from .channel import Channel
33+
from .object_store import ObjectStore
3334

3435
def checkCharmStarted():
3536
if not charm.started:

charm4py/c_object_store.pxd

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# cython: language_level=3, embedsignature=True
2+
# distutils: language=c++
3+
4+
from libcpp.list cimport list
5+
from libcpp.vector cimport vector
6+
from libcpp.pair cimport pair
7+
from libcpp.unordered_map cimport unordered_map
8+
from libc.stdint cimport uint64_t
9+
10+
import numpy as np
11+
cimport numpy as np
12+
13+
ctypedef uint64_t ObjectId
14+
ctypedef unordered_map[ObjectId, void*] ObjectMap
15+
ctypedef unordered_map[ObjectId, void*].iterator ObjectMapIterator
16+
ctypedef unordered_map[ObjectId, vector[int]] ObjectPEMap
17+
ctypedef unordered_map[ObjectId, vector[int]].iterator ObjectPEMapIterator
18+
19+
ctypedef pair[void*, int] MessageDependency
20+
ctypedef list[MessageDependency*] DependencyList
21+
ctypedef list[MessageDependency*].iterator DependencyListIterator
22+
ctypedef unordered_map[ObjectId, DependencyList] DependencyMap
23+
ctypedef unordered_map[ObjectId, DependencyList].iterator DependencyMapIterator
24+
25+
26+
cdef class MessageBuffer:
27+
cdef DependencyMap dependecies
28+
29+
cpdef void insert(self, object obj_ids, object msg)
30+
cpdef object check(self, ObjectId obj_id)
31+
32+
33+
cdef class CObjectStore:
34+
cdef uint64_t replica_choice
35+
cdef ObjectMap object_map
36+
cdef ObjectPEMap location_map
37+
cdef ObjectPEMap obj_req_buffer
38+
cdef ObjectPEMap loc_req_buffer
39+
cdef ObjectPEMap obj_loc_req_buffer
40+
cdef object proxy
41+
42+
cdef void buffer_obj_request(self, ObjectId obj_id, int requesting_pe)
43+
cdef void buffer_loc_request(self, ObjectId obj_id, int requesting_pe)
44+
cdef void buffer_obj_loc_request(self, ObjectId obj_id, int requesting_pe)
45+
cdef void check_obj_requests_buffer(self, ObjectId obj_id)
46+
cdef void check_loc_requests_buffer(self, ObjectId obj_id)
47+
cdef void check_obj_loc_requests_buffer(self, ObjectId obj_id)
48+
49+
cpdef object lookup_object(self, ObjectId obj_id)
50+
cpdef int lookup_location(self, ObjectId obj_id, bint fetch=*)
51+
cdef void insert_object(self, ObjectId obj_id, object obj)
52+
cpdef void insert_object_small(self, ObjectId obj_id, object obj)
53+
cpdef void delete_remote_objects(self, ObjectId obj_id)
54+
cpdef void delete_object(self, ObjectId obj_id)
55+
56+
cdef int choose_pe(self, vector[int] &node_list)
57+
58+
cpdef void update_location(self, ObjectId obj_id, int pe)
59+
cpdef void request_location_object(self, ObjectId obj_id, int requesting_pe)
60+
cpdef void request_location(self, ObjectId obj_id, int requesting_pe)
61+
cpdef void receive_remote_object(self, ObjectId obj_id, object obj)
62+
cpdef void request_object(self, ObjectId obj_id, int requesting_pe)
63+
cpdef void bulk_send_object(self, ObjectId obj_id, np.ndarray[int, ndim=1] requesting_pes)
64+
cpdef void create_object(self, ObjectId obj_id, object obj)

charm4py/c_object_store.pyx

Lines changed: 279 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
# cython: language_level=3, embedsignature=True
2+
# distutils: language=c++
3+
4+
from cpython.ref cimport Py_INCREF, Py_DECREF
5+
from cython.operator cimport dereference as deref
6+
from cython.operator cimport preincrement as inc
7+
from cpython.mem cimport PyMem_Malloc, PyMem_Realloc, PyMem_Free
8+
from libc.stdio cimport printf
9+
10+
import sys
11+
import numpy as np
12+
cimport numpy as np
13+
cimport cython
14+
15+
from copy import deepcopy
16+
17+
cdef int OBJ_SIZE_THRESHOLD = 1024
18+
19+
cdef extern from "numpy/arrayobject.h":
20+
cdef void import_array()
21+
22+
ctypedef struct PyArrayObject:
23+
char *data
24+
np.npy_intp *dimensions
25+
26+
cdef enum NPY_TYPES:
27+
NPY_INT,
28+
NPY_UINT,
29+
NPY_LONG,
30+
NPY_FLOAT,
31+
NPY_DOUBLE
32+
33+
np.ndarray PyArray_SimpleNewFromData(int, np.npy_intp*, int, void*)
34+
35+
36+
cdef class MessageBuffer:
37+
def __init__(self):
38+
pass
39+
40+
def __cinit__(self):
41+
self.dependecies = DependencyMap()
42+
43+
cpdef void insert(self, object obj_ids, object msg):
44+
cdef int ndeps = len(obj_ids)
45+
cdef MessageDependency* dep = <MessageDependency*> PyMem_Malloc(
46+
sizeof(MessageDependency)
47+
)
48+
Py_INCREF(msg)
49+
deref(dep).first = <void*> msg
50+
deref(dep).second = ndeps
51+
cdef DependencyMapIterator it
52+
cdef DependencyList* dep_list
53+
54+
cdef ObjectId obj_id
55+
for i in range(ndeps):
56+
obj_id = <ObjectId> obj_ids[i]
57+
it = self.dependecies.find(obj_id)
58+
if it == self.dependecies.end():
59+
self.dependecies[obj_id] = DependencyList()
60+
dep_list = &(self.dependecies[obj_id])
61+
deref(dep_list).push_back(dep)
62+
63+
cpdef object check(self, ObjectId obj_id):
64+
cdef DependencyList* dep_list
65+
cdef DependencyMapIterator it = self.dependecies.find(obj_id)
66+
cdef DependencyListIterator dep_list_it
67+
cdef object completed = []
68+
if it != self.dependecies.end():
69+
dep_list = &(self.dependecies[obj_id])
70+
dep_list_it = deref(dep_list).begin()
71+
while dep_list_it != deref(dep_list).end():
72+
deref(dep_list_it)[0].second -= 1
73+
if deref(deref(dep_list_it)).second == 0:
74+
# this element dependencies are satisfied
75+
# send it to scheduling
76+
completed.append(<object> deref(deref(dep_list_it)).first)
77+
Py_DECREF(<object> deref(deref(dep_list_it)).first)
78+
# remove from buffer
79+
PyMem_Free(deref(dep_list_it))
80+
dep_list_it = deref(dep_list).erase(dep_list_it)
81+
else:
82+
inc(dep_list_it)
83+
return completed
84+
85+
86+
cdef class CObjectStore:
87+
def __init__(self, proxy):
88+
self.proxy = proxy
89+
90+
def __cinit__(self, proxy):
91+
self.replica_choice = 0
92+
self.object_map = ObjectMap()
93+
self.location_map = ObjectPEMap()
94+
self.obj_req_buffer = ObjectPEMap()
95+
self.loc_req_buffer = ObjectPEMap()
96+
self.obj_loc_req_buffer = ObjectPEMap()
97+
98+
cpdef object lookup_object(self, ObjectId obj_id):
99+
#from charm4py import charm
100+
#charm.print_dbg("Lookup on", obj_id)
101+
cdef ObjectMapIterator it = self.object_map.find(obj_id)
102+
if it == self.object_map.end():
103+
return None
104+
return <object> deref(it).second
105+
106+
cdef void insert_object(self, ObjectId obj_id, object obj):
107+
#FIXME when is a copy required here?
108+
#obj_copy = deepcopy(obj)
109+
if self.object_map.find(obj_id) != self.object_map.end():
110+
return
111+
Py_INCREF(obj)
112+
self.object_map[obj_id] = <void*> obj
113+
114+
cpdef void insert_object_small(self, ObjectId obj_id, object obj):
115+
from charm4py import charm
116+
#FIXME when is a copy required here?
117+
#obj_copy = deepcopy(obj)
118+
if self.object_map.find(obj_id) != self.object_map.end():
119+
return
120+
Py_INCREF(obj)
121+
self.object_map[obj_id] = <void*> obj
122+
self.location_map[obj_id].push_back(<int> charm.myPe())
123+
self.check_loc_requests_buffer(obj_id)
124+
self.check_obj_loc_requests_buffer(obj_id)
125+
126+
cpdef void delete_remote_objects(self, ObjectId obj_id):
127+
cdef ObjectPEMapIterator it = self.location_map.find(obj_id)
128+
if it == self.location_map.end():
129+
return
130+
cdef int* pe_arr = deref(it).second.data()
131+
cdef int size = deref(it).second.size()
132+
cdef int i
133+
for i in range(size):
134+
self.proxy[pe_arr[i]].delete_object(obj_id)
135+
136+
cpdef void delete_object(self, ObjectId obj_id):
137+
cdef ObjectMapIterator it = self.object_map.find(obj_id)
138+
cdef object obj
139+
if it != self.object_map.end():
140+
obj = <object> deref(it).second
141+
Py_DECREF(obj)
142+
self.object_map.erase(obj_id)
143+
144+
cdef void buffer_obj_request(self, ObjectId obj_id, int requesting_pe):
145+
if self.obj_req_buffer.find(obj_id) == self.obj_req_buffer.end():
146+
self.obj_req_buffer[obj_id] = vector[int]()
147+
self.obj_req_buffer[obj_id].push_back(requesting_pe)
148+
149+
cdef void buffer_loc_request(self, ObjectId obj_id, int requesting_pe):
150+
if self.loc_req_buffer.find(obj_id) == self.loc_req_buffer.end():
151+
self.loc_req_buffer[obj_id] = vector[int]()
152+
self.loc_req_buffer[obj_id].push_back(requesting_pe)
153+
154+
cdef void buffer_obj_loc_request(self, ObjectId obj_id, int requesting_pe):
155+
if self.obj_loc_req_buffer.find(obj_id) == self.obj_loc_req_buffer.end():
156+
self.obj_loc_req_buffer[obj_id] = vector[int]()
157+
self.obj_loc_req_buffer[obj_id].push_back(requesting_pe)
158+
159+
@cython.cdivision(True)
160+
cdef int choose_pe(self, vector[int] &node_list):
161+
# replica choice should be per entry
162+
cdef int pe = node_list[self.replica_choice % node_list.size()]
163+
self.replica_choice += 1
164+
return pe
165+
166+
@cython.cdivision(True)
167+
cpdef int lookup_location(self, ObjectId obj_id, bint fetch=True):
168+
from charm4py import charm
169+
cdef ObjectPEMapIterator it = self.location_map.find(obj_id)
170+
if it != self.location_map.end():
171+
return self.choose_pe(deref(it).second)
172+
cdef int npes
173+
if fetch:
174+
npes = charm.numPes()
175+
self.proxy[obj_id % npes].request_location(obj_id, charm.myPe())
176+
return -1
177+
178+
cdef void check_loc_requests_buffer(self, ObjectId obj_id):
179+
cdef ObjectPEMapIterator it = self.loc_req_buffer.find(obj_id)
180+
if it == self.loc_req_buffer.end():
181+
return
182+
# TODO is this creating a copy of the vector?
183+
cdef vector[int] vec = deref(it).second
184+
cdef np.npy_intp size = vec.size()
185+
cdef int pe = self.lookup_location(obj_id, fetch=False)
186+
cdef int[::1] arr = <int [:vec.size()]> vec.data()
187+
cdef np.ndarray[int, ndim=1] req_pes = np.asarray(arr)
188+
self.proxy[pe].bulk_request_location(obj_id, req_pes)
189+
self.loc_req_buffer.erase(obj_id)
190+
191+
cdef void check_obj_loc_requests_buffer(self, ObjectId obj_id):
192+
cdef ObjectPEMapIterator it = self.obj_loc_req_buffer.find(obj_id)
193+
if it == self.obj_loc_req_buffer.end():
194+
return
195+
cdef vector[int] vec = deref(it).second
196+
cdef np.npy_intp size = vec.size()
197+
cdef int pe = self.lookup_location(obj_id, fetch=False)
198+
cdef int[::1] arr = <int [:vec.size()]> vec.data()
199+
cdef np.ndarray[int, ndim=1] req_pes = np.asarray(arr)
200+
self.proxy[pe].bulk_send_object(obj_id, req_pes)
201+
self.loc_req_buffer.erase(obj_id)
202+
203+
cdef void check_obj_requests_buffer(self, ObjectId obj_id):
204+
cdef ObjectPEMapIterator it = self.obj_req_buffer.find(obj_id)
205+
if it == self.obj_req_buffer.end():
206+
return
207+
cdef vector[int] vec = deref(it).second
208+
cdef np.npy_intp size = vec.size()
209+
cdef int[::1] arr = <int [:vec.size()]> vec.data()
210+
cdef np.ndarray[int, ndim=1] req_pes = np.asarray(arr)
211+
self.bulk_send_object(obj_id, req_pes)
212+
self.obj_req_buffer.erase(obj_id)
213+
214+
cpdef void update_location(self, ObjectId obj_id, int pe):
215+
cdef bint new_entry = False
216+
if self.location_map.find(obj_id) == self.location_map.end():
217+
self.location_map[obj_id] = vector[int]()
218+
new_entry = True
219+
self.location_map[obj_id].push_back(pe)
220+
if new_entry:
221+
self.check_loc_requests_buffer(obj_id)
222+
self.check_obj_loc_requests_buffer(obj_id)
223+
224+
cpdef void request_location_object(self, ObjectId obj_id, int requesting_pe):
225+
# this function is intended to be called on home pes of the object id
226+
cdef object obj = self.lookup_object(obj_id)
227+
if not (obj is None):
228+
self.proxy[requesting_pe].receive_remote_object(obj_id, obj)
229+
return
230+
cdef int pe = self.lookup_location(obj_id, fetch=False)
231+
if pe == -1:
232+
self.buffer_obj_loc_request(obj_id, requesting_pe)
233+
else:
234+
self.proxy[pe].request_object(obj_id, requesting_pe)
235+
self.location_map[obj_id].push_back(requesting_pe)
236+
237+
cpdef void request_location(self, ObjectId obj_id, int requesting_pe):
238+
# this function is intended to be called on home pes of the object id
239+
cdef int pe = self.lookup_location(obj_id)
240+
if pe == -1:
241+
self.buffer_loc_request(obj_id, requesting_pe)
242+
else:
243+
self.proxy[requesting_pe].update_location(obj_id, pe)
244+
245+
cpdef void receive_remote_object(self, ObjectId obj_id, object obj):
246+
self.insert_object(obj_id, obj)
247+
self.check_obj_requests_buffer(obj_id)
248+
249+
cpdef void request_object(self, ObjectId obj_id, int requesting_pe):
250+
cdef object obj = self.lookup_object(obj_id)
251+
if obj is None:
252+
self.buffer_obj_request(obj_id, requesting_pe)
253+
else:
254+
self.proxy[requesting_pe].receive_remote_object(obj_id, obj)
255+
256+
@cython.boundscheck(False)
257+
@cython.wraparound(False)
258+
@cython.cdivision(True)
259+
cpdef void bulk_send_object(self, ObjectId obj_id, np.ndarray[int, ndim=1] requesting_pes):
260+
cdef object obj = self.lookup_object(obj_id)
261+
for i in range(requesting_pes.shape[0]):
262+
self.proxy[requesting_pes[i]].receive_remote_object(obj_id, obj)
263+
264+
@cython.cdivision(True)
265+
cpdef void create_object(self, ObjectId obj_id, object obj):
266+
from charm4py import charm
267+
cdef int npes = charm.numPes()
268+
269+
# add logic to check size of obj
270+
cdef int size = sys.getsizeof(obj)
271+
if size < OBJ_SIZE_THRESHOLD:
272+
# in this case keep the object data on home
273+
self.proxy[obj_id % npes].insert_object_small(obj_id, obj)
274+
else:
275+
#insert to local object map
276+
self.insert_object(obj_id, obj)
277+
278+
#send a message to home to add entry
279+
self.proxy[obj_id % npes].update_location(obj_id, charm.myPe())

0 commit comments

Comments
 (0)