Skip to content

Commit f95b24c

Browse files
committed
Removing the random_seed parameter, as is no longer necessary, thanks to the working synchronization mechanism
1 parent 67753dd commit f95b24c

File tree

3 files changed

+50
-70
lines changed

3 files changed

+50
-70
lines changed

dhalsim/physical_process.py

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ def __init__(self, intermediate_yaml):
120120
self.master_time = -1
121121

122122
self.db_update_string = "UPDATE plant SET value = ? WHERE name = ?"
123-
self.db_sleep_time = random.uniform(0.01, 0.1)
124123

124+
self.db_sleep_time = random.uniform(0.01, 0.1)
125125
self.logger.info("DB Sleep time: " + str(self.db_sleep_time))
126126

127127

@@ -260,7 +260,6 @@ def create_link_header(a_list):
260260
def create_attack_header(self):
261261
"""
262262
Function that creates csv list headers for device and network attacks
263-
264263
:return: list of attack names starting with device and ending with network
265264
"""
266265
result = []
@@ -311,19 +310,25 @@ def register_initial_results(self):
311310
[self.wn.junctions[junction].pressure])
312311
elif self.simulator == 'wntr':
313312
for junction in self.junction_list:
314-
self.values_list.extend(
315-
[self.wn.get_node(junction).head - self.wn.get_node(junction).elevation])
313+
# toDo: Check in wntr 0.4.2 a new way of getting the initial junction pressure
314+
self.values_list.extend([0])
316315

317316
if self.simulator == 'epynet':
318317
# Get pumps flows and status
318+
self.logger.debug('Registering initial results of pumps: ' + str(self.pump_list))
319319
for pump in self.pump_list:
320-
self.values_list.extend([self.wn.pumps[pump].flow, self.wn.pumps[pump].status])
320+
321+
if pump in self.wn.pumps:
322+
self.values_list.extend([self.wn.pumps[pump].flow, self.wn.pumps[pump].status])
323+
elif pump in self.wn.valves:
324+
self.values_list.extend([self.wn.valves[pump].flow, self.wn.valves[pump].status])
325+
else:
326+
self.logger.error("Error. Actuator " + str(pump) + " not found in EPANET file")
321327

322328
elif self.simulator == 'wntr':
323329

324330
for pump in self.pump_list:
325331
self.values_list.extend([self.wn.get_link(pump).flow])
326-
327332
if type(self.wn.get_link(pump).status) is int:
328333
self.values_list.extend([self.wn.get_link(pump).status])
329334
else:
@@ -518,7 +523,6 @@ def get_plcs_ready(self, flag):
518523
def get_attack_flag(self, name):
519524
"""
520525
Get the attack flag of this attack.
521-
522526
:return: False if attack not running, true otherwise
523527
"""
524528

@@ -701,51 +705,66 @@ def simulate_with_epynet(self, iteration_limit, p_bar):
701705
#time.sleep(0.3)
702706

703707
def simulate_with_wntr(self, iteration_limit, p_bar):
704-
self.logger.info("Starting wntr simulation")
708+
self.logger.info("Starting WNTR simulation")
705709
self.wn.options.time.duration = self.wn.options.time.hydraulic_timestep
706710

711+
self.register_initial_results()
712+
self.results_list.append(self.values_list)
713+
707714
while self.master_time < iteration_limit:
708-
conn = sqlite3.connect(self.data["db_path"])
709-
c = conn.cursor()
710-
c.execute("REPLACE INTO master_time (id, time) VALUES(1, ?)", (str(self.master_time),))
711-
conn.commit()
712715

713-
self.master_time = self.master_time + 1
716+
# We check that all PLCs updated their local caches and local CPPPO
717+
while not self.get_plcs_ready(1):
718+
time.sleep(self.WAIT_FOR_FLAG)
714719

715-
while not self.get_plcs_ready():
716-
time.sleep(0.01)
720+
# Notify the PLCs they can start receiving remote values
721+
with sqlite3.connect(self.data["db_path"]) as conn:
722+
c = conn.cursor()
723+
c.execute("UPDATE sync SET flag=2")
724+
conn.commit()
717725

718-
self.update_controls()
726+
# Wait for the PLCs to apply control logic
727+
while not self.get_plcs_ready(3):
728+
time.sleep(self.WAIT_FOR_FLAG)
719729

