#!/usr/bin/env python
"""
COPYRIGHT AND LICENSE
@author John D. Chodera <jchodera@gmail.com>
All code in this repository is released under the MIT License.
This program is free software: you can redistribute it and/or modify it under
the terms of the MIT License.
This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the MIT License for more details.
You should have received a copy of the MIT License along with this program.
"""
# =============================================================================
# MODULE DOCSTRING
# =============================================================================
"""Markov chain Monte Carlo simulation framework.
This module provides a framework for equilibrium sampling from a given
thermodynamic state of a biomolecule using a Markov chain Monte Carlo scheme.
It currently offer supports for
* Langevin dynamics (assumed to be free of integration error; use at your own risk]),
* hybrid Monte Carlo,
* generalized hybrid Monte Carlo, and
* Monte Carlo barostat moves,
which can be combined through the SequenceMove and WeightedMove classes.
By default, the MCMCMoves use the fastest OpenMM platform available and a
shared global ContextCache that minimizes the number of OpenMM. The examples
below show how to configure these aspects.
References
----------
Jun S. Liu. Monte Carlo Strategies in Scientific Computing. Springer, 2008.
Examples
--------
>>> from openmm import unit
>>> from openmmtools import testsystems, cache
>>> from openmmtools.states import ThermodynamicState, SamplerState
Create the initial state (thermodynamic and microscopic) for an alanine
dipeptide system in vacuum.
>>> test = testsystems.AlanineDipeptideVacuum()
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> sampler_state = SamplerState(positions=test.positions)
Create an MCMC move to perform at every iteration of the simulation, and
initialize a sampler instance.
>>> ghmc_move = GHMCMove(timestep=1.0*unit.femtosecond, n_steps=50)
>>> langevin_move = LangevinDynamicsMove(n_steps=10)
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=ghmc_move)
You can combine them to form a sequence of moves
>>> sequence_move = SequenceMove([ghmc_move, langevin_move])
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=sequence_move)
or create a move that selects one of them at random with given probability
at each iteration.
>>> weighted_move = WeightedMove([(ghmc_move, 0.5), (langevin_move, 0.5)])
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=weighted_move)
By default the MCMCMove use a global ContextCache that creates Context on the
fastest available OpenMM platform. You can configure the default platform to use
before starting the simulation
>>> reference_platform = openmm.Platform.getPlatformByName('Reference')
>>> cache.global_context_cache.platform = reference_platform
>>> cache.global_context_cache.time_to_live = 10 # number of read/write operations
Minimize and run the simulation for few iterations.
>>> sampler.minimize()
>>> sampler.run(n_iterations=2)
If you don't want to use a global cache, you can create local ones.
>>> local_cache1 = cache.ContextCache(capacity=5, time_to_live=50)
>>> local_cache2 = cache.ContextCache(platform=reference_platform, capacity=1)
>>> sequence_move = SequenceMove([HMCMove(), LangevinDynamicsMove()],
... context_cache=local_cache1)
>>> ghmc_move = GHMCMove(context_cache=local_cache2)
If you don't want to cache Context at all but create one every time, you can use
the DummyCache.
>>> dummy_cache = cache.DummyContextCache(platform=reference_platform)
>>> ghmc_move = GHMCMove(context_cache=dummy_cache)
"""
# =============================================================================
# GLOBAL IMPORTS
# =============================================================================
import abc
import copy
import logging
import os
import warnings
import numpy as np
try:
import openmm
from openmm import unit
except ImportError: # OpenMM < 7.6
from simtk import openmm, unit
from openmmtools import integrators, cache, utils
from openmmtools.utils import SubhookedABCMeta, Timer
logger = logging.getLogger(__name__)
# =============================================================================
# MODULE CONSTANTS
# =============================================================================
_RANDOM_SEED_MAX = np.iinfo(np.int32).max # maximum random number seed value
# =============================================================================
# MARKOV CHAIN MOVE ABSTRACTION
# =============================================================================
[docs]
class MCMCMove(SubhookedABCMeta):
"""Markov chain Monte Carlo (MCMC) move abstraction.
To create a new MCMCMove class compatible with this framework, you
will have to implement this abstraction. The instance can keep internal
statistics such as number of attempted moves and acceptance rates.
"""
[docs]
def __init__(self, context_cache=None):
if context_cache is not None:
logger.warning("Ignoring context_cache argument. The MCMCMove.context_cache field has been deprecated."
" The API now requires context_cache be passed to apply(). Please update your code.'")
@abc.abstractmethod
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply the MCMC move.
Depending on the implementation, this can alter the thermodynamic
state and/or the sampler state.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The initial thermodynamic state before applying the move. This
may be modified depending on the implementation.
sampler_state : openmmtools.states.SamplerState
The initial sampler state before applying the move. This may
be modified depending on the implementation.
context_cache : opemmtools.cache.ContextCache
Context cache to be used in the propagation of the mcmc move. If None,
it will use the global context cache.
"""
pass
@property
def context_cache(self):
logger.warning('The MCMCMove.context_cache field has been deprecated. The API now requires context_cache '
'be passed to apply(). Please update your code.')
from openmmtools import cache
return cache.global_context_cache
@staticmethod
def _get_context_cache(context_cache_input):
"""
Method to return context to be used for move propagation.
.. note:: centralized API point to deal with context cache behavior.
Parameters
----------
context_cache_input : openmmtools.cache.ContextCache or None
Context cache to be used in the propagation of the mcmc move. If None,
it will create a new unlimited ContextCache object.
Returns
-------
context_cache : openmmtools.cache.ContextCache
Context cache object to be used for propagation.
"""
if context_cache_input is None:
# Default behavior, gobal Context Cache
context_cache = cache.global_context_cache
elif isinstance(context_cache_input, cache.ContextCache):
context_cache = context_cache_input
else:
raise ValueError("Context cache input is not a valid ContextCache or None type.")
return context_cache
# =============================================================================
# MARKOV CHAIN MONTE CARLO SAMPLER
# =============================================================================
[docs]
class MCMCSampler(object):
"""Basic Markov chain Monte Carlo sampler.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
Initial thermodynamic state.
sampler_state : openmmtools.states.SamplerState
Initial sampler state.
move_set : container of MarkovChainMonteCarloMove objects
Moves to attempt during MCMC run. If list or tuple, will run all moves each
iteration in specified sequence (e.g. [move1, move2, move3]). If dict, will
use specified unnormalized weights (e.g. { move1 : 0.3, move2 : 0.5, move3, 0.9 })
Attributes
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
Current thermodynamic state.
sampler_state : openmmtools.states.SamplerState
Current sampler state.
move_set : container of MarkovChainMonteCarloMove objects
Moves to attempt during MCMC run. If list or tuple, will run all moves each
iteration in specified sequence (e.g. [move1, move2, move3]). If dict, will
use specified unnormalized weights (e.g. { move1 : 0.3, move2 : 0.5, move3, 0.9 })
Examples
--------
>>> import numpy as np
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
Create and run an alanine dipeptide simulation with a weighted move.
>>> test = testsystems.AlanineDipeptideVacuum()
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set specifying probabilities fo each type of move.
>>> move = WeightedMove([(HMCMove(n_steps=10), 0.5), (LangevinDynamicsMove(n_steps=10), 0.5)])
>>> # Create an MCMC sampler instance and run 10 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
NPT ensemble simulation of a Lennard Jones fluid with a sequence of moves.
>>> test = testsystems.LennardJonesFluid(nparticles=200)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin,
... pressure=1*unit.atmospheres)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set that includes a Monte Carlo barostat move.
>>> move = SequenceMove([GHMCMove(n_steps=50), MonteCarloBarostatMove(n_attempts=5)])
>>> # Create an MCMC sampler instance and run 5 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, thermodynamic_state, sampler_state, move):
# Make a deep copy of the state so that initial state is unchanged.
self.thermodynamic_state = copy.deepcopy(thermodynamic_state)
self.sampler_state = copy.deepcopy(sampler_state)
self.move = move
def run(self, n_iterations=1, context_cache=None):
"""
Run the sampler for a specified number of iterations.
Parameters
----------
n_iterations : int
Number of iterations of the sampler to run.
context_cache : openmmtools.cache.ContextCache or None, optional, default None
Context cache to be used for move/integrator propagation. If None, global context cache will be used.
"""
# Handle context cache, fall back to global if None.
if context_cache is None:
context_cache = cache.global_context_cache
# Apply move for n_iterations.
for iteration in range(n_iterations):
self.move.apply(self.thermodynamic_state, self.sampler_state, context_cache=context_cache)
def minimize(self, tolerance=1.0*unit.kilocalories_per_mole/unit.angstroms,
max_iterations=100, context_cache=None):
"""Minimize the current configuration.
Parameters
----------
tolerance : openmm.unit.Quantity, optional
Tolerance to use for minimization termination criterion (units of
energy/(mole*distance), default is 1*kilocalories_per_mole/angstroms).
max_iterations : int, optional
Maximum number of iterations to use for minimization. If 0, the minimization
will continue until convergence (default is 100).
context_cache : openmmtools.cache.ContextCache, optional
The ContextCache to use for Context creation. If None, the global cache
openmmtools.cache.global_context_cache is used (default is None).
"""
if context_cache is None:
context_cache = cache.global_context_cache
timer = Timer()
# Use LocalEnergyMinimizer
timer.start("Context request")
integrator = openmm.VerletIntegrator(1.0*unit.femtosecond)
context, integrator = context_cache.get_context(self.thermodynamic_state, integrator)
self.sampler_state.apply_to_context(context)
logger.debug("LocalEnergyMinimizer: platform is %s" % context.getPlatform().getName())
logger.debug("Minimizing with tolerance %s and %d max. iterations." % (tolerance, max_iterations))
timer.stop("Context request")
timer.start("LocalEnergyMinimizer minimize")
openmm.LocalEnergyMinimizer.minimize(context, tolerance, max_iterations)
timer.stop("LocalEnergyMinimizer minimize")
# Retrieve data.
self.sampler_state.update_from_context(context)
#timer.report_timing()
# =============================================================================
# MCMC MOVE CONTAINERS
# =============================================================================
[docs]
class SequenceMove(MCMCMove):
"""A sequence of MCMC moves.
Parameters
----------
move_list : list-like of MCMCMove
The sequence of MCMC moves to apply.
Attributes
----------
move_list : list of MCMCMove
The sequence of MCMC moves to apply.
Examples
--------
NPT ensemble simulation of a Lennard Jones fluid with a sequence of moves.
>>> import numpy as np
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.LennardJonesFluid(nparticles=200)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin,
... pressure=1*unit.atmospheres)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set that includes a Monte Carlo barostat move.
>>> move = SequenceMove([GHMCMove(n_steps=50), MonteCarloBarostatMove(n_attempts=5)])
>>> # Create an MCMC sampler instance and run 5 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, move_list, **kwargs):
super(SequenceMove, self).__init__(**kwargs)
self.move_list = list(move_list)
@property
def statistics(self):
"""The statistics of all moves as a list of dictionaries."""
stats = [None for _ in range(len(self.move_list))]
for i, move in enumerate(self.move_list):
try:
stats[i] = move.statistics
except AttributeError:
stats[i] = {}
return stats
@statistics.setter
def statistics(self, value):
for i, move in enumerate(self.move_list):
if hasattr(move, 'statistics'):
move.statistics = value[i]
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply the sequence of MCMC move in order.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to.
context_cache : opemmtools.cache.ContextCache, optional
"""
# Get context cache to be used
local_context_cache = self._get_context_cache(context_cache)
for move in self.move_list:
# Apply each move with the specified local context
move.apply(thermodynamic_state, sampler_state, context_cache=local_context_cache)
def __str__(self):
return str(self.move_list)
def __iter__(self):
return iter(self.move_list)
def __getstate__(self):
serialized_moves = [utils.serialize(move) for move in self.move_list]
return dict(move_list=serialized_moves)
def __setstate__(self, serialization):
serialized_moves = serialization['move_list']
self.move_list = [utils.deserialize(move) for move in serialized_moves]
[docs]
class WeightedMove(MCMCMove):
"""Pick an MCMC move out of set with given probability at each iteration.
Parameters
----------
move_set : list of tuples (MCMCMove, float_
Each tuple associate an MCMCMoves to its probability of being
selected on apply().
Attributes
----------
move_set
Examples
--------
Create and run an alanine dipeptide simulation with a weighted move.
>>> import numpy as np
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> sampler_state = SamplerState(positions=test.positions)
>>> # Create a move set specifying probabilities fo each type of move.
>>> move = WeightedMove([(HMCMove(n_steps=10), 0.5),
... (LangevinDynamicsMove(n_steps=10), 0.5)])
>>> # Create an MCMC sampler instance and run 10 iterations of the simulation.
>>> sampler = MCMCSampler(thermodynamic_state, sampler_state, move=move)
>>> sampler.run(n_iterations=2)
>>> np.allclose(sampler.sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, move_set, **kwargs):
super(WeightedMove, self).__init__(**kwargs)
self.move_set = move_set
@property
def statistics(self):
"""The statistics of all moves as a list of dictionaries."""
stats = [None for _ in range(len(self.move_set))]
for i, (move, weight) in enumerate(self.move_set):
try:
stats[i] = move.statistics
except AttributeError:
stats[i] = {}
return stats
@statistics.setter
def statistics(self, value):
for i, (move, weight) in enumerate(self.move_set):
if hasattr(move, 'statistics'):
move.statistics = value[i]
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply one of the MCMC moves in the set to the state.
The probability that a move is picked is given by its weight.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to.
context_cache : openmmtools.cache.ContextCache
The ContextCache to use for Context creation.
"""
moves, weights = zip(*self.move_set)
move = np.random.choice(moves, p=weights)
# Get context cache to be used
local_context_cache = self._get_context_cache(context_cache)
move.apply(thermodynamic_state, sampler_state, context_cache=local_context_cache)
def __getstate__(self):
serialized_moves = [utils.serialize(move) for move, _ in self.move_set]
weights = [weight for _, weight in self.move_set]
return dict(moves=serialized_moves, weights=weights)
def __setstate__(self, serialization):
serialized_moves = serialization['moves']
weights = serialization['weights']
self.move_set = [(utils.deserialize(move), weight)
for move, weight in zip(serialized_moves, weights)]
def __str__(self):
return str(self.move_set)
def __iter__(self):
return self.move_set
# =============================================================================
# INTEGRATOR MCMC MOVE BASE CLASS
# =============================================================================
class IntegratorMoveError(Exception):
"""An error raised when NaN is found after applying a move.
Parameters
----------
message : str
A description of the error.
move : MCMCMove
The MCMCMove that raised the error.
context : openmm.Context, optional
The context after the integration.
"""
def __init__(self, message, move, context=None):
super(IntegratorMoveError, self).__init__(message)
self.move = move
self.context = context
def serialize_error(self, path_files_prefix):
"""Serializes and save the state of the simulation causing the error.
This creates several files:
- path_files_prefix-move.yaml
A YAML serialization of the MCMCMove.
- path_files_prefix-system.xml
The serialized system in the Context.
- path_files_prefix-integrator.xml
The serialized integrator in the Context.
- path_files_prefix-state.xml
The serialized OpenMM State object before the integration.
Parameters
----------
path_files_prefix : str
The prefix (including eventually a directory) for the files. Existing
files will be overwritten.
"""
directory_path = os.path.dirname(path_files_prefix)
if not os.path.exists(directory_path):
os.makedirs(directory_path)
# Serialize MCMCMove.
import json
# Create class to encode quantities
class quantity_encoder(json.JSONEncoder):
def default(self, o):
if type(o) in [unit.quantity.Quantity, unit.unit.Unit]:
return str(o)
super(quantity_encoder, self).default(o)
serialized_move = utils.serialize(self.move)
with open(os.path.join(path_files_prefix + '-move.json'), 'w') as f:
json.dump(serialized_move, f, cls=quantity_encoder)
# Serialize Context.
openmm_state = self.context.getState(getPositions=True, getVelocities=True,
getEnergy=True, getForces=True, getParameters=True)
to_serialize = [self.context.getSystem(), self.context.getIntegrator(), openmm_state]
for name, openmm_object in zip(['system', 'integrator', 'state'], to_serialize):
serialized_object = openmm.XmlSerializer.serialize(openmm_object)
with open(os.path.join(path_files_prefix + '-' + name + '.xml'), 'w') as f:
f.write(serialized_object)
[docs]
class BaseIntegratorMove(MCMCMove):
"""A general MCMC move that applies an integrator.
This class is intended to be inherited by MCMCMoves that need to integrate
the system. The child class has to implement the _get_integrator method.
You can decide to override _before_integration() and _after_integration()
to execute some code at specific points of the workflow, for example to
read data from the Context before the it is destroyed.
Parameters
----------
n_steps : int
The number of integration steps to take each time the move is applied.
reassign_velocities : bool, optional
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move (default is False).
n_restart_attempts : int, optional
When greater than 0, if after the integration there are NaNs in energies,
the move will restart. When the integrator has a random component, this
may help recovering. On the last attempt, the ``Context`` is
re-initialized in a slower process, but better than the simulation
crashing. An IntegratorMoveError is raised after the given number of
attempts if there are still NaNs.
Attributes
----------
n_steps : int
reassign_velocities : bool
n_restart_attempts : int or None
Examples
--------
Create a VerletIntegratorMove class.
>>> from openmmtools import testsystems, states
>>> from openmm import VerletIntegrator
>>> class VerletMove(BaseIntegratorMove):
... def __init__(self, timestep, n_steps, **kwargs):
... super(VerletMove, self).__init__(n_steps, **kwargs)
... self.timestep = timestep
... def _get_integrator(self, thermodynamic_state):
... return VerletIntegrator(self.timestep)
... def _before_integration(self, context, thermodynamic_state):
... print('Setting velocities')
... context.setVelocitiesToTemperature(thermodynamic_state.temperature)
... def _after_integration(self, context, thermodynamic_state):
... print('Reading statistics')
...
>>> alanine = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = states.SamplerState(alanine.positions)
>>> thermodynamic_state = states.ThermodynamicState(alanine.system, 300*unit.kelvin)
>>> move = VerletMove(timestep=1.0*unit.femtosecond, n_steps=2)
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
Setting velocities
Reading statistics
"""
[docs]
def __init__(self, n_steps, reassign_velocities=False, n_restart_attempts=4, **kwargs):
super(BaseIntegratorMove, self).__init__(**kwargs)
self.n_steps = n_steps
self.reassign_velocities = reassign_velocities
self.n_restart_attempts = n_restart_attempts
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Propagate the state through the integrator.
This updates the SamplerState after the integration. It also logs
benchmarking information through the utils.Timer class.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
context_cache : openmmtools.cache.ContextCache
Context cache to be used during propagation with the integrator.
See Also
--------
openmmtools.utils.Timer
"""
move_name = self.__class__.__name__ # shortcut
timer = Timer()
# Create integrator.
integrator = self._get_integrator(thermodynamic_state)
# Get context cache
local_context_cache = self._get_context_cache(context_cache)
# Create context.
timer.start("{}: Context request".format(move_name))
# TODO: Is this still needed now that we are specifying the context?
context, integrator = local_context_cache.get_context(thermodynamic_state, integrator)
timer.stop("{}: Context request".format(move_name))
# inform of platform used in current context
logger.debug(f"{move_name}: Integrator using {context.getPlatform().getName()} platform.")
# Perform the integration.
for attempt_counter in range(self.n_restart_attempts + 1):
# If we reassign velocities, we can ignore the ones in sampler_state.
sampler_state.apply_to_context(context, ignore_velocities=self.reassign_velocities)
if self.reassign_velocities:
context.setVelocitiesToTemperature(thermodynamic_state.temperature)
# Subclasses may implement _before_integration().
self._before_integration(context, thermodynamic_state)
try:
# Run dynamics.
timer.start("{}: step({})".format(move_name, self.n_steps))
integrator.step(self.n_steps)
except Exception as e:
# Catches particle positions becoming nan during integration.
# Return the exception message as a warning
warnings.warn(str(e))
restart = True
else:
timer.stop("{}: step({})".format(move_name, self.n_steps))
# We get also velocities here even if we don't need them because we
# will recycle this State to update the sampler state object. This
# way we won't need a second call to Context.getState().
context_state = context.getState(getPositions=True, getVelocities=True, getEnergy=True,
enforcePeriodicBox=thermodynamic_state.is_periodic)
# Check for NaNs in energies.
potential_energy = context_state.getPotentialEnergy()
restart = np.isnan(potential_energy.value_in_unit(potential_energy.unit))
# TODO: Probably refactor this whole thing to do simple restart
# Restart the move if we found NaNs.
if restart:
err_msg = ('Potential energy is NaN after {} attempts of integration '
'with move {}'.format(attempt_counter, self.__class__.__name__))
# If we are on our last chance before crash, try to re-initialize context
if attempt_counter == self.n_restart_attempts - 1:
logger.error(err_msg + ' Trying to reinitialize Context as a last-resort restart attempt...')
context.reinitialize()
sampler_state.apply_to_context(context)
thermodynamic_state.apply_to_context(context)
# If we have hit the number of restart attempts, raise an exception.
elif attempt_counter == self.n_restart_attempts:
# Restore the context to the state right before the integration.
sampler_state.apply_to_context(context)
logger.error(err_msg)
raise IntegratorMoveError(err_msg, self, context)
else:
logger.warning(err_msg + ' Attempting a restart...')
else:
break
# Subclasses can read here info from the context to update internal statistics.
self._after_integration(context, thermodynamic_state)
# Updated sampler state.
timer.start("{}: update sampler state".format(move_name))
# This is an optimization around the fact that Collective Variables are not a part of the State,
# but are a part of the Context. We do this call twice to minimize duplicating information fetched from
# the State.
# Update everything but the collective variables from the State object
sampler_state.update_from_context(context_state, ignore_collective_variables=True)
# Update only the collective variables from the Context
sampler_state.update_from_context(context, ignore_positions=True, ignore_velocities=True,
ignore_collective_variables=False)
timer.stop("{}: update sampler state".format(move_name))
#timer.report_timing()
@abc.abstractmethod
def _get_integrator(self, thermodynamic_state):
"""Create a new instance of the integrator to apply."""
pass
def _before_integration(self, context, thermodynamic_state):
"""Execute code after Context creation and before integration."""
pass
def _after_integration(self, context, thermodynamic_state):
"""Execute code after integration.
After this point there are no guarantees that the Context will still
exist, together with its bound integrator and system.
"""
pass
def __getstate__(self):
return dict(n_steps=self.n_steps,
reassign_velocities=self.reassign_velocities,
n_restart_attempts=self.n_restart_attempts)
def __setstate__(self, serialization):
self.n_steps = serialization['n_steps']
self.reassign_velocities = serialization['reassign_velocities']
self.n_restart_attempts = serialization['n_restart_attempts']
# =============================================================================
# METROPOLIZED MOVE BASE CLASS
# =============================================================================
[docs]
class MetropolizedMove(MCMCMove):
"""A base class for metropolized moves.
This class is intended to be inherited by MCMCMoves that needs to
accept or reject a proposed move with a Metropolis criterion. Only
the proposal needs to be specified by subclasses through the method
_propose_positions().
Parameters
----------
atom_subset : slice or list of int, optional
If specified, the move is applied only to those atoms specified by these
indices. If None, the move is applied to all atoms (default is None).
Attributes
----------
n_accepted : int
The number of proposals accepted.
n_proposed : int
The total number of attempted moves.
atom_subset
Examples
--------
>>> from openmm import unit
>>> from openmmtools import testsystems, states
>>> class AddOneVector(MetropolizedMove):
... def __init__(self, **kwargs):
... super(AddOneVector, self).__init__(**kwargs)
... def _propose_positions(self, initial_positions):
... print('Propose new positions')
... displacement = unit.Quantity(np.array([1.0, 1.0, 1.0]), initial_positions.unit)
... return initial_positions + displacement
...
>>> alanine = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = states.SamplerState(alanine.positions)
>>> thermodynamic_state = states.ThermodynamicState(alanine.system, 300*unit.kelvin)
>>> move = AddOneVector(atom_subset=list(range(sampler_state.n_particles)))
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
Propose new positions
>>> move.n_accepted
1
>>> move.n_proposed
1
"""
[docs]
def __init__(self, atom_subset=None, **kwargs):
super(MetropolizedMove, self).__init__(**kwargs)
self.n_accepted = 0
self.n_proposed = 0
self.atom_subset = atom_subset
@property
def statistics(self):
"""The acceptance statistics as a dictionary."""
return dict(n_accepted=self.n_accepted, n_proposed=self.n_proposed)
@statistics.setter
def statistics(self, value):
self.n_accepted = value['n_accepted']
self.n_proposed = value['n_proposed']
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply a metropolized move to the sampler state.
Total number of acceptances and proposed move are updated.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to apply the move.
sampler_state : openmmtools.states.SamplerState
The initial sampler state to apply the move to. This is modified.
context_cache : openmmtools.cache.ContextCache
The ContextCache to use for Context creation.
"""
timer = Timer()
benchmark_id = 'Applying {}'.format(self.__class__.__name__ )
timer.start(benchmark_id)
# Get context cache
local_context_cache = self._get_context_cache(context_cache)
# TODO: Is this still needed now that we are specifying the context?
# Create context, any integrator works.
context, unused_integrator = local_context_cache.get_context(thermodynamic_state)
# Compute initial energy. We don't need to set velocities to compute the potential.
# TODO assume sampler_state.potential_energy is the correct potential if not None?
sampler_state.apply_to_context(context, ignore_velocities=True)
initial_energy = thermodynamic_state.reduced_potential(context)
# Handle default and weird cases for atom_subset.
if self.atom_subset is None:
atom_subset = slice(None)
elif not isinstance(self.atom_subset, slice) and len(self.atom_subset) == 1:
# Slice so that initial_positions (below) will have a 2D shape.
atom_subset = slice(self.atom_subset[0], self.atom_subset[0]+1)
else:
atom_subset = self.atom_subset
# Store initial positions of the atoms that are moved.
# We'll use this also to recover in case the move is rejected.
if isinstance(atom_subset, slice):
# Numpy array when sliced return a view, they are not copied.
initial_positions = copy.deepcopy(sampler_state.positions[atom_subset])
else:
# This automatically creates a copy.
initial_positions = sampler_state.positions[atom_subset]
# Propose perturbed positions. Modifying the reference changes the sampler state.
proposed_positions = self._propose_positions(initial_positions)
# Compute the energy of the proposed positions.
sampler_state.positions[atom_subset] = proposed_positions
sampler_state.apply_to_context(context, ignore_velocities=True)
proposed_energy = thermodynamic_state.reduced_potential(context)
# Accept or reject with Metropolis criteria.
delta_energy = proposed_energy - initial_energy
if (not np.isnan(proposed_energy) and
(delta_energy <= 0.0 or np.random.rand() < np.exp(-delta_energy))):
self.n_accepted += 1
else:
# Restore original positions.
sampler_state.positions[atom_subset] = initial_positions
self.n_proposed += 1
# Print timing information.
timer.stop(benchmark_id)
#timer.report_timing()
def __getstate__(self):
serialization = dict(atom_subset=self.atom_subset)
serialization.update(self.statistics)
return serialization
def __setstate__(self, serialization):
self.atom_subset = serialization['atom_subset']
self.statistics = serialization
@abc.abstractmethod
def _propose_positions(self, positions):
"""Return new proposed positions.
These method must be implemented in subclasses.
Parameters
----------
positions : nx3 numpy.ndarray
The original positions of the subset of atoms that these move
applied to.
Returns
-------
proposed_positions : nx3 numpy.ndarray
The new proposed positions.
"""
pass
# =============================================================================
# GENERIC INTEGRATOR MOVE
# =============================================================================
[docs]
class IntegratorMove(BaseIntegratorMove):
"""An MCMCMove that propagate the system with an integrator.
This class makes it easy to convert OpenMM Integrator objects to
MCMCMove objects.
Parameters
----------
integrator : openmm.Integrator
An instance of an OpenMM Integrator object to use for propagation.
n_steps : int
The number of integration steps to take each time the move is applied.
Attributes
----------
integrator
n_steps
"""
[docs]
def __init__(self, integrator, n_steps, **kwargs):
super(IntegratorMove, self).__init__(n_steps=n_steps, **kwargs)
self.integrator = integrator
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator abstract method."""
# We copy the integrator to make sure that the MCMCMove
# can be applied to multiple Contexts.
copied_integrator = copy.deepcopy(self.integrator)
# Restore eventual extra methods for custom forces.
integrators.ThermostatedIntegrator.restore_interface(copied_integrator)
return copied_integrator
def __getstate__(self):
serialization = super(IntegratorMove, self).__getstate__()
serialization['integrator'] = openmm.XmlSerializer.serialize(self.integrator)
return serialization
def __setstate__(self, serialization):
super(IntegratorMove, self).__setstate__(serialization)
self.integrator = openmm.XmlSerializer.deserialize(serialization['integrator'])
# =============================================================================
# LANGEVIN DYNAMICS MOVES
# =============================================================================
[docs]
class LangevinDynamicsMove(BaseIntegratorMove):
"""Langevin dynamics segment as a (pseudo) Monte Carlo move.
This move assigns a velocity from the Maxwell-Boltzmann distribution
and executes a number of Maxwell-Boltzmann steps to propagate dynamics.
This is not a *true* Monte Carlo move, in that the generation of the
correct distribution is only exact in the limit of infinitely small
timestep; in other words, the discretization error is assumed to be
negligible. Use HybridMonteCarloMove instead to ensure the exact
distribution is generated.
The OpenMM LangevinMiddleIntegrator, based on BAOAB [1], is used.
.. warning::
The LangevinMiddleIntegrator generates velocities that are half a timestep lagged behind the positions.
.. warning::
No Metropolization is used to ensure the correct phase space
distribution is sampled. This means that timestep-dependent errors
will remain uncorrected, and are amplified with larger timesteps.
Use this move at your own risk!
References
----------
[1] Leimkuhler B and Matthews C. Robust and efficient configurational molecular sampling via Langevin dynamics. https://doi.org/10.1063/1.4802990
[2] Leimkuhler B and Matthews C. Efficient molecular dynamics using geodesic integration and solvent–solute splitting. https://doi.org/10.1098/rspa.2016.0138
Parameters
----------
timestep : openmm.unit.Quantity, optional
The timestep to use for Langevin integration
(time units, default is 1*openmm.unit.femtosecond).
collision_rate : openmm.unit.Quantity, optional
The collision rate with fictitious bath particles
(1/time units, default is 10/openmm.unit.picoseconds).
n_steps : int, optional
The number of integration timesteps to take each time the
move is applied (default is 1000).
reassign_velocities : bool, optional
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move (default is False).
constraint_tolerance : float, optional
Fraction of the constrained distance within which constraints are maintained for the
integrator (default is 1e-8).
Attributes
----------
timestep : openmm.unit.Quantity
The timestep to use for Langevin integration (time units).
collision_rate : openmm.unit.Quantity
The collision rate with fictitious bath particles (1/time units).
n_steps : int
The number of integration timesteps to take each time the move
is applied.
reassign_velocities : bool
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move.
constraint_tolerance : float
Fraction of the constrained distance within which constraints are maintained for the
integrator.
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import SamplerState, ThermodynamicState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a Langevin move with default parameters
>>> move = LangevinDynamicsMove()
or create a Langevin move with specified parameters.
>>> move = LangevinDynamicsMove(timestep=0.5*unit.femtoseconds,
... collision_rate=20.0/unit.picoseconds, n_steps=10)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, timestep=1.0*unit.femtosecond, collision_rate=10.0/unit.picoseconds,
n_steps=1000, reassign_velocities=False, constraint_tolerance=1e-8, **kwargs):
super(LangevinDynamicsMove, self).__init__(n_steps=n_steps,
reassign_velocities=reassign_velocities,
**kwargs)
self.timestep = timestep
self.collision_rate = collision_rate
self.constraint_tolerance = constraint_tolerance
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply the Langevin dynamics MCMC move.
This modifies the given sampler_state. The temperature of the
thermodynamic state is used in Langevin dynamics.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use to propagate dynamics.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
context_cache : openmmtools.cache.ContextCache
Context cache to be used during propagation with the integrator.
"""
# Explicitly implemented just to have more specific docstring.
super(LangevinDynamicsMove, self).apply(thermodynamic_state, sampler_state,
context_cache=context_cache)
def __getstate__(self):
serialization = super(LangevinDynamicsMove, self).__getstate__()
serialization['timestep'] = self.timestep
serialization['collision_rate'] = self.collision_rate
serialization['constraint_tolerance'] = self.constraint_tolerance
return serialization
def __setstate__(self, serialization):
super(LangevinDynamicsMove, self).__setstate__(serialization)
self.timestep = serialization['timestep']
self.collision_rate = serialization['collision_rate']
self.constraint_tolerance = serialization['constraint_tolerance']
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
integrator = openmm.LangevinMiddleIntegrator(thermodynamic_state.temperature,
self.collision_rate, self.timestep)
integrator.setConstraintTolerance(self.constraint_tolerance)
return integrator
[docs]
class LangevinSplittingDynamicsMove(LangevinDynamicsMove):
"""
Langevin dynamics segment with custom splitting of the operators and optional Metropolized Monte Carlo validation.
Besides all the normal properties of the :class:`LangevinDynamicsMove`, this class implements the custom splitting
sequence of the :class:`openmmtools.integrators.LangevinIntegrator`. Additionally, the steps can be wrapped around
a proper Generalized Hybrid Monte Carlo step to ensure that the exact distribution is generated.
Parameters
----------
timestep : openmm.unit.Quantity, optional
The timestep to use for Langevin integration
(time units, default is 1*openmm.unit.femtosecond).
collision_rate : openmm.unit.Quantity, optional
The collision rate with fictitious bath particles
(1/time units, default is 10/openmm.unit.picoseconds).
n_steps : int, optional
The number of integration timesteps to take each time the
move is applied (default is 1000).
reassign_velocities : bool, optional
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move (default is False).
splitting : string, default: "V R O R V"
Sequence of "R", "V", "O" (and optionally "{", "}", "V0", "V1", ...) substeps to be executed each timestep.
Forces are only used in V-step. Handle multiple force groups by appending the force group index
to V-steps, e.g. "V0" will only use forces from force group 0. "V" will perform a step using all forces.
"{" will cause metropolization, and must be followed later by a "}".
constraint_tolerance : float, default: 1.0e-8
Tolerance for constraint solver
measure_shadow_work : boolean, default: False
Accumulate the shadow work performed by the symplectic substeps, in the global `shadow_work`
measure_heat : boolean, default: False
Accumulate the heat exchanged with the bath in each step, in the global `heat`
Attributes
----------
timestep : openmm.unit.Quantity
The timestep to use for Langevin integration (time units).
collision_rate : openmm.unit.Quantity
The collision rate with fictitious bath particles (1/time units).
n_steps : int
The number of integration timesteps to take each time the move
is applied.
reassign_velocities : bool
If True, the velocities will be reassigned from the Maxwell-Boltzmann
distribution at the beginning of the move.
splitting : str
Splitting applied to this integrator represented as a string.
constraint_tolerance : float, default: 1.0e-8
Tolerance for constraint solver
measure_shadow_work : boolean, default: False
Accumulate the shadow work performed by the symplectic substeps, in the global `shadow_work`
measure_heat : boolean, default: False
Accumulate the heat exchanged with the bath in each step, in the global `heat`
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import SamplerState, ThermodynamicState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a Langevin move with default parameters
>>> move = LangevinSplittingDynamicsMove()
or create a Langevin move with specified splitting.
>>> move = LangevinSplittingDynamicsMove(splitting="O { V R V } O")
Where this splitting is a 5 step symplectic integrator:
*. Ornstein-Uhlenbeck (O) interactions with the stochastic heat bath interactions
*. Hybrid Metropolized step around the half-step velocity updates (V) with full position updates (R).
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, timestep=1.0 * unit.femtosecond, collision_rate=10.0 / unit.picoseconds,
n_steps=1000, reassign_velocities=False, splitting="V R O R V", constraint_tolerance=1.0e-8,
measure_shadow_work=False, measure_heat=False, **kwargs):
super(LangevinSplittingDynamicsMove, self).__init__(n_steps=n_steps,
reassign_velocities=reassign_velocities,
timestep=timestep,
collision_rate=collision_rate,
**kwargs)
self.splitting = splitting
self.constraint_tolerance = constraint_tolerance
self.measure_shadow_work = measure_shadow_work
self.measure_heat = measure_heat
def __getstate__(self):
serialization = super(LangevinSplittingDynamicsMove, self).__getstate__()
serialization['splitting'] = self.splitting
serialization['constraint_tolerance'] = self.constraint_tolerance
serialization['measure_shadow_work'] = self.measure_shadow_work
serialization['measure_heat'] = self.measure_heat
return serialization
def __setstate__(self, serialization):
super(LangevinSplittingDynamicsMove, self).__setstate__(serialization)
self.splitting = serialization['splitting']
self.constraint_tolerance = serialization['constraint_tolerance']
self.measure_shadow_work = serialization['measure_shadow_work']
self.measure_heat = serialization['measure_heat']
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
return integrators.LangevinIntegrator(temperature=thermodynamic_state.temperature,
collision_rate=self.collision_rate,
timestep=self.timestep,
splitting=self.splitting,
constraint_tolerance=self.constraint_tolerance,
measure_shadow_work=self.measure_shadow_work,
measure_heat=self.measure_heat)
# =============================================================================
# GENERALIZED HYBRID MONTE CARLO MOVE
# =============================================================================
[docs]
class GHMCMove(BaseIntegratorMove):
"""Generalized hybrid Monte Carlo (GHMC) Markov chain Monte Carlo move.
This move uses generalized Hybrid Monte Carlo (GHMC), a form of Metropolized
Langevin dynamics, to propagate the system.
Parameters
----------
timestep : openmm.unit.Quantity, optional
The timestep to use for Langevin integration (time units, default
is 1*openmm.unit.femtoseconds).
collision_rate : openmm.unit.Quantity, optional
The collision rate with fictitious bath particles (1/time units,
default is 20/openmm.unit.picoseconds).
n_steps : int, optional
The number of integration timesteps to take each time the move
is applied (default is 1000).
Attributes
----------
timestep : openmm.unit.Quantity
The timestep to use for Langevin integration (time units).
collision_rate : openmm.unit.Quantity
The collision rate with fictitious bath particles (1/time units).
n_steps : int
The number of integration timesteps to take each time the move
is applied.
n_accepted : int
The number of accepted steps.
n_proposed : int
The number of attempted steps.
fraction_accepted
References
----------
Lelievre T, Stoltz G, Rousset M. Free energy computations: A mathematical
perspective. World Scientific, 2010.
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a GHMC move with default parameters.
>>> move = GHMCMove()
or create a GHMC move with specified parameters.
>>> move = GHMCMove(timestep=0.5*unit.femtoseconds,
... collision_rate=20.0/unit.picoseconds, n_steps=10)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, timestep=1.0*unit.femtosecond, collision_rate=20.0/unit.picoseconds,
n_steps=1000, **kwargs):
super(GHMCMove, self).__init__(n_steps=n_steps, **kwargs)
self.timestep = timestep
self.collision_rate = collision_rate
self.n_accepted = 0 # Number of accepted steps.
self.n_proposed = 0 # Number of attempted steps.
@property
def fraction_accepted(self):
"""Ratio between accepted over attempted moves (read-only).
If the number of attempted steps is 0, this is numpy.NaN.
"""
if self.n_proposed == 0:
return np.NaN
# TODO drop the casting when stop Python2 support
return float(self.n_accepted) / self.n_proposed
@property
def statistics(self):
"""The acceptance statistics as a dictionary."""
return dict(n_accepted=self.n_accepted, n_proposed=self.n_proposed)
@statistics.setter
def statistics(self, value):
self.n_accepted = value['n_accepted']
self.n_proposed = value['n_proposed']
def reset_statistics(self):
"""Reset the internal statistics of number of accepted and attempted moves."""
self.n_accepted = 0
self.n_proposed = 0
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply the GHMC MCMC move.
This modifies the given sampler_state. The temperature of the
thermodynamic state is used.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use when applying the MCMC move.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
context_cache : openmmtools.cache.ContextCache
Context cache to be used during propagation with the integrator.
"""
# Explicitly implemented just to have more specific docstring.
super(GHMCMove, self).apply(thermodynamic_state, sampler_state, context_cache=context_cache)
def __getstate__(self):
serialization = super(GHMCMove, self).__getstate__()
serialization['timestep'] = self.timestep
serialization['collision_rate'] = self.collision_rate
serialization.update(self.statistics)
return serialization
def __setstate__(self, serialization):
super(GHMCMove, self).__setstate__(serialization)
self.timestep = serialization['timestep']
self.collision_rate = serialization['collision_rate']
self.statistics = serialization
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
# Store lastly generated integrator to collect statistics.
return integrators.GHMCIntegrator(temperature=thermodynamic_state.temperature,
collision_rate=self.collision_rate,
timestep=self.timestep)
def _after_integration(self, context, thermodynamic_state):
"""Implement BaseIntegratorMove._after_integration()."""
integrator = context.getIntegrator()
# Accumulate acceptance statistics.
ghmc_global_variables = {integrator.getGlobalVariableName(index): index
for index in range(integrator.getNumGlobalVariables())}
n_accepted = integrator.getGlobalVariable(ghmc_global_variables['naccept'])
n_proposed = integrator.getGlobalVariable(ghmc_global_variables['ntrials'])
self.n_accepted += n_accepted
self.n_proposed += n_proposed
# =============================================================================
# HYBRID MONTE CARLO MOVE
# =============================================================================
[docs]
class HMCMove(BaseIntegratorMove):
"""Hybrid Monte Carlo dynamics.
This move assigns a velocity from the Maxwell-Boltzmann distribution
and executes a number of velocity Verlet steps to propagate dynamics.
Parameters
----------
timestep : openmm.unit.Quantity, optional
The timestep to use for HMC dynamics, which uses velocity Verlet following
velocity randomization (time units, default is 1*openmm.unit.femtosecond)
n_steps : int, optional
The number of dynamics steps to take before Metropolis acceptance/rejection
(default is 1000).
Attributes
----------
timestep : openmm.unit.Quantity
The timestep to use for HMC dynamics, which uses velocity Verlet following
velocity randomization (time units).
n_steps : int
The number of dynamics steps to take before Metropolis acceptance/rejection.
Examples
--------
First we need to create the thermodynamic state and the sampler
state to propagate. Here we create an alanine dipeptide system
in vacuum.
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideVacuum()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin)
Create a GHMC move with default parameters.
>>> move = HMCMove()
or create a GHMC move with specified parameters.
>>> move = HMCMove(timestep=0.5*unit.femtoseconds, n_steps=10)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
The same move can be applied to a different state, here an ideal gas.
>>> test = testsystems.IdealGas()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system,
... temperature=298*unit.kelvin)
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, timestep=1.0*unit.femtosecond, n_steps=1000, **kwargs):
super(HMCMove, self).__init__(n_steps=n_steps, **kwargs)
self.timestep = timestep
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply the MCMC move.
This modifies the given sampler_state.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use when applying the MCMC move.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
context_cache : openmmtools.cache.ContextCache
Context cache to be used during propagation with the integrator.
"""
# Explicitly implemented just to have more specific docstring.
super(HMCMove, self).apply(thermodynamic_state, sampler_state, context_cache=context_cache)
def __getstate__(self):
serialization = super(HMCMove, self).__getstate__()
serialization['timestep'] = self.timestep
return serialization
def __setstate__(self, serialization):
super(HMCMove, self).__setstate__(serialization)
self.timestep = serialization['timestep']
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
return integrators.HMCIntegrator(temperature=thermodynamic_state.temperature,
timestep=self.timestep, nsteps=self.n_steps)
# =============================================================================
# MONTE CARLO BAROSTAT MOVE
# =============================================================================
[docs]
class MonteCarloBarostatMove(BaseIntegratorMove):
"""Monte Carlo barostat move.
This move makes one or more attempts to update the box volume using
Monte Carlo updates.
Parameters
----------
n_attempts : int, optional
The number of Monte Carlo attempts to make to adjust the box
volume (default is 5).
Attributes
----------
n_attempts
Examples
--------
The thermodynamic state must be barostated by a MonteCarloBarostat
force. The class ThermodynamicState takes care of adding one when
we specify the pressure in its constructor.
>>> from openmm import unit
>>> from openmmtools import testsystems
>>> from openmmtools.states import ThermodynamicState, SamplerState
>>> test = testsystems.AlanineDipeptideExplicit()
>>> sampler_state = SamplerState(positions=test.positions)
>>> thermodynamic_state = ThermodynamicState(system=test.system, temperature=298*unit.kelvin,
... pressure=1.0*unit.atmosphere)
Create a MonteCarloBarostatMove move with default parameters.
>>> move = MonteCarloBarostatMove()
or create a GHMC move with specified parameters.
>>> move = MonteCarloBarostatMove(n_attempts=2)
Perform one update of the sampler state. The sampler state is updated
with the new state.
>>> move.apply(thermodynamic_state,sampler_state,context_cache=context_cache)
>>> np.allclose(sampler_state.positions, test.positions)
False
"""
[docs]
def __init__(self, n_attempts=5, **kwargs):
super(MonteCarloBarostatMove, self).__init__(n_steps=n_attempts, **kwargs)
@property
def n_attempts(self):
"""The number of MC attempts to make to adjust the box volume."""
return self.n_steps # The number of steps of the dummy integrator.
@n_attempts.setter
def n_attempts(self, value):
self.n_steps = value
def apply(self, thermodynamic_state, sampler_state, context_cache=None):
"""Apply the MCMC move.
The thermodynamic state must be barostated by a MonteCarloBarostat
force. This modifies the given sampler_state.
Parameters
----------
thermodynamic_state : openmmtools.states.ThermodynamicState
The thermodynamic state to use when applying the MCMC move.
sampler_state : openmmtools.states.SamplerState
The sampler state to apply the move to. This is modified.
context_cache : openmmtools.cache.ContextCache
Context cache to be used during propagation with the integrator
"""
# Make sure system contains a MonteCarlo barostat.
barostat = thermodynamic_state.barostat
if barostat is None:
raise RuntimeError('Requested a MonteCarloBarostat move'
' on a system at constant pressure')
if not isinstance(barostat, openmm.MonteCarloBarostat):
raise RuntimeError('Requested a MonteCarloBarostat move on a system '
'barostated with a {}'.format(barostat.__class__.__name__))
# Set temporarily the frequency if needed.
old_barostat_frequency = barostat.getFrequency()
if old_barostat_frequency != 1:
barostat.setFrequency(1)
thermodynamic_state.barostat = barostat
super(MonteCarloBarostatMove, self).apply(thermodynamic_state, sampler_state,
context_cache=context_cache)
# Restore frequency of barostat.
if old_barostat_frequency != 1:
barostat.setFrequency(old_barostat_frequency)
thermodynamic_state.barostat = barostat
def _get_integrator(self, thermodynamic_state):
"""Implement BaseIntegratorMove._get_integrator()."""
return integrators.DummyIntegrator()
# =============================================================================
# RANDOM DISPLACEMENT MOVE
# =============================================================================
[docs]
class MCDisplacementMove(MetropolizedMove):
"""A metropolized move that randomly displace a subset of atoms.
Parameters
----------
displacement_sigma : openmm.unit.Quantity
The standard deviation of the normal distribution used to propose the
random displacement (units of length, default is 1.0*nanometer).
atom_subset : slice or list of int, optional
If specified, the move is applied only to those atoms specified by these
indices. If None, the move is applied to all atoms (default is None).
Attributes
----------
n_accepted : int
The number of proposals accepted.
n_proposed : int
The total number of attempted moves.
displacement_sigma
atom_subset
See Also
--------
MetropolizedMove
"""
[docs]
def __init__(self, displacement_sigma=1.0*unit.nanometer, **kwargs):
super(MCDisplacementMove, self).__init__(**kwargs)
self.displacement_sigma = displacement_sigma
@staticmethod
def displace_positions(positions, displacement_sigma=1.0*unit.nanometer):
"""Return the positions after applying a random displacement to them.
Parameters
----------
positions : nx3 numpy.ndarray openmm.unit.Quantity
The positions to displace.
displacement_sigma : openmm.unit.Quantity
The standard deviation of the normal distribution used to propose
the random displacement (units of length, default is 1.0*nanometer).
Returns
-------
rotated_positions : nx3 numpy.ndarray openmm.unit.Quantity
The displaced positions.
"""
positions_unit = positions.unit
unitless_displacement_sigma = displacement_sigma / positions_unit
displacement_vector = unit.Quantity(np.random.randn(3) * unitless_displacement_sigma,
positions_unit)
return positions + displacement_vector
def __getstate__(self):
serialization = super(MCDisplacementMove, self).__getstate__()
serialization['displacement_sigma'] = self.displacement_sigma
return serialization
def __setstate__(self, serialization):
super(MCDisplacementMove, self).__setstate__(serialization)
self.displacement_sigma = serialization['displacement_sigma']
def _propose_positions(self, initial_positions):
"""Implement MetropolizedMove._propose_positions for apply()."""
return self.displace_positions(initial_positions, self.displacement_sigma)
# =============================================================================
# RANDOM ROTATION MOVE
# =============================================================================
[docs]
class MCRotationMove(MetropolizedMove):
"""A metropolized move that randomly rotate a subset of atoms.
Parameters
----------
atom_subset : slice or list of int, optional
If specified, the move is applied only to those atoms specified by these
indices. If None, the move is applied to all atoms (default is None).
Attributes
----------
n_accepted : int
The number of proposals accepted.
n_proposed : int
The total number of attempted moves.
atom_subset
See Also
--------
MetropolizedMove
"""
[docs]
def __init__(self, **kwargs):
super(MCRotationMove, self).__init__(**kwargs)
@classmethod
def rotate_positions(cls, positions):
"""Return the positions after applying a random rotation to them.
Parameters
----------
positions : nx3 numpy.ndarray openmm.unit.Quantity
The positions to rotate.
Returns
-------
rotated_positions : nx3 numpy.ndarray openmm.unit.Quantity
The rotated positions.
"""
positions_unit = positions.unit
x_initial = positions / positions_unit
# Compute center of geometry of atoms to rotate.
x_initial_mean = x_initial.mean(0)
# Generate a random rotation matrix.
rotation_matrix = cls.generate_random_rotation_matrix()
# Apply rotation.
x_proposed = (rotation_matrix * np.matrix(x_initial - x_initial_mean).T).T + x_initial_mean
return unit.Quantity(x_proposed, positions_unit)
@classmethod
def generate_random_rotation_matrix(cls):
"""Return a random 3x3 rotation matrix.
Returns
-------
Rq : 3x3 numpy.ndarray
The random rotation matrix.
"""
q = cls._generate_uniform_quaternion()
return cls._rotation_matrix_from_quaternion(q)
@staticmethod
def _rotation_matrix_from_quaternion(q):
"""Compute a 3x3 rotation matrix from a given quaternion (4-vector).
Parameters
----------
q : 1x4 numpy.ndarray
Quaterion (need not be normalized, zero norm OK).
Returns
-------
Rq : 3x3 numpy.ndarray
Orthogonal rotation matrix corresponding to quaternion q.
Examples
--------
>>> q = np.array([0.1, 0.2, 0.3, -0.4])
>>> Rq = MCRotationMove._rotation_matrix_from_quaternion(q)
References
----------
[1] http://en.wikipedia.org/wiki/Rotation_matrix#Quaternion
"""
w, x, y, z = q
Nq = (q**2).sum() # Squared norm.
if Nq > 0.0:
s = 2.0 / Nq
else:
s = 0.0
X = x*s; Y = y*s; Z = z*s
wX = w*X; wY = w*Y; wZ = w*Z
xX = x*X; xY = x*Y; xZ = x*Z
yY = y*Y; yZ = y*Z; zZ = z*Z
Rq = np.matrix([[1.0-(yY+zZ), xY-wZ, xZ+wY],
[xY+wZ, 1.0-(xX+zZ), yZ-wX],
[xZ-wY, yZ+wX, 1.0-(xX+yY)]])
return Rq
@staticmethod
def _generate_uniform_quaternion():
"""Generate a uniform normalized quaternion 4-vector.
References
----------
[1] K. Shoemake. Uniform random rotations. In D. Kirk, editor,
Graphics Gems III, pages 124-132. Academic, New York, 1992.
[2] Described briefly here: http://planning.cs.uiuc.edu/node198.html
Examples
--------
>>> q = MCRotationMove._generate_uniform_quaternion()
"""
u = np.random.rand(3)
q = np.array([np.sqrt(1-u[0])*np.sin(2*np.pi*u[1]),
np.sqrt(1-u[0])*np.cos(2*np.pi*u[1]),
np.sqrt(u[0])*np.sin(2*np.pi*u[2]),
np.sqrt(u[0])*np.cos(2*np.pi*u[2])])
return q
def _propose_positions(self, initial_positions):
"""Implement MetropolizedMove._propose_positions for apply()"""
return self.rotate_positions(initial_positions)
# =============================================================================
# MAIN AND TESTS
# =============================================================================
if __name__ == "__main__":
import doctest
doctest.testmod()