from pysb.simulator.base import Simulator, SimulationResult, SimulatorException
from pysb.generator.kappa import KappaGenerator
from pysb.kappa import KasimInterfaceError, _parse_kasim_outfile
import numpy as np
import os
import tempfile
import shutil
import pysb.pathfinder as pf
import subprocess
class KappaSimulator(Simulator):
    """Simulate a model using Kappa.

    The model is exported to Kappa source with ``KappaGenerator``, written
    to a temporary directory, and executed with the ``kasim`` binary located
    through ``pysb.pathfinder``. One Kappa file is written per parameter
    set, and KaSim is invoked ``n_runs`` times for each file.
    """

    # Feature flags for this simulator. Presumably consulted by the base
    # Simulator class -- confirm against pysb.simulator.base.
    _supports = {
        'multi_initials': True,
        'multi_param_values': True
    }

    def __init__(self, model, tspan=None, cleanup=True, verbose=False):
        """
        Create a Kappa simulator for ``model``.

        Parameters
        ----------
        model : pysb.Model
            Model to simulate (forwarded to the base ``Simulator``).
        tspan : vector-like, optional
            Time span of simulation (forwarded to the base ``Simulator``).
        cleanup : bool, optional
            Default for the ``cleanup`` argument of :py:func:`run`: if True,
            temporary files are deleted after each simulation.
        verbose : bool, optional
            Forwarded to the base ``Simulator``.
        """
        super(KappaSimulator, self).__init__(model, tspan=tspan,
                                             verbose=verbose)
        self.cleanup = cleanup
        self._outdir = None

    def run(self, tspan=None, initials=None, param_values=None, n_runs=1,
            output_dir=None, output_file_basename=None,
            cleanup=None, **additional_args):
        """
        Simulate a model using Kappa

        Parameters
        ----------
        tspan: vector-like
            time span of simulation
        initials: vector-like, optional
            initial conditions of model
        param_values : vector-like or dictionary, optional
            Values to use for every parameter in the model. Ordering is
            determined by the order of model.parameters.
            If not specified, parameter values will be taken directly from
            model.parameters.
        n_runs: int
            number of simulations to run
        output_dir : string, optional
            Location for temporary files generated by Kappa. If None (the
            default), uses a temporary directory provided by the system. A
            temporary directory with a random name is created within the
            supplied location.
        output_file_basename : string, optional
            This argument is used as a prefix for the temporary Kappa
            output directory, rather than the individual files.
        cleanup : bool, optional
            If True (default), delete the temporary files after the
            simulation is finished. If False, leave them in place (Useful for
            debugging). The default value, None, means to use the value
            specified in :py:func:`__init__`.
        additional_args: kwargs, optional
            Additional arguments to pass to Kappa

            * seed : int, optional
                Random number seed for Kappa simulation

            * perturbation : string, optional
                Optional perturbation language syntax to be appended to the
                Kappa file. See KaSim manual for more details.

        Returns
        -------
        SimulationResult
            Result object built from the time points and observable
            trajectories of every KaSim run.

        Raises
        ------
        SimulatorException
            If tspan has fewer than two points, is not linearly spaced, or
            does not start at t=0.
        ValueError
            If ``additional_args`` contains anything other than ``seed`` and
            ``perturbation``.
        KasimInterfaceError
            If KaSim exits with a non-zero return code. The message holds
            KaSim's stdout followed by its stderr.

        Examples
        --------
        >>> import numpy as np
        >>> from pysb.examples import michment
        >>> from pysb.simulator import KappaSimulator
        >>> sim = KappaSimulator(michment.model, tspan=np.linspace(0, 1))
        >>> x = sim.run(n_runs=1)
        """
        # NOTE: this call must stay the first statement of the method, and
        # must use the explicit two-argument super() form, because locals()
        # is captured here and should contain only the call arguments.
        super(KappaSimulator, self).run(tspan=tspan,
                                        initials=initials,
                                        param_values=param_values,
                                        _run_kwargs=locals()
                                        )

        if cleanup is None:
            cleanup = self.cleanup

        # A single time point would make the plot period below a division
        # by zero, so reject it up front with a clear error.
        n_points = len(self.tspan)
        if n_points < 2:
            raise SimulatorException('Kappa requires tspan to contain at '
                                     'least two time points')

        tspan_lin_spaced = np.allclose(
            self.tspan,
            np.linspace(self.tspan[0], self.tspan[-1], n_points)
        )
        if not tspan_lin_spaced or self.tspan[0] != 0.0:
            raise SimulatorException('Kappa requires tspan to be linearly '
                                     'spaced starting at t=0')

        time = self.tspan[-1]
        plot_period = time / (n_points - 1)

        base_args = ['-u', 'time', '-l', str(time), '-p', '%.5f' % plot_period]
        if 'seed' in additional_args:
            seed = additional_args.pop('seed')
            base_args.extend(['-seed', str(seed)])

        # Check if a perturbation has been set
        perturbation = additional_args.pop('perturbation', None)

        # Check no unknown arguments have been set
        if additional_args:
            raise ValueError('Unknown argument(s): {}'.format(
                ', '.join(additional_args.keys())
            ))

        # Everything that can fail without touching the filesystem is done
        # before the temporary directory is created, so that an early error
        # cannot leave a stray directory behind.
        kasim_path = pf.get_path('kasim')

        n_param_sets = self.initials_length

        gen = KappaGenerator(self.model, _exclude_ic_param=True)
        file_data_base = gen.get_content()

        # Kappa column names, for sanity check
        kappa_col_names = tuple(['time'] +
                                [o.name for o in self.model.observables])

        if output_file_basename is None:
            output_file_basename = 'tmpKappa_%s_' % self.model.name

        tout = []
        observable_traj = []

        base_directory = tempfile.mkdtemp(prefix=output_file_basename,
                                          dir=output_dir)
        try:
            base_filename = os.path.join(base_directory, self.model.name)
            kappa_filename_pattern = base_filename + '_{}.ka'
            out_filename_pattern = base_filename + '_{}_run{}.out'

            for pset_idx in range(n_param_sets):
                # Assemble the Kappa source for this parameter set: the
                # generated model, then %var and %init declarations, then
                # any user-supplied perturbation code.
                file_parts = [file_data_base]
                for param, param_value in zip(self.model.parameters,
                                              self.param_values[pset_idx]):
                    file_parts.append("%var: '{}' {:e}\n".format(
                        param.name, param_value))
                file_parts.append('\n')

                for cp, values in self.initials_dict.items():
                    file_parts.append("%init: {} {}\n".format(
                        values[pset_idx], gen.format_complexpattern(cp)
                    ))

                # If any perturbation language code has been passed in, add
                # it to the Kappa file:
                if perturbation:
                    file_parts.append('%s\n' % perturbation)

                file_data = ''.join(file_parts)

                # Write the Kappa model code to the Kappa file:
                kappa_filename = kappa_filename_pattern.format(pset_idx)
                with open(kappa_filename, 'w') as kappa_file:
                    self._logger.debug('Kappa file contents:\n\n' + file_data)
                    kappa_file.write(file_data)

                for sim_rpt in range(n_runs):
                    out_filename = out_filename_pattern.format(pset_idx,
                                                               sim_rpt)
                    args = [kasim_path] + base_args + [
                        '-i', kappa_filename, '-o', out_filename]

                    # Run KaSim
                    self._logger.debug('Running: {}'.format(' '.join(args)))
                    p = subprocess.Popen(args,
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE,
                                         cwd=base_directory)
                    # communicate() drains stdout and stderr together.
                    # Reading stdout to EOF first could deadlock once the
                    # stderr pipe fills, and would leave nothing for
                    # communicate() to return for the error message.
                    (p_out, p_err) = p.communicate()
                    p_out = p_out.decode('utf8')
                    p_err = p_err.decode('utf8')
                    for line in p_out.splitlines():
                        self._logger.debug('@@' + line)

                    if p.returncode:
                        raise KasimInterfaceError(p_out + '\n' + p_err)

                    # The simulation data, as a numpy array
                    data = _parse_kasim_outfile(out_filename)

                    # Sanity check that observables are in correct order
                    assert data.dtype.names == kappa_col_names, \
                        'Unexpected columns in Kappa output'
                    # Reinterpret the named-column array as plain float64
                    data = data.view('<f8')

                    # Handle case with single row output
                    if data.ndim == 1:
                        data.shape = (1, data.shape[0])

                    # First column is time, remaining columns are the
                    # observables (order checked against kappa_col_names)
                    tout.append(data[:, 0])
                    observable_traj.append(data[:, 1:])
        finally:
            if cleanup:
                shutil.rmtree(base_directory)

        return SimulationResult(self, tout=tout,
                                observables_and_expressions=observable_traj,
                                simulations_per_param_set=n_runs)