720-
self.logger.debug("Iteration {x} out of {y}.".format(x=str(self.master_time),
721-
y=str(iteration_limit)))
730+
self.update_controls()
722731

723-
if p_bar:
724-
p_bar.update(self.master_time)
732+
self.logger.debug("Iteration {x} out of {y}.".format(x=str(self.master_time), y=str(iteration_limit)))
725733

726-
# Check for simulation error, print output on exception
727734
try:
728735
self.sim.run_sim(convergence_error=True)
729736
except Exception as exp:
730737
self.logger.error(f"Error in WNTR simulation: {exp}")
731738
self.finish()
732739

733-
self.register_results()
734-
self.results_list.append(self.values_list)
735-
740+
# Updates the SQLite DB
736741
self.update_tanks()
737742
self.update_pumps()
738743
self.update_valves()
739744
self.update_junctions()
740745

746+
self.master_time = self.master_time + 1
747+
conn = sqlite3.connect(self.data["db_path"])
748+
c = conn.cursor()
749+
c.execute("REPLACE INTO master_time (id, time) VALUES(1, ?)", (str(self.master_time),))
750+
conn.commit()
751+
752+
if p_bar:
753+
p_bar.update(self.master_time)
754+
755+
self.register_results()
756+
self.results_list.append(self.values_list)
757+
741758
# Write results of this iteration if needed
742759
if 'saving_interval' in self.data and self.master_time != 0 and \
743760
self.master_time % self.data['saving_interval'] == 0:
744761
self.write_results(self.results_list)
745762

746763
# Set sync flags for nodes
747-
c.execute("UPDATE sync SET flag=0")
748-
conn.commit()
764+
with sqlite3.connect(self.data["db_path"]) as conn:
765+
c = conn.cursor()
766+
c.execute("UPDATE sync SET flag=0")
767+
conn.commit()
749768

750769
def update_tanks(self, network_state=None):
751770
"""Update tanks in database."""
@@ -893,4 +912,4 @@ def is_valid_file(test_parser, arg):
893912
args = parser.parse_args()
894913

895914
simulation = PhysicalPlant(Path(args.intermediate_yaml))
896-
simulation.main()
915+
simulation.main()

dhalsim/python2/generic_plc.py

Lines changed: 3 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import thread
1919

2020

21-
2221
class Error(Exception):
2322
"""Base class for exceptions in this module."""
2423

@@ -112,8 +111,6 @@ def __init__(self, intermediate_yaml_path, yaml_index):
112111
self.plcs_ready = False
113112

114113
for tag in set(dependant_sensors) - set(plc_sensors):
115-
116-
#todo: We should query the DB for this
117114
self.cache[tag] = Decimal(0)
118115
self.tag_fresh[tag] = False
119116

@@ -132,7 +129,6 @@ def generate_real_tags(sensors, dependants, actuators):
132129
"""
133130
Generates real tags with all sensors, dependants, and actuators
134131
attached to the plc.
135-
136132
:param sensors: list of sensors attached to the plc
137133
:param dependants: list of dependant sensors (from other plcs)
138134
:param actuators: list of actuators controlled by the plc
@@ -155,7 +151,6 @@ def generate_real_tags(sensors, dependants, actuators):
155151
def generate_tags(taggable):
156152
"""
157153
Generates tags from a list of taggable entities (sensor or actuator)
158-
159154
:param taggable: a list of strings containing names of things like tanks, pumps, and valves
160155
"""
161156
tags = []
@@ -171,7 +166,6 @@ def generate_tags(taggable):
171166
def create_controls(controls_list):
172167
"""
173168
Generates list of control objects for a plc
174-
175169
:param controls_list: a list of the control dicts to be converted to Control objects
176170
"""
177171
ret = []
@@ -195,7 +189,6 @@ def create_controls(controls_list):
195189
@staticmethod
196190
def create_attacks(attack_list):
197191
"""This function will create an array of DeviceAttacks
198-
199192
:param attack_list: A list of attack dicts that need to be converted to DeviceAttacks
200193
"""
201194
attacks = []
@@ -226,7 +219,6 @@ def pre_loop(self, sleep=0.5):
226219
"""
227220
The pre loop of a PLC. In everything is setup. Like starting the sending thread through
228221
the :class:`~dhalsim.python2.basePLC` class.
229-
230222
:param sleep: (Default value = 0.5) The time to sleep after setting everything up
231223
"""
232224
self.logger.debug(self.intermediate_plc['name'] + ' enters pre_loop')
@@ -259,7 +251,6 @@ def pre_loop(self, sleep=0.5):
259251
def get_tag(self, tag):
260252
"""
261253
Get the value of a tag that is connected to this PLC or over the network.
262-
263254
:param tag: The tag to get
264255
:type tag: str
265256
:return: value of that tag
@@ -317,15 +308,14 @@ def update_cache(self, a, cache_update_time):
317308
res = self.get_tag_for_cache(cached_tag, plc_data["public_ip"], cache_update_time)
318309
if self.get_master_clock() == start_iteration:
319310
self.tag_fresh[cached_tag] = res
320-
311+
321312
if not self.tag_fresh[cached_tag]:
322313
self.logger.info("Warning: Cache for tag " + str(cached_tag) + " could not be updated")
323314
self.tag_fresh[cached_tag] = True
324315

