diff --git a/folding/base/simulation.py b/folding/base/simulation.py index 03c7f6b1..78536dc7 100644 --- a/folding/base/simulation.py +++ b/folding/base/simulation.py @@ -35,126 +35,304 @@ def timed(*args, **kwargs): class OpenMMSimulation(GenericSimulation): - @GenericSimulation.timeit - def create_simulation( - self, - pdb: app.PDBFile, - system_config: dict, - seed: int = None, - verbose=False, - initialize_with_solvent=False, - ) -> Tuple[app.Simulation, SimulationConfig]: - """Recreates a simulation object based on the provided parameters. + def __init__( + self, default_simulation_properties: dict = None, verbose: bool = False + ): + """Initialize the OpenMMSimulation object. - This method takes in a seed, state, and checkpoint file path to recreate a simulation object. Args: - seed (str): The seed for the random number generator. - system_config (dict): A dictionary containing the system configuration settings. - pdb (app.PDBFile): The PDB file used to initialize the simulation - initialize_with_solvent (bool): A boolean flag to determine if the simulation should be initialized with solvent. - - Returns: - Tuple[app.Simulation, SimulationConfig]: A tuple containing the recreated simulation object and the potentially altered system configuration in SystemConfig format. + default_simulation_properties (dict, optional): A dictionary of default simulation properties. Defaults to None. + verbose (bool, optional): A boolean flag to determine if the simulation should be verbose. Defaults to False. """ - setup_times = {} + self.setup_times = {} + + # Reference for DisablePmeStream: https://github.com/openmm/openmm/issues/3589 + self.default_simulation_properties = default_simulation_properties or { + "DeterministicForces": "true", + "Precision": "double", + "DisablePmeStream": "true", + } - start_time = time.time() - forcefield = app.ForceField(system_config["ff"], system_config["water"]) - setup_times["add_ff"] = time.time() - start_time + self.verbose = verbose + @GenericSimulation.timeit + def _setup_forcefield(self, ff: str, water: str): + forcefield = app.ForceField(ff, water) + return forcefield + + @GenericSimulation.timeit + def _setup_modeller(self, pdb: app.PDBFile): modeller = app.Modeller(pdb.topology, pdb.positions) + return modeller - if initialize_with_solvent: - start_time = time.time() - modeller.deleteWater() - setup_times["delete_water"] = time.time() - start_time + @GenericSimulation.timeit + def _initialize_fluid( + self, modeller: app.Modeller, forcefield: app.ForceField + ) -> app.Modeller: + modeller.deleteWater() + modeller.addHydrogens(forcefield) - start_time = time.time() - modeller.addHydrogens(forcefield) - setup_times["add_hydrogens"] = time.time() - start_time + return modeller - start_time = time.time() - modeller.addSolvent( - forcefield, - padding=system_config["box_padding"] * unit.nanometer, - boxShape=system_config["box"], - ) - setup_times["add_solvent"] = time.time() - start_time + @GenericSimulation.timeit + def _use_solvent( + self, + modeller: app.Modeller, + forcefield: app.ForceField, + box_padding: float, + box_shape: str, + ) -> app.Modeller: + modeller.addSolvent( + forcefield, + padding=box_padding * unit.nanometer, + boxShape=box_shape, + ) - modeller.addExtraParticles(forcefield) + return modeller - # Create the system - start_time = time.time() - # The assumption here is that the system_config cutoff MUST be given in nanometers + @GenericSimulation.timeit + def _add_extra_particles( + self, modeller: app.Modeller, forcefield: app.ForceField + ) -> app.Modeller: + modeller.addExtraParticles(forcefield) + return modeller + + @GenericSimulation.timeit + def _create_system( + self, + modeller: app.Modeller, + forcefield: app.ForceField, + cutoff: float, + constraints: str, + ) -> Tuple[mm.System, float]: threshold = ( - pdb.topology.getUnitCellDimensions().min().value_in_unit(mm.unit.nanometers) + modeller.topology.getUnitCellDimensions() + .min() + .value_in_unit(mm.unit.nanometers) ) / 2 - if system_config["cutoff"] > threshold: - nonbondedCutoff = threshold * mm.unit.nanometers - # set the attribute in the config for the pipeline. - system_config["cutoff"] = threshold + + nonbondedCutoff = min(cutoff, threshold) * mm.unit.nanometers + if cutoff > threshold: logger.debug( f"Nonbonded cutoff is greater than half the minimum box dimension. Setting nonbonded cutoff to {threshold} nm" ) - else: - nonbondedCutoff = system_config["cutoff"] * mm.unit.nanometers system = forcefield.createSystem( modeller.topology, nonbondedMethod=mm.app.NoCutoff, nonbondedCutoff=nonbondedCutoff, - constraints=system_config["constraints"], + constraints=constraints, ) - setup_times["create_system"] = time.time() - start_time + return system, cutoff - # Integrator settings + @GenericSimulation.timeit + def _setup_integrator( + self, temperature: float, friction: float, time_step_size: float, seed: int + ) -> mm.LangevinIntegrator: integrator = mm.LangevinIntegrator( - system_config["temperature"] * unit.kelvin, - system_config["friction"] / unit.picosecond, - system_config["time_step_size"] * unit.picoseconds, + temperature * unit.kelvin, + friction / unit.picosecond, + time_step_size * unit.picoseconds, ) - seed = seed if seed is not None else system_config["seed"] integrator.setRandomNumberSeed(seed) + return integrator - # Periodic boundary conditions - # pdb.topology.setPeriodicBoxVectors(system.getDefaultPeriodicBoxVectors()) - - # if state != "nvt": - # system.addForce( - # mm.MonteCarloBarostat( - # system_config["pressure"] * unit.bar, - # system_config["temperature"] * unit.kelvin, - # ) - # ) - + @GenericSimulation.timeit + def _setup_simulation( + self, + modeller: app.Modeller, + system: mm.System, + integrator: mm.LangevinIntegrator, + properties: dict, + ) -> app.Simulation: platform = mm.Platform.getPlatformByName("CUDA") - # Reference for DisablePmeStream: https://github.com/openmm/openmm/issues/3589 - properties = { - "DeterministicForces": "true", - "Precision": "double", - "DisablePmeStream": "true", - } - - start_time = time.time() simulation = mm.app.Simulation( modeller.topology, system, integrator, platform, properties ) - setup_times["create_simulation"] = time.time() - start_time - # Set initial positions - start_time = time.time() + # Set initial positions simulation.context.setPositions(modeller.positions) - setup_times["set_positions"] = time.time() - start_time + return simulation + + @GenericSimulation.timeit + def pipeline( + self, + pdb: app.PDBFile, + use_solvent: bool, + include_fluid: bool, + system_config: dict, + simulation_properties: dict, + seed: int, + ) -> Tuple[app.Simulation, dict]: + """Creates a simulation object with the given parameters. + + Args: + pdb (app.PDBFile): The PDB file to use for the simulation. + use_solvent (bool): Whether to use solvent for the simulation. + system_config (dict): The system configuration to use for the simulation. + simulation_properties (dict): The simulation properties to use for the simulation. + seed (int): The seed to use for the simulation. + + Returns: + Tuple[app.Simulation, dict]: + A tuple containing the simulation object and the system configuration. + """ + forcefield = self._setup_forcefield( + ff=system_config["ff"], water=system_config["water"] + ) + modeller = self._setup_modeller(pdb=pdb) + + if use_solvent or include_fluid: + modeller = self._initialize_fluid(modeller=modeller, forcefield=forcefield) + + if use_solvent: + modeller = self._use_solvent( + modeller=modeller, + forcefield=forcefield, + box_padding=system_config["box_padding"], + box_shape=system_config["box"], + ) + + modeller = self._add_extra_particles( + modeller=modeller, forcefield=forcefield + ) + + system, cutoff = self._create_system( + modeller=modeller, + forcefield=forcefield, + cutoff=system_config["cutoff"], + constraints=system_config["constraints"], + ) + # could change in the process of creating the system + system_config["cutoff"] = cutoff + + integrator = self._setup_integrator( + temperature=system_config["temperature"], + friction=system_config["friction"], + time_step_size=system_config["time_step_size"], + seed=seed, + ) + + simulation = self._setup_simulation( + modeller=modeller, + system=system, + integrator=integrator, + properties=simulation_properties, + ) + + return self.process_return(simulation, system_config) + + @GenericSimulation.timeit + def from_pipeline( + self, + pdb: app.PDBFile, + system_config: dict, + simulation_properties: dict, + seed: int, + ) -> Tuple[app.Simulation, dict]: + """Creates a simulation object from the given parameters. + + Args: + pdb (app.PDBFile): The PDB file to use for the simulation. + system_config (dict): The system configuration to use for the simulation. + simulation_properties (dict): The simulation properties to use for the simulation. + seed (int): The seed to use for the simulation. + """ + return self.pipeline( + pdb=pdb, + include_fluid=True, + use_solvent=False, + system_config=system_config, + simulation_properties=simulation_properties, + seed=seed, + ) + + @GenericSimulation.timeit + def from_solvent_pipeline( + self, + pdb: app.PDBFile, + system_config: dict, + simulation_properties: dict, + seed: int, + ) -> Tuple[app.Simulation, dict]: + """Creates a simulation object from the given parameters. + + Importantly, when the validator creates a simulation with solvent involved, + the miner instantiation pipeline becomes more complicated due to the presence of fluids. + + Therefore, if a pdb has been initialized with solvent from the validator, we must + skip all the steps that contain fluid, and NOT initialize the simulation with solvent information. + + Args: + pdb (app.PDBFile): The PDB file to use for the simulation. + system_config (dict): The system configuration to use for the simulation. + simulation_properties (dict): The simulation properties to use for the simulation. + seed (int): The seed to use for the simulation. + + Returns: + Tuple[app.Simulation, dict]: + A tuple containing the simulation object and the system configuration. + """ + return self.pipeline( + pdb=pdb, + include_fluid=False, + use_solvent=False, + system_config=system_config, + simulation_properties=simulation_properties, + seed=seed, + ) + + @GenericSimulation.timeit + def create_simulation( + self, + pdb: app.PDBFile, + system_config: dict, + seed: int, + with_solvent: bool = False, + ) -> Tuple[app.Simulation, SimulationConfig]: + """Creates a simulation object from the given parameters. + + This method is meant to be only called by the validator. + As a miner, you should use the `from_pipeline` or `from_solvent_pipeline` methods. + + Args: + pdb (app.PDBFile): The PDB file to use for the simulation. + with_solvent (bool): Whether to use solvent for the simulation. + system_config (dict): The system configuration to use for the simulation. + seed (int): The seed to use for the simulation. + + Returns: + Tuple[app.Simulation, SimulationConfig]: + A tuple containing the simulation object and the system configuration. + """ + + simulation, system_config = self.pipeline( + pdb=pdb, + include_fluid=True, + use_solvent=with_solvent, + system_config=system_config, + simulation_properties=self.default_simulation_properties, + seed=seed, + ) + + return self.process_return(simulation, system_config) + + def process_return( + self, simulation: app.Simulation, system_config: dict + ) -> Tuple[app.Simulation, SimulationConfig]: + """Process the return values from the pipeline method. + + I hate that we need to do this, but it's a necessary evil. It's because + I actually don't know when and where the below logic is needed, so I can't remove the filter. + + TODO: Remove this + """ # Converting the system config into a Dict[str,str] and ensure all values in system_config are of the correct type for k, v in system_config.items(): if not isinstance(v, (str, int, float, dict)): system_config[k] = str(v) - if verbose: - for key, t in setup_times: - logger.debug(f"Took {round(t, 3)} to {key}") - return simulation, SimulationConfig(**system_config) diff --git a/folding/registries/evaluation_registry.py b/folding/registries/evaluation_registry.py index 7cda171e..86e6df17 100644 --- a/folding/registries/evaluation_registry.py +++ b/folding/registries/evaluation_registry.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict +from typing import Any, Dict, Callable import numpy as np import pandas as pd @@ -26,6 +26,7 @@ def __init__( self, pdb_id: str, pdb_location: str, + with_solvent: bool, hotkey: str, state: str, seed: int, @@ -36,6 +37,7 @@ def __init__( **kwargs, ): self.pdb_id = pdb_id + self.with_solvent = with_solvent self.kwargs = kwargs self.md_simulator = OpenMMSimulation() self.pdb_location = pdb_location @@ -56,6 +58,14 @@ def __init__( # TODO: Refactor this method to be more modular, seperate getting energy and setting up simulations and files + def load_pipeline(self, with_solvent: bool) -> Callable: + """Load the appropriate pipeline based on the with_solvent flag.""" + return ( + self.md_simulator.from_solvent_pipeline + if with_solvent + else self.md_simulator.from_pipeline + ) + def process_md_output(self) -> bool: """Method to process molecular dynamics data from a miner and recreate the simulation. @@ -101,11 +111,14 @@ def process_md_output(self) -> bool: logger.info( f"Recreating miner {self.hotkey_alias} simulation in state: {self.current_state}" ) - simulation, self.system_config = self.md_simulator.create_simulation( + + pipeline: Callable = self.load_pipeline(with_solvent=self.with_solvent) + + simulation, self.system_config = pipeline( pdb=load_pdb_file(pdb_file=self.pdb_location), system_config=self.system_config.get_config(), seed=self.miner_seed, - initialize_with_solvent=False, + simulation_properties=self.md_simulator.default_simulation_properties, ) checkpoint_path = os.path.join( @@ -273,12 +286,16 @@ def is_run_valid(self): logger.info( f"Recreating simulation for {self.pdb_id} for state-based analysis..." ) - simulation, system_config = self.md_simulator.create_simulation( + + pipeline: Callable = self.load_pipeline(with_solvent=self.with_solvent) + + simulation, _ = pipeline( pdb=load_pdb_file(pdb_file=self.pdb_location), system_config=self.system_config.get_config(), seed=self.miner_seed, - initialize_with_solvent=False, + simulation_properties=self.md_simulator.default_simulation_properties, ) + simulation.loadState(self.state_xml_path) state_energies = [] for _ in range(self.steps_to_run // 10): @@ -297,12 +314,13 @@ def is_run_valid(self): raise ValidationError(message="state-gradient") # Reload in the checkpoint file and run the simulation for the same number of steps as the miner. - simulation, system_config = self.md_simulator.create_simulation( + simulation, _ = pipeline( pdb=load_pdb_file(pdb_file=self.pdb_location), system_config=self.system_config.get_config(), seed=self.miner_seed, - initialize_with_solvent=False, + simulation_properties=self.md_simulator.default_simulation_properties, ) + simulation.loadCheckpoint(self.checkpoint_path) current_state_logfile = os.path.join( @@ -375,9 +393,9 @@ def is_run_valid(self): return True, check_energies.tolist(), self.miner_energies.tolist(), "valid" - except ValidationError as E: - logger.warning(f"{E}") - return False, [], [], E.message + except ValidationError as e: + logger.warning(f"{e}") + return False, [], [], e.message def evaluate(self) -> bool: """Checks to see if the miner's data can be passed for validation""" diff --git a/folding/store.py b/folding/store.py index d7a7ae3c..2a7366a3 100644 --- a/folding/store.py +++ b/folding/store.py @@ -5,12 +5,11 @@ import sqlite3 import requests from queue import Queue -from typing import Dict, List +from typing import List from dotenv import load_dotenv from datetime import datetime -import numpy as np import pandas as pd from atom.epistula.epistula import Epistula diff --git a/folding/validators/forward.py b/folding/validators/forward.py index 3965d31c..0da04e78 100644 --- a/folding/validators/forward.py +++ b/folding/validators/forward.py @@ -1,19 +1,19 @@ import time -import traceback +import asyncio import numpy as np from tqdm import tqdm -import bittensor as bt from pathlib import Path from typing import List, Dict from collections import defaultdict from async_timeout import timeout + +from folding.store import Job from folding.utils.s3_utils import upload_to_s3 from folding.validators.protein import Protein from folding.utils.logging import log_event from folding.validators.reward import get_energies from folding.protocol import PingSynapse, JobSubmissionSynapse, ParticipationSynapse -import asyncio from folding.utils.openmm_forcefields import FORCEFIELD_REGISTRY from folding.validators.hyperparameters import HyperParameters from folding.utils.ops import ( @@ -72,10 +72,13 @@ async def run_step( job_type: str, job_id: str, best_submitted_energy: float = None, + job: Job = None, ) -> Dict: - start_time = time.time() + """Run the step for a given job.""" + start_time = time.time() if protein is None: + logger.warning("No protein found for job_id: {job_id}") event = { "block": self.block, "step_length": time.time() - start_time, @@ -89,9 +92,6 @@ async def run_step( # Get the list of uids to query for this step. axons = [self.metagraph.axons[uid] for uid in participating_uids] - system_config = protein.system_config.to_dict() - system_config["seed"] = None # We don't want to pass the seed to miners. - synapse = JobSubmissionSynapse( pdb_id=protein.pdb_id, job_id=job_id, @@ -120,6 +120,7 @@ async def run_step( } energies, energy_event = get_energies( + job=job, protein=protein, responses=responses, uids=participating_uids, @@ -272,9 +273,14 @@ async def try_prepare_md_challenge(self, config, pdb_id: str) -> Dict: **hps, ) + generation_kwargs = { + "verbose": False, + "with_solvent": np.random.rand() < 0.5, + } + try: async with timeout(300): - await protein.setup_simulation() + await protein.setup_simulation(generation_kwargs=generation_kwargs) if protein.init_energy > 0: raise ValueError( @@ -311,6 +317,7 @@ async def try_prepare_md_challenge(self, config, pdb_id: str) -> Dict: event["s3_links"] = { "testing": "testing" } # overwritten below if s3 logging is on. + event.update(generation_kwargs) if "validator_search_status" not in event: if not config.s3.off: diff --git a/folding/validators/protein.py b/folding/validators/protein.py index 26d13ff7..64000238 100644 --- a/folding/validators/protein.py +++ b/folding/validators/protein.py @@ -206,8 +206,8 @@ def read_and_return_files(self, filenames: List) -> Dict: continue return files_to_return - async def setup_simulation(self): - """forward method defines the following: + async def setup_simulation(self, generation_kwargs: Dict = None): + """setup_simulation method defines the following: 1. gather the pdb_id and setup the namings. 2. setup the pdb directory and download the pdb file if it doesn't exist. 3. check for missing files and generate the input files if they are missing. @@ -220,7 +220,7 @@ async def setup_simulation(self): ) await self.setup_pdb_directory() - await self.generate_input_files() + await self.generate_input_files(generation_kwargs=generation_kwargs) # Create a validator directory to store the files check_if_directory_exists(output_directory=self.validator_directory) @@ -282,7 +282,7 @@ async def fix_pdb_file(self): # Function to generate the OpenMM simulation state. @OpenMMSimulation.timeit - async def generate_input_files(self): + async def generate_input_files(self, generation_kwargs: Dict = None): """Generate_input_files method defines the following: 1. Load the pdb file and create the simulation object. 2. Minimize the energy of the system. @@ -293,24 +293,25 @@ async def generate_input_files(self): logger.info(f"Loading PDB file from {self.pdb_location}") # Create simulation using absolute paths + system_config = self.system_config.get_config() self.simulation, self.system_config = await asyncio.to_thread( self.create_simulation, - load_pdb_file(pdb_file=self.pdb_location), - self.system_config.get_config(), - None, # seed - False, # verbose - True, # initialize_with_solvent + pdb=load_pdb_file(pdb_file=self.pdb_location), + system_config=system_config, + with_solvent=generation_kwargs["with_solvent"], + seed=system_config["seed"], ) # Get the altered pdb and write. - state = self.simulation.context.getState(getPositions=True) - positions = state.getPositions() - write_pdb_file( - pdb_location_path=self.pdb_location, - topology=self.simulation.topology, - positions=positions, - suffix="_before_solvent", - ) + if generation_kwargs["with_solvent"]: + state = self.simulation.context.getState(getPositions=True) + positions = state.getPositions() + write_pdb_file( + pdb_location_path=self.pdb_location, + topology=self.simulation.topology, + positions=positions, + suffix="_before_solvent", + ) # load in information from the velm memory velm = create_velm(simulation=self.simulation) diff --git a/folding/validators/reward.py b/folding/validators/reward.py index df343b07..3f6dbbbb 100644 --- a/folding/validators/reward.py +++ b/folding/validators/reward.py @@ -3,6 +3,7 @@ import numpy as np +from folding.store import Job from folding.utils.logger import logger from folding.utils import constants as c from folding.validators.protein import Protein @@ -17,6 +18,7 @@ def evaluate( responses: List[JobSubmissionSynapse], uids: List[int], job_type: str, + job: Job, ): reported_energies = np.zeros(len(uids)) evaluators = [None] * len(uids) @@ -37,6 +39,7 @@ def evaluate( evaluator: BaseEvaluator = EVALUATION_REGISTRY[job_type]( pdb_id=protein.pdb_id, pdb_location=protein.pdb_location, + with_solvent=job.event["with_solvent"], hotkey=resp.axon.hotkey, state=resp.miner_state, seed=resp.miner_seed, @@ -69,6 +72,7 @@ def evaluate( def get_energies( + job: Job, protein: Protein, responses: List[JobSubmissionSynapse], uids: List[int], @@ -104,7 +108,7 @@ def get_energies( # Get initial evaluations reported_energies, evaluators, seed, best_cpt, process_md_output_time = evaluate( - protein, responses, uids, job_type + protein=protein, responses=responses, uids=uids, job_type=job_type, job=job ) # Sort all lists by reported energy diff --git a/neurons/validator.py b/neurons/validator.py index 69941b69..f9814442 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -133,6 +133,7 @@ async def forward(self, job: Job) -> dict: job_id=job.job_id, best_submitted_energy=job.best_loss, job_type=job.job_type, + job=job, ) async def ping_all_miners( @@ -205,15 +206,12 @@ async def get_valid_uids(self) -> List[int]: return valid_uids - async def add_job( - self, job_event: dict[str, Any], uids: List[int] = None, protein: Protein = None - ) -> bool: + async def add_job(self, job_event: dict[str, Any], protein: Protein = None) -> bool: """Add a job to the job store while also checking to see what uids can be assigned to the job. If uids are not provided, then the function will sample random uids from the network. Args: job_event (dict[str, Any]): parameters that are needed to make the job. - uids (List[int], optional): List of uids that can be assigned to the job. Defaults to None. """ start_time = time.time() @@ -233,11 +231,15 @@ async def add_job( job_event["s3_links"] = { "testing": "testing" } # overwritten below if s3 logging is on. + generation_kwargs = { + "verbose": job_event.get("verbose", False), + "with_solvent": job_event.get("with_solvent", False), + } async with timeout(300): logger.info( f"setup_simulation for organic query: {job_event['pdb_id']}" ) - await protein.setup_simulation() + await protein.setup_simulation(generation_kwargs=generation_kwargs) logger.success( f"✅✅ organic {job_event['pdb_id']} simulation ran successfully! ✅✅" ) @@ -284,7 +286,6 @@ async def add_job( raise ValueError("job_id is None") logger.success("Job was uploaded successfully!") - self.last_time_created_jobs = datetime.now() # TODO: return job_id