Source code for pysb.simulator.stochkit

from pysb.simulator.base import Simulator, SimulationResult, SimulatorException
from pysb.bng import generate_equations
from pysb.export.stochkit import StochKitExporter
import numpy as np
import subprocess
import os
import shutil
import tempfile
import re
from pysb.pathfinder import get_path
from pysb.logging import EXTENDED_DEBUG


[docs]class StochKitSimulator(Simulator): """ Interface to the StochKit 2 stochastic simulation toolkit StochKit can be installed from GitHub: https://github.com/stochss/stochkit This class is inspired by the `gillespy <https://github.com/JohnAbel/gillespy>` library, but has been optimised for use with PySB. .. warning:: The interface for this class is considered experimental and may change without warning as PySB is updated. Parameters ---------- model : pysb.Model Model to simulate. tspan : vector-like, optional Time values over which to simulate. The first and last values define the time range. Returned trajectories are sampled at every value unless the simulation is interrupted for some reason, e.g., due to satisfaction of a logical stopping criterion (see 'tout' below). initials : vector-like or dict, optional Values to use for the initial condition of all species. Ordering is determined by the order of model.species. If not specified, initial conditions will be taken from model.initials (with initial condition parameter values taken from `param_values` if specified). param_values : vector-like or dict, optional Values to use for every parameter in the model. Ordering is determined by the order of model.parameters. If passed as a dictionary, keys must be parameter names. If not specified, parameter values will be taken directly from model.parameters. verbose : bool or int, optional (default: False) Sets the verbosity level of the logger. See the logging levels and constants from Python's logging module for interpretation of integer values. False is equal to the PySB default level (currently WARNING), True is equal to DEBUG. **kwargs : dict Extra keyword arguments, including: * ``cleanup``: Boolean, delete directory after completion if True Examples -------- Simulate a model and display the results for an observable: >>> from pysb.examples.robertson import model >>> import numpy as np >>> np.set_printoptions(precision=4) >>> sim = StochKitSimulator(model, tspan=np.linspace(0, 10, 5)) Here we supply a "seed" to the random number generator for deterministic results, but for most purposes it is recommended to leave this blank. >>> simulation_result = sim.run(n_runs=2, seed=123456) A_total trajectory for first run >>> print(simulation_result.observables[0]['A_total']) \ #doctest: +NORMALIZE_WHITESPACE [1. 0. 0. 0. 0.] A_total trajectory for second run >>> print(simulation_result.observables[1]['A_total']) \ #doctest: +SKIP [1. 1. 1. 0. 0.] For further information on retrieving trajectories (species, observables, expressions over time) from the ``simulation_result`` object returned by :func:`run`, see the examples under the :class:`SimulationResult` class. """ _supports = {'multi_initials': True, 'multi_param_values': True} def __init__(self, model, tspan=None, initials=None, param_values=None, verbose=False, **kwargs): super(StochKitSimulator, self).__init__(model, tspan=tspan, initials=initials, param_values=param_values, verbose=verbose, **kwargs) self.cleanup = kwargs.pop('cleanup', True) if kwargs: raise ValueError('Unknown keyword argument(s): {}'.format( ', '.join(kwargs.keys()) )) self._outdir = None generate_equations(self._model, cleanup=self.cleanup, verbose=self.verbose) def _run_stochkit(self, t=20, t_length=100, number_of_trajectories=1, seed=None, algorithm='ssa', method=None, num_processors=1, stats=False, epsilon=None, threshold=None): extra_args = '-p {:d}'.format(num_processors) # Random seed for stochastic simulation if seed is not None: extra_args += ' --seed {:d}'.format(seed) # Keep all the trajectories by default extra_args += ' --keep-trajectories' # Number of trajectories extra_args += ' --realizations {:d}'.format(number_of_trajectories) # We generally don't need the extra stats if not stats: extra_args += ' --no-stats' if method is not None: # This only works for StochKit 2.1 extra_args += ' --method {}'.format(method) if epsilon is not None: extra_args += ' --epsilon {:f}'.format(epsilon) if threshold is not None: extra_args += ' --threshold {:d}'.format(threshold) # Find binary for selected algorithm (SSA, Tau-leaping, ...) if algorithm not in ['ssa', 'tau_leaping']: raise StochKitSimulatorException( "algorithm must be 'ssa' or 'tau_leaping'") executable = get_path('stochkit_{}'.format(algorithm)) # Output model file to directory fname = os.path.join(self._outdir, 'pysb.xml') trajectories = [] for i in range(len(self.initials)): # We write all StochKit output files to a temporary folder prefix_outdir = os.path.join(self._outdir, 'output_{}'.format(i)) # Export model file stoch_xml = StochKitExporter(self._model).export( self.initials[i], self.param_values[i]) self._logger.log(EXTENDED_DEBUG, 'StochKit XML:\n%s' % stoch_xml) with open(fname, 'wt') as f: f.write(stoch_xml) # Assemble the argument list args = '--model {} --out-dir {} -t {:f} -i {:d}'.format( fname, prefix_outdir, t, t_length - 1) # If we are using local mode, shell out and run StochKit # (SSA or Tau-leaping or ODE) cmd = '{} {} {}'.format(executable, args, extra_args) self._logger.debug("StochKit run {} of {} (cmd: {})".format( (i + 1), len(self.initials), cmd)) # Execute try: handle = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) return_code = handle.wait() except OSError as e: raise StochKitSimulatorException("StochKit execution failed: \ {0}\n{1}".format(cmd, e)) try: stderr = handle.stderr.read().decode('utf8') except Exception as e: stderr = 'Error reading stderr: {0}'.format(e) try: stdout = handle.stdout.read().decode('utf8') except Exception as e: stdout = 'Error reading stdout: {0}'.format(e) if return_code != 0: raise StochKitSimulatorException( "Solver execution failed (return code: {})".format( return_code), stdout=stdout, stderr=stderr ) traj_dir = os.path.join(prefix_outdir, 'trajectories') try: trajectories.extend([np.loadtxt(os.path.join( traj_dir, f)) for f in sorted(os.listdir(traj_dir))]) except Exception as e: raise StochKitSimulatorException( "Error reading StochKit trajectories: {}".format(e), stdout=stdout, stderr=stderr ) if len(trajectories) == 0 or len(stderr) != 0: raise StochKitSimulatorException( "Solver execution failed: {}".format(e), stdout=stdout, stderr=stderr ) self._logger.debug("StochKit STDOUT:\n{0}".format(stdout)) # Return data return trajectories
[docs] def run(self, tspan=None, initials=None, param_values=None, n_runs=1, algorithm='ssa', output_dir=None, num_processors=1, seed=None, method=None, stats=False, epsilon=None, threshold=None): """ Run a simulation and returns the result (trajectories) .. note:: In early versions of the Simulator class, ``tspan``, ``initials`` and ``param_values`` supplied to this method persisted to future :func:`run` calls. This is no longer the case. Parameters ---------- tspan initials param_values See parameter definitions in :class:`StochKitSimulator`. n_runs : int The number of simulation runs per parameter set. The total number of simulations is therefore n_runs * max(len(initials), len(param_values)) algorithm : str Choice of 'ssa' (Gillespie's stochastic simulation algorithm) or 'tau_leaping' (Tau leaping algorithm) output_dir : str or None Directory for StochKit output, or None for a system-specific temporary directory num_processors : int Number of CPU cores for StochKit to use (default: 1) seed : int or None A random number seed for StochKit. Set to any integer value for deterministic behavior. method : str or None StochKit "method" argument, default None. Only used by StochKit 2.1 (not yet released at time of writing). stats : bool Ask StochKit to generate simulation summary statistics if True epsilon : float or None Tolerance parameter for tau-leaping algorithm threshold : int or None Threshold parameter for tau-leaping algorithm Returns ------- A :class:`SimulationResult` object """ super(StochKitSimulator, self).run(tspan=tspan, initials=initials, param_values=param_values, _run_kwargs=locals()) self._logger.info('Running StochKit with {:d} parameter sets, ' '{:d} repeats ({:d} simulations total)'.format( len(self.initials), n_runs, len(self.initials) * n_runs)) if output_dir is None: self._outdir = tempfile.mkdtemp() else: self._outdir = output_dir # Calculate time intervals and validate t_range = self.tspan[-1] - self.tspan[0] t_length = len(self.tspan) if not np.allclose(self.tspan, np.linspace(0, self.tspan[-1], t_length)): raise StochKitSimulatorException( 'StochKit requires tspan to be linearly ' 'spaced starting at t=0' ) try: trajectories = self._run_stochkit(t=t_range, number_of_trajectories=n_runs, t_length=t_length, seed=seed, algorithm=algorithm, method=method, num_processors=num_processors, stats=stats, epsilon=epsilon, threshold=threshold) finally: if self.cleanup: try: shutil.rmtree(self._outdir) except OSError: pass # set output time points trajectories_array = np.array(trajectories) self.tout = trajectories_array[:, :, 0] + self.tspan[0] # species species = trajectories_array[:, :, 1:] return SimulationResult(self, self.tout, species, simulations_per_param_set=n_runs)
class StochKitSimulatorException(SimulatorException): def __init__(self, *args, **kwargs): super(StochKitSimulatorException, self).__init__(*args) stdout = kwargs.get('stdout') stderr = kwargs.get('stderr') log_data = None m = re.search(r'Check log file "(.*?)" for error messages', stdout) if m: try: with open(m.group(1), 'r') as f: log_data = f.read() except OSError: log_data = '<Error reading log file>' message = self.args[0] if stdout: message += '\n\nSTDOUT:\n' for l in stdout.splitlines(): message += ' {}\n'.format(l) if stderr: message += '\nSTDERR:\n' for l in stderr.splitlines(): message += ' {}\n'.format(l) if log_data: message += '\nLOG FILE:\n' for l in log_data.splitlines(): message += ' {}\n'.format(l) self.args = (message, )