This repository was archived by the owner on May 1, 2025. It is now read-only.

Commit a98b712

Merge pull request #87 from salesforce/random-reset
Random reset from the pool

2 parents: d48766e + 97fa410
20 files changed: +644 −66 lines

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
```diff
@@ -1,4 +1,7 @@
 # Changelog
+# Release 2.5 (2022-07-27)
+- Introduce environment reset pool, so concurrent environment replicas can randomly reset themselves from the pool.
+
 # Release 2.4 (2022-06-16)
 - Introduce new device context management and autoinit_pycuda
 - Therefore, Torch (any version) will not conflict with PyCUDA in the GPU context
```
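For readers new to the feature, here is a minimal NumPy-only sketch of the idea behind a reset pool (the names below are illustrative, not WarpDrive APIs): instead of re-running a host-side reset, each env replica that finishes an episode draws a fresh start state at random from a pre-built bank of candidates.

```python
# Illustrative sketch only; these names are not WarpDrive APIs.
import numpy as np

num_envs, pool_size, state_dim = 4, 8, 3
pool = np.random.rand(pool_size, state_dim)   # bank of candidate start states
state = np.zeros((num_envs, state_dim))       # per-replica state
done = np.array([1, 0, 1, 0], dtype=bool)     # which replicas finished

# Each done replica independently samples one entry from the pool.
idx = np.random.randint(0, pool_size, size=num_envs)
state[done] = pool[idx[done]]
```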

example_envs/tag_continuous/tag_continuous.py

Lines changed: 1 addition & 6 deletions
```diff
@@ -9,7 +9,6 @@
 
 import numpy as np
 from gym import spaces
-from gym.utils import seeding
 
 from warp_drive.utils.constants import Constants
 from warp_drive.utils.data_feed import DataFeed
@@ -313,7 +312,7 @@ def seed(self, seed=None):
         Note: this uses the code in
         https://github.com/openai/gym/blob/master/gym/utils/seeding.py
         """
-        self.np_random, seed = seeding.np_random(seed)
+        self.np_random.seed(seed)
         return [seed]
 
     def set_global_state(self, key=None, value=None, t=None, dtype=None):
@@ -756,10 +755,6 @@ def get_data_dictionary(self):
         )
         return data_dict
 
-    def get_tensor_dictionary(self):
-        tensor_dict = DataFeed()
-        return tensor_dict
-
     def reset(self):
         """
        Env reset().
```
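The seeding change above drops gym's seeding helper in favor of seeding a NumPy RandomState in place, which only works if `self.np_random` is already bound to one. A minimal sketch of the assumed pattern (the attribute initialization is our assumption; it is not shown in this diff):

```python
import numpy as np

class EnvSketch:
    def __init__(self):
        # assumed to happen elsewhere in the class: np_random is a NumPy
        # RandomState (or the np.random module itself)
        self.np_random = np.random.RandomState()

    def seed(self, seed=None):
        # the new pattern in this diff: seed the existing stream in place
        # instead of replacing it via gym.utils.seeding.np_random(seed)
        self.np_random.seed(seed)
        return [seed]

env = EnvSketch()
env.seed(42)
print(env.np_random.randint(0, 10, size=3))  # reproducible draws
```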

example_envs/tag_gridworld/tag_gridworld.py

Lines changed: 95 additions & 5 deletions
```diff
@@ -6,7 +6,6 @@
 
 import numpy as np
 from gym import spaces
-from gym.utils import seeding
 
 # seeding code from https://github.com/openai/gym/blob/master/gym/utils/seeding.py
 from warp_drive.utils.constants import Constants
@@ -130,7 +129,7 @@ def __init__(
         name = "TagGridWorld"
 
     def seed(self, seed=None):
-        self.np_random, seed = seeding.np_random(seed)
+        self.np_random.seed(seed)
         return [seed]
 
     def set_global_state(self, key=None, value=None, t=None, dtype=None):
@@ -349,9 +348,100 @@ def get_data_dictionary(self):
         )
         return data_dict
 
-    def get_tensor_dictionary(self):
-        tensor_dict = DataFeed()
-        return tensor_dict
+    def step(self, actions=None):
+        self.timestep += 1
+        args = [
+            _LOC_X,
+            _LOC_Y,
+            _ACTIONS,
+            "_done_",
+            _REWARDS,
+            _OBSERVATIONS,
+            "wall_hit_penalty",
+            "tag_reward_for_tagger",
+            "tag_penalty_for_runner",
+            "step_cost_for_tagger",
+            "use_full_observation",
+            "world_boundary",
+            "_timestep_",
+            ("episode_length", "meta"),
+        ]
+        if self.env_backend == "pycuda":
+            self.cuda_step(
+                *self.cuda_step_function_feed(args),
+                block=self.cuda_function_manager.block,
+                grid=self.cuda_function_manager.grid,
+            )
+        elif self.env_backend == "numba":
+            self.cuda_step[
+                self.cuda_function_manager.grid, self.cuda_function_manager.block
+            ](*self.cuda_step_function_feed(args))
+        else:
+            raise Exception("CUDATagGridWorld expects env_backend = 'pycuda' or 'numba'")
+
+
+class CUDATagGridWorldWithResetPool(TagGridWorld, CUDAEnvironmentContext):
+    """
+    CUDA version of the TagGridWorld environment, with a reset pool for the
+    starting points.
+    Note: this class subclasses the Python environment class TagGridWorld
+    and also CUDAEnvironmentContext.
+    """
+
+    def get_data_dictionary(self):
+        data_dict = DataFeed()
+        for feature in [
+            _LOC_X,
+            _LOC_Y,
+        ]:
+            data_dict.add_data(
+                name=feature,
+                data=self.global_state[feature][0],
+                save_copy_and_apply_at_reset=False,
+                log_data_across_episode=False,
+            )
+        data_dict.add_data_list(
+            [
+                ("wall_hit_penalty", self.wall_hit_penalty),
+                ("tag_reward_for_tagger", self.tag_reward_for_tagger),
+                ("tag_penalty_for_runner", self.tag_penalty_for_runner),
+                ("step_cost_for_tagger", self.step_cost_for_tagger),
+                ("use_full_observation", self.use_full_observation),
+                ("world_boundary", self.grid_length),
+            ]
+        )
+        return data_dict
+
+    def get_reset_pool_dictionary(self):
+
+        def _random_location_generator():
+            starting_location_x = self.np_random.choice(
+                np.linspace(1, int(self.grid_length) - 1, int(self.grid_length) - 1),
+                self.num_agents,
+            ).astype(np.int32)
+            starting_location_x[-1] = 0
+            starting_location_y = self.np_random.choice(
+                np.linspace(1, int(self.grid_length) - 1, int(self.grid_length) - 1),
+                self.num_agents,
+            ).astype(np.int32)
+            starting_location_y[-1] = 0
+            return starting_location_x, starting_location_y
+
+        N = 5  # the number of reset pool candidates is hard-coded for this demo
+        x_pool = []
+        y_pool = []
+        for _ in range(N):
+            x, y = _random_location_generator()
+            x_pool.append(x)
+            y_pool.append(y)
+
+        x_pool = np.stack(x_pool, axis=0)
+        y_pool = np.stack(y_pool, axis=0)
+
+        reset_pool_dict = DataFeed()
+        reset_pool_dict.add_pool_for_reset(
+            name=f"{_LOC_X}_reset_pool", data=x_pool, reset_target=_LOC_X
+        )
+        reset_pool_dict.add_pool_for_reset(
+            name=f"{_LOC_Y}_reset_pool", data=y_pool, reset_target=_LOC_Y
+        )
+
+        return reset_pool_dict
 
     def step(self, actions=None):
         self.timestep += 1
```
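To see what `get_reset_pool_dictionary` builds, here is the same location-generation logic extracted into a standalone NumPy snippet (the `grid_length` and `num_agents` values are made up for illustration): each pool is an array of shape `(N, num_agents)`, one candidate start configuration per row, with the last agent pinned to coordinate 0.

```python
import numpy as np

grid_length, num_agents, N = 10, 5, 5  # illustrative values
rng = np.random.RandomState(seed=0)

def random_locations():
    # sample interior coordinates 1..grid_length-1 for every agent,
    # then pin the last agent to 0, as in get_reset_pool_dictionary
    coords = rng.choice(
        np.linspace(1, grid_length - 1, grid_length - 1), num_agents
    ).astype(np.int32)
    coords[-1] = 0
    return coords

x_pool = np.stack([random_locations() for _ in range(N)], axis=0)
y_pool = np.stack([random_locations() for _ in range(N)], axis=0)
print(x_pool.shape)  # (5, 5): N candidate rows, num_agents columns
```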

requirements.txt

Lines changed: 1 addition & 1 deletion
```diff
@@ -1,4 +1,4 @@
-gym>=0.18, <0.26
+gym>=0.26
 matplotlib>=3.2.1
 numpy>=1.18.1
 pycuda>=2022.1
```
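The bump to gym>=0.26 is what motivates the seeding changes throughout this PR: gym 0.26 overhauled seeding (envs now take the seed through `reset()`) and moved `step()` to a five-tuple return. A sketch of the 0.26-style calling convention, with CartPole used purely as a familiar example:

```python
import gym

# gym >= 0.26 calling convention (CartPole is just a familiar example)
env = gym.make("CartPole-v1")
obs, info = env.reset(seed=42)  # seeding moved into reset()
obs, reward, terminated, truncated, info = env.step(env.action_space.sample())
done = terminated or truncated  # step() now returns a five-tuple
env.close()
```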

setup.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -14,7 +14,7 @@
 
 setup(
     name="rl-warp-drive",
-    version="2.4",
+    version="2.5.0",
     author="Tian Lan, Sunil Srinivasa, Brenton Chu, Stephan Zheng",
     author_email="tian.lan@salesforce.com",
     description="Framework for fast end-to-end "
```
(new file; filename missing from this capture)

Lines changed: 145 additions & 0 deletions

```python
# Copyright (c) 2021, salesforce.com, inc.
# All rights reserved.
# SPDX-License-Identifier: BSD-3-Clause
# For full license text, see the LICENSE file in the repo root
# or https://opensource.org/licenses/BSD-3-Clause

import unittest

import numpy as np
import torch

from warp_drive.managers.numba_managers.numba_data_manager import NumbaDataManager
from warp_drive.managers.numba_managers.numba_function_manager import (
    NumbaEnvironmentReset,
    NumbaFunctionManager,
)
from warp_drive.utils.common import get_project_root
from warp_drive.utils.data_feed import DataFeed

_NUMBA_FILEPATH = "warp_drive.numba_includes"


class TestEnvironmentReset(unittest.TestCase):
    """
    Unit tests for the CUDA environment resetter
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.dm = NumbaDataManager(num_agents=5, num_envs=2, episode_length=2)
        self.fm = NumbaFunctionManager(
            num_agents=int(self.dm.meta_info("n_agents")),
            num_envs=int(self.dm.meta_info("n_envs")),
        )
        self.fm.import_numba_from_source_code(f"{_NUMBA_FILEPATH}.test_build")
        self.resetter = NumbaEnvironmentReset(function_manager=self.fm)

    def test_reset_for_different_dim(self):

        self.dm.data_on_device_via_torch("_done_")[:] = torch.from_numpy(
            np.array([1, 0])
        ).cuda()

        done = self.dm.pull_data_from_device("_done_")
        self.assertSequenceEqual(list(done), [1, 0])

        # the expected pool mean is around 0.5 * (1 + 2 + 3 + 15) / 4 = 2.625
        a_reset_pool = np.random.rand(4, 10, 10)
        a_reset_pool[1] *= 2
        a_reset_pool[2] *= 3
        a_reset_pool[3] *= 15

        b_reset_pool = np.random.rand(4, 100)
        b_reset_pool[1] *= 2
        b_reset_pool[2] *= 3
        b_reset_pool[3] *= 15

        c_reset_pool = np.random.rand(100)

        data_feed = DataFeed()
        data_feed.add_data(
            name="a", data=np.random.randn(2, 10, 10), save_copy_and_apply_at_reset=False
        )
        data_feed.add_pool_for_reset(name="a_reset_pool", data=a_reset_pool, reset_target="a")
        data_feed.add_data(
            name="b", data=np.random.randn(2, 100), save_copy_and_apply_at_reset=False
        )
        data_feed.add_pool_for_reset(name="b_reset_pool", data=b_reset_pool, reset_target="b")
        data_feed.add_data(
            name="c", data=np.random.randn(2), save_copy_and_apply_at_reset=False
        )
        data_feed.add_pool_for_reset(name="c_reset_pool", data=c_reset_pool, reset_target="c")

        self.dm.push_data_to_device(data_feed)

        self.resetter.init_reset_pool(self.dm)

        a = self.dm.pull_data_from_device("a")
        b = self.dm.pull_data_from_device("b")
        c = self.dm.pull_data_from_device("c")

        # soft reset
        a_after_reset_0_mean = []
        a_after_reset_1_mean = []
        b_after_reset_0_mean = []
        b_after_reset_1_mean = []
        c_after_reset_0_mean = []
        c_after_reset_1_mean = []

        for _ in range(2000):
            self.resetter.reset_when_done(self.dm, mode="if_done", undo_done_after_reset=False)
            a_after_reset = self.dm.pull_data_from_device("a")
            a_after_reset_0_mean.append(a_after_reset[0].mean())
            a_after_reset_1_mean.append(a_after_reset[1].mean())
            b_after_reset = self.dm.pull_data_from_device("b")
            b_after_reset_0_mean.append(b_after_reset[0].mean())
            b_after_reset_1_mean.append(b_after_reset[1].mean())
            c_after_reset = self.dm.pull_data_from_device("c")
            c_after_reset_0_mean.append(c_after_reset[0].mean())
            c_after_reset_1_mean.append(c_after_reset[1].mean())
        # env 0 is done, so it gets 2000 random resets from the pool;
        # its running mean should therefore be close to the pool mean
        print(a_reset_pool.mean())
        print(np.array(a_after_reset_0_mean).mean())
        self.assertTrue(np.absolute(a_reset_pool.mean() - np.array(a_after_reset_0_mean).mean()) < 5e-1)
        print(b_reset_pool.mean())
        print(np.array(b_after_reset_0_mean).mean())
        self.assertTrue(np.absolute(b_reset_pool.mean() - np.array(b_after_reset_0_mean).mean()) < 5e-1)
        print(c_reset_pool.mean())
        print(np.array(c_after_reset_0_mean).mean())
        self.assertTrue(np.absolute(c_reset_pool.mean() - np.array(c_after_reset_0_mean).mean()) < 5e-1)
        # env 1 is never reset, so it should match the original data exactly
        self.assertTrue(np.absolute(a[1].mean() - np.array(a_after_reset_1_mean).mean()) < 1e-5)
        self.assertTrue(np.absolute(b[1].mean() - np.array(b_after_reset_1_mean).mean()) < 1e-5)
        self.assertTrue(np.absolute(c[1].mean() - np.array(c_after_reset_1_mean).mean()) < 1e-5)

        # hard reset
        a_after_reset_0_mean = []
        a_after_reset_1_mean = []
        b_after_reset_0_mean = []
        b_after_reset_1_mean = []
        c_after_reset_0_mean = []
        c_after_reset_1_mean = []
        for _ in range(2000):
            self.resetter.reset_when_done(self.dm, mode="force_reset", undo_done_after_reset=False)
            a_after_reset = self.dm.pull_data_from_device("a")
            a_after_reset_0_mean.append(a_after_reset[0].mean())
            a_after_reset_1_mean.append(a_after_reset[1].mean())
            b_after_reset = self.dm.pull_data_from_device("b")
            b_after_reset_0_mean.append(b_after_reset[0].mean())
            b_after_reset_1_mean.append(b_after_reset[1].mean())
            c_after_reset = self.dm.pull_data_from_device("c")
            c_after_reset_0_mean.append(c_after_reset[0].mean())
            c_after_reset_1_mean.append(c_after_reset[1].mean())
        # env 0 gets 2000 random resets from the pool, so its running mean
        # should be close to the pool mean
        self.assertTrue(np.absolute(a_reset_pool.mean() - np.array(a_after_reset_0_mean).mean()) < 5e-1)
        self.assertTrue(np.absolute(b_reset_pool.mean() - np.array(b_after_reset_0_mean).mean()) < 5e-1)
        self.assertTrue(np.absolute(c_reset_pool.mean() - np.array(c_after_reset_0_mean).mean()) < 5e-1)
        # with force_reset, env 1 is also reset 2000 times from the pool,
        # so its running mean should likewise be close to the pool mean
        self.assertTrue(np.absolute(a_reset_pool.mean() - np.array(a_after_reset_1_mean).mean()) < 5e-1)
        self.assertTrue(np.absolute(b_reset_pool.mean() - np.array(b_after_reset_1_mean).mean()) < 5e-1)
        self.assertTrue(np.absolute(c_reset_pool.mean() - np.array(c_after_reset_1_mean).mean()) < 5e-1)
```
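The tolerance in this test leans on a quick expectation calculation: each pool row is uniform on [0, s) with scales s in {1, 2, 3, 15}, so a uniformly drawn pool entry has mean 0.5 × (1 + 2 + 3 + 15) / 4 = 2.625. A standalone sanity check of that figure:

```python
import numpy as np

rng = np.random.default_rng(0)
pool = rng.random((4, 10, 10))
pool[1] *= 2
pool[2] *= 3
pool[3] *= 15

# analytic mean: uniform [0, s) has mean s/2, averaged over the four scales
expected = 0.5 * (1 + 2 + 3 + 15) / 4
print(expected)     # 2.625
print(pool.mean())  # close to 2.625, up to sampling noise

# simulate many random resets drawing whole rows from the pool
draws = pool[rng.integers(0, 4, size=2000)]
print(draws.mean())  # also close to 2.625
```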

warp_drive/env_cpu_gpu_consistency_checker.py

Lines changed: 2 additions & 4 deletions
```diff
@@ -13,7 +13,6 @@
 import numpy as np
 import torch
 from gym.spaces import Discrete, MultiDiscrete
-from gym.utils import seeding
 
 from warp_drive.env_wrapper import EnvWrapper
 from warp_drive.training.utils.data_loader import (
@@ -38,10 +37,9 @@ def generate_random_actions(env, num_envs, seed=None):
     Generate random actions for each agent and each env.
     """
     agent_ids = list(env.action_space.keys())
+    np_random = np.random
     if seed is not None:
-        np_random = seeding.np_random(seed)[0]
-    else:
-        np_random = np.random
+        np_random.seed(seed)
 
     return [
         {
```
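One subtlety worth noting in the rewrite above: binding `np_random = np.random` aliases NumPy's global random module, so `np_random.seed(seed)` reseeds the process-wide global state rather than an isolated stream. A short sketch demonstrating the aliasing (the isolated RandomState is shown only for contrast; whether this trade-off is intended here is our reading, not stated in the diff):

```python
import numpy as np

np_random = np.random          # alias; same object as the global module
np_random.seed(123)
a = np.random.randint(0, 100)  # the global stream was reseeded too

np_random.seed(123)
assert np.random.randint(0, 100) == a  # identical draw confirms aliasing

isolated = np.random.RandomState(123)  # contrast: a private stream
_ = isolated.randint(0, 100)           # does not touch np.random's state
```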

warp_drive/env_wrapper.py

Lines changed: 27 additions & 1 deletion
```diff
@@ -291,9 +291,10 @@ def repeat_across_env_dimension(array, num_envs):
         # Copy host data and tensors to device
         # Note: this happens only once after the first reset on the host
 
-        # Add env dimension to data if "save_copy_and_apply_at_reset" is True
         data_dictionary = self.env.get_data_dictionary()
         tensor_dictionary = self.env.get_tensor_dictionary()
+        reset_pool_dictionary = self.env.get_reset_pool_dictionary()
+        # Add env dimension to data if "save_copy_and_apply_at_reset" is True
         for key in data_dictionary:
             if data_dictionary[key]["attributes"][
                 "save_copy_and_apply_at_reset"
@@ -309,13 +310,35 @@ def repeat_across_env_dimension(array, num_envs):
                 tensor_dictionary[key]["data"] = repeat_across_env_dimension(
                     tensor_dictionary[key]["data"], self.n_envs
                 )
+        # Add the env dimension to the target data if "is_reset_pool" exists
+        # for it; if so, also check that the target data has
+        # "save_copy_and_apply_at_reset" = False
+        for key in reset_pool_dictionary:
+            if "is_reset_pool" in reset_pool_dictionary[key]["attributes"] and \
+                    reset_pool_dictionary[key]["attributes"]["is_reset_pool"]:
+                # find the corresponding target data
+                reset_target = reset_pool_dictionary[key]["attributes"]["reset_target"]
+                if reset_target in data_dictionary:
+                    assert not data_dictionary[reset_target]["attributes"]["save_copy_and_apply_at_reset"]
+                    data_dictionary[reset_target]["data"] = repeat_across_env_dimension(
+                        data_dictionary[reset_target]["data"], self.n_envs
+                    )
+                elif reset_target in tensor_dictionary:
+                    assert not tensor_dictionary[reset_target]["attributes"]["save_copy_and_apply_at_reset"]
+                    tensor_dictionary[reset_target]["data"] = repeat_across_env_dimension(
+                        tensor_dictionary[reset_target]["data"], self.n_envs
+                    )
+                else:
+                    raise Exception(
+                        f"Failed to locate the target data {reset_target} for the reset pool "
+                        f"in either data_dictionary or tensor_dictionary"
+                    )
 
         self.cuda_data_manager.push_data_to_device(data_dictionary)
 
         self.cuda_data_manager.push_data_to_device(
             tensor_dictionary, torch_accessible=True
         )
 
+        self.cuda_data_manager.push_data_to_device(reset_pool_dictionary)
+
         # All subsequent resets happen on the GPU
         self.reset_on_host = False
 
@@ -329,6 +352,9 @@ def repeat_across_env_dimension(array, num_envs):
             return {}
         return obs  # CPU version
 
+    def init_reset_pool(self, seed=None):
+        self.env_resetter.init_reset_pool(self.cuda_data_manager, seed)
+
     def reset_only_done_envs(self):
         """
         This function only works for GPU example_envs.
```
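Putting the wrapper changes together: the pool arrays are pushed to the device once, the reset targets are replicated across the env dimension, and every subsequent device-side reset copies a randomly chosen pool row into the done replicas. Below is a host-side NumPy emulation of that flow; the helper names are ours, and only `add_pool_for_reset` and `init_reset_pool` above are real WarpDrive APIs.

```python
import numpy as np

def repeat_across_env_dimension(array, num_envs):
    # same idea as the wrapper helper: tile a single-env array n_envs times
    return np.repeat(array[np.newaxis, ...], num_envs, axis=0)

n_envs = 3
pool = np.arange(20, dtype=np.float32).reshape(5, 4)  # 5 candidate rows
target = repeat_across_env_dimension(np.zeros(4, dtype=np.float32), n_envs)

rng = np.random.RandomState(0)  # stands in for the device RNG that
                                # init_reset_pool seeds

def reset_when_done(done, mode="if_done"):
    # "force_reset" resets every replica; "if_done" only the finished ones
    mask = np.ones_like(done, dtype=bool) if mode == "force_reset" else done.astype(bool)
    target[mask] = pool[rng.randint(0, len(pool), size=mask.sum())]

reset_when_done(np.array([1, 0, 1]))
print(target)  # replicas 0 and 2 now hold random pool rows; replica 1 untouched
```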