325316
def set_tag(self, tag, value):
326317
"""
327318
Set a tag that is connected to this PLC to a value.
328-
329319
:param tag: Which tag to set
330320
:type tag: str
331321
:param value: value to set the Tag to
@@ -349,14 +339,10 @@ def db_query(self, query, write=False, parameters=None):
349339
On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
350340
Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
351341
This is necessary because of the limited concurrency in SQLite.
352-
353342
:param query: The SQL query to execute in the db
354343
:type query: str
355-
356344
:param write: Boolean flag to indicate if this query will write into the database
357-
358345
:param parameters: The parameters to put in the query. This must be a tuple.
359-
360346
:raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
361347
:code:`DB_TRIES` tries.
362348
"""
@@ -389,9 +375,7 @@ def get_master_clock(self):
389375
Get the value of the master clock of the physical process through the database.
390376
On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
391377
Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
392-
393378
:return: Iteration in the physical process.
394-
395379
:raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
396380
:code:`DB_TRIES` tries.
397381
"""
@@ -403,9 +387,7 @@ def get_sync(self, flag):
403387
Get the sync flag of this plc.
404388
On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
405389
Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
406-
407390
:return: False if physical process wants the plc to do a iteration, True if not.
408-
409391
:raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
410392
:code:`DB_TRIES` tries.
411393
"""
@@ -418,10 +400,8 @@ def set_sync(self, flag):
418400
knows this plc finished the requested iteration.
419401
On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
420402
Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
421-
422403
:param flag: True for sync to 1, False for sync to 0
423404
:type flag: bool
424-
425405
:raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
426406
:code:`DB_TRIES` tries.
427407
"""
@@ -433,13 +413,10 @@ def set_attack_flag(self, flag, attack_name):
433413
provided name is currently running. When it is 0, it is not.
434414
On a :code:`sqlite3.OperationalError` it will retry with a max of :code:`DB_TRIES` tries.
435415
Before it reties, it will sleep for :code:`DB_SLEEP_TIME` seconds.
436-
437416
:param flag: True for running to 1, False for running to 0
438417
:type flag: bool
439-
440418
:param attack_name: The name of the attack
441419
:type attack_name: str
442-
443420
:raise DatabaseError: When a :code:`sqlite3.OperationalError` is still raised after
444421
:code:`DB_TRIES` tries.
445422
"""
@@ -451,7 +428,6 @@ def stop_cache_update(self):
451428
def main_loop(self, sleep=0.5, test_break=False):
452429
"""
453430
The main loop of a PLC. In here all the controls will be applied.
454-
455431
:param sleep: (Default value = 0.5) Not used
456432
:param test_break: (Default value = False) used for unit testing, breaks the loop after one iteration
457433
"""
@@ -498,6 +474,7 @@ def main_loop(self, sleep=0.5, test_break=False):
498474
if test_break:
499475
break
500476

477+
501478
def is_valid_file(parser_instance, arg):
502479
if not os.path.exists(arg):
503480
parser_instance.error(arg + " does not exist")
@@ -516,4 +493,4 @@ def is_valid_file(parser_instance, arg):
516493
args = parser.parse_args()
517494
plc = GenericPLC(
518495
intermediate_yaml_path=Path(args.intermediate_yaml),
519-
yaml_index=args.index)
496+
yaml_index=args.index)

0 commit comments

Comments
 (0)