Source code for pysb.bng

from __future__ import absolute_import
from __future__ import print_function as _

from sympy.parsing.sympy_parser import ParenthesisGroup

import pysb.core
from pysb.generator.bng import BngGenerator, format_complexpattern
import os
import subprocess
import re
import itertools
import sympy
import sympy.parsing.sympy_parser as sympy_parser
import numpy
import tempfile
import abc
from warnings import warn
import shutil
import collections
from collections.abc import Sequence
import pysb.pathfinder as pf
import tokenize
from pysb.logging import get_logger, EXTENDED_DEBUG
from sympy.logic.boolalg import Boolean
from sympy import Function
from io import StringIO


[docs] def set_bng_path(dir): """ Deprecated. Use pysb.pathfinder.set_path() instead. """ warn("Function %s() is deprecated; use pysb.pathfinder.set_path() " "instead" % set_bng_path.__name__, category=DeprecationWarning, stacklevel=2) pf.set_path('bng', dir)
class BNGLStrictLessThan(Function): """ BNGL compatible implementation of sympy.StrictLessThan. Name formatted such that respective tokenization can easily be reverted to corresponding sympy function. """ nargs = 2 @classmethod def eval(cls, left, right): return sympy.Piecewise((1, left < right), (0, True)) def __str__(self): return f'({self.args[0]} < {self.args[1]})' __repr__ = __str__ class BNGLLessThan(Function): """ BNGL compatible implementation of sympy.LessThan. Name formatted such that respective tokenization can easily be reverted to corresponding sympy function. """ nargs = 2 @classmethod def eval(cls, left, right): return sympy.Piecewise((1, left <= right), (0, True)) def __str__(self): return f'({self.args[0]} <= {self.args[1]})' __repr__ = __str__ class BNGLStrictGreaterThan(Function): """ BNGL compatible implementation of sympy.StrictGreaterThan. Name formatted such that respective tokenization can easily be reverted to corresponding sympy function. """ nargs = 2 @classmethod def eval(cls, left, right): return sympy.Piecewise((1, left > right), (0, True)) def __str__(self): return f'({self.args[0]} > {self.args[1]})' __repr__ = __str__ class BNGLGreaterThan(Function): """ BNGL compatible implementation of sympy.GreaterThan. Name formatted such that respective tokenization can easily be reverted to corresponding sympy function. """ nargs = 2 @classmethod def eval(cls, left, right): return sympy.Piecewise((1, left >= right), (0, True)) def __str__(self): return f'({self.args[0]} >= {self.args[1]})' __repr__ = __str__ class BNGLIf(Function): """ BNGL compatible implementation of an if-statement. Implemented using sympy.Piecewise to allow conversion between BNGL conditional expressions and their SymPy equivalents. """ nargs = 3 @classmethod def eval(cls, condition, true, false): return sympy.Piecewise((true, condition), (false, True)) def __str__(self): return f'ITE({self.args[0]}, {self.args[1]}, {self.args[2]})' __repr__ = __str__ BNG_COMPARATIVE_OPERATORS = { '<': BNGLStrictLessThan, '>': BNGLStrictGreaterThan, '<=': BNGLLessThan, '>=': BNGLGreaterThan, }
[docs] class BngInterfaceError(RuntimeError): """BNG reported an error""" pass
[docs] class BngBaseInterface(object): """ Abstract base class for interfacing with BNG """ __metaclass__ = abc.ABCMeta @abc.abstractmethod def __init__(self, model=None, verbose=False, cleanup=False, output_prefix=None, output_dir=None, model_additional_species=None, model_population_maps=None): self._logger = get_logger(__name__, model=model, log_level=verbose) self._base_file_stem = 'pysb' self.cleanup = cleanup self.output_prefix = 'tmpBNG' if output_prefix is None else \ output_prefix if model: self.generator = BngGenerator( model, additional_initials=model_additional_species, population_maps=model_population_maps ) self.model = self.generator.model self._check_model() else: self.generator = None self.model = None self.base_directory = tempfile.mkdtemp(prefix=self.output_prefix, dir=output_dir) self._logger.debug('{} instantiated in directory {}'.format( self.__class__, self.base_directory) ) def __enter__(self): return self @abc.abstractmethod def __exit__(self): return def _delete_tmpdir(self): shutil.rmtree(self.base_directory) def _check_model(self): """ Checks a model has at least one initial condition and rule, raising an exception if not """ if not self.model.rules: raise NoRulesError() if ( not self.model.initials and not any(r.is_synth() for r in self.model.rules) ): raise NoInitialConditionsError() @classmethod def _bng_param(cls, param): """ Ensures a BNG console parameter is in the correct format Strings are double quoted and booleans are mapped to [0,1]. Other types are currently used verbatim. Parameters ---------- param : An argument to a BNG action call """ if isinstance(param, str): return '"%s"' % param elif isinstance(param, bool): return 1 if param else 0 elif isinstance(param, (Sequence, numpy.ndarray)): return list(param) return param
[docs] @abc.abstractmethod def action(self, action, **kwargs): """ Generates code to execute a BNG action command Parameters ---------- action: string The name of the BNG action function kwargs: kwargs, optional Arguments and values to supply to BNG """ return
@classmethod def _format_action_args(cls, **kwargs): """ Formats a set of arguments for BNG Parameters ---------- kwargs: kwargs, optional Arguments and values to supply to BNG """ if kwargs: action_args = ','.join('%s=>%s' % (k, BngConsole._bng_param(v)) for k, v in kwargs.items()) else: action_args = '' return action_args @property def base_filename(self): """ Returns the base filename (without extension) for BNG output files """ return os.path.join(self.base_directory, self._base_file_stem) @property def bng_filename(self): """ Returns the BNG command list (.bngl) filename (does not check whether the file exists) """ return self.base_filename + '.bngl' @property def net_filename(self): """ Returns the BNG network filename (does not check whether the file exists) """ return self.base_filename + '.net'
[docs] def read_netfile(self): """ Reads a BNG network file as a string. Note that you must execute network generation separately before attempting this, or the file will not be found. :return: Contents of the BNG network file as a string """ self._logger.debug('Reading BNG netfile: %s' % self.net_filename) with open(self.net_filename, 'r') as net_file: output = net_file.read() return output
[docs] def read_simulation_results(self): """ Read the results of a BNG simulation as a numpy array Returns ------- numpy.ndarray Simulation results in a 2D matrix (time on Y axis, species/observables/expressions on X axis depending on simulation type) """ return self.read_simulation_results_multi([self.base_filename])[0]
[docs] @staticmethod def read_simulation_results_multi(base_filenames): """ Read the results of multiple BNG simulations Parameters ---------- base_filenames: list of str A list of filename stems to read simulation results in from, including the full path but not including any file extension. Returns ------- list of numpy.ndarray List of simulation results, each in a 2D matrix (time on Y axis, species/observables/expressions on X axis depending on simulation type) """ list_of_yfulls = [] for base_filename in base_filenames: names = ['time'] # Read concentrations data try: cdat_arr = numpy.loadtxt(base_filename + '.cdat', skiprows=1, ndmin=2) # -1 for time column names += ['__s%d' % i for i in range(cdat_arr.shape[1] - 1)] except IOError: cdat_arr = None # Read groups data try: with open(base_filename + '.gdat', 'r') as f: # Exclude \# and time column names += f.readline().split()[2:] # Exclude first column (time) gdat_arr = numpy.loadtxt(f, ndmin=2) if cdat_arr is None: cdat_arr = numpy.ndarray((len(gdat_arr), 0)) else: gdat_arr = gdat_arr[:, 1:] except IOError: if cdat_arr is None: raise BngInterfaceError('Need at least one of .cdat file or ' '.gdat file to read simulation ' 'results') gdat_arr = numpy.ndarray((len(cdat_arr), 0)) yfull_dtype = list(zip(names, itertools.repeat(float))) yfull = numpy.ndarray(len(cdat_arr), yfull_dtype) yfull_view = yfull.view(float).reshape(len(yfull), -1) yfull_view[:, :cdat_arr.shape[1]] = cdat_arr yfull_view[:, cdat_arr.shape[1]:] = gdat_arr list_of_yfulls.append(yfull) return list_of_yfulls
[docs] class BngConsole(BngBaseInterface): """ Interact with BioNetGen through BNG Console """ def __init__(self, model=None, verbose=False, cleanup=True, output_dir=None, output_prefix=None, timeout=30, suppress_warnings=False, model_additional_species=None): super(BngConsole, self).__init__( model, verbose, cleanup, output_prefix, output_dir, model_additional_species=model_additional_species ) try: import pexpect except ImportError: raise ImportError("Library 'pexpect' is required to use " "BNGConsole, please install it to continue.\n" "It is not currently available on Windows.") if suppress_warnings: warn("suppress_warnings is deprecated and has no effect. Adjust " "the log level with the verbose argument instead.", category=DeprecationWarning, stacklevel=2) # Generate BNGL file if self.model: with open(self.bng_filename, mode='w') as bng_file: bng_file.write(self.generator.get_content()) # Start BNG Console and load BNGL bng_path = pf.get_path('bng') bng_exec_path = '%s --console' % bng_path if not bng_path.endswith('.bat'): bng_exec_path = 'perl %s' % bng_exec_path self.console = pexpect.spawn(bng_exec_path, cwd=self.base_directory, timeout=timeout) self._console_wait() if self.model: self.console.sendline('load %s' % self.bng_filename) self._console_wait() def __exit__(self, exc_type, exc_val, exc_tb): """ In console mode, commands have already been executed, so we simply close down the console and erase the temporary directory if applicable. """ self.console.sendline('done') self.console.close() if self.cleanup: self._delete_tmpdir() def _console_wait(self): """ Wait for BNG console to process the command, and return the output :return: BNG console output from the previous command """ self.console.expect('BNG>') # Python 3 requires explicit conversion of 'bytes' to 'str' console_msg = self.console.before.decode('utf-8') if "ERROR:" in console_msg: raise BngInterfaceError(console_msg) elif "WARNING:" in console_msg: self._logger.warning(console_msg) else: self._logger.debug(console_msg) return console_msg
[docs] def generate_network(self, overwrite=False): """ Generates a network in BNG and returns the network file contents as a string Parameters ---------- overwrite: bool, optional Overwrite existing network file, if any """ self.action('generate_network', overwrite=overwrite) return self.read_netfile()
[docs] def action(self, action, **kwargs): """ Generates a BNG action command and executes it through the console, returning any console output Parameters ---------- action : string The name of the BNG action function kwargs : kwargs, optional Arguments and values to supply to BNG """ # Process BNG arguments into a string action_args = self._format_action_args(**kwargs) # Execute the command via the console if action_args == '': cmd = 'action %s()' % action else: cmd = 'action %s({%s})' % (action, action_args) self._logger.debug(cmd) self.console.sendline(cmd) # Wait for the command to execute and return the result return self._console_wait()
[docs] def load_bngl(self, bngl_file): """ Load a BNGL file in the BNG console Parameters ---------- bngl_file : string The filename of a .bngl file """ cmd = 'load %s' % bngl_file self._logger.debug(cmd) self.console.sendline(cmd) self._console_wait() self._base_file_stem = os.path.splitext(os.path.basename(bngl_file))[0]
[docs] class BngFileInterface(BngBaseInterface): def __init__(self, model=None, verbose=False, output_dir=None, output_prefix=None, cleanup=True, model_additional_species=None, model_population_maps=None): super(BngFileInterface, self).__init__( model, verbose, cleanup, output_prefix, output_dir, model_additional_species=model_additional_species, model_population_maps=model_population_maps ) self._init_command_queue() def _init_command_queue(self): """ Initializes the BNG command queue """ self.command_queue = StringIO() self.command_queue.write('begin actions\n') def __exit__(self, exc_type, exc_val, exc_tb): """ In file interface mode, we close the command queue buffer (whether or not it's been executed) and erase the temporary directory if applicable. """ self.command_queue.close() if self.cleanup: self._delete_tmpdir()
[docs] def execute(self, reload_netfile=False, skip_file_actions=True): """ Executes all BNG commands in the command queue. Parameters ---------- reload_netfile: bool or str If true, attempts to reload an existing .net file from a previous execute() iteration. If a string, the filename specified in the string is supplied to BNG's readFile (which can be any file type BNG supports, such as .net or .bngl). This is useful for running multiple actions in a row, where results need to be read into PySB before a new series of actions is executed. skip_file_actions: bool Only used if the previous argument is not False. Set this argument to True to ignore any actions block in the loaded file. """ self.command_queue.write('end actions\n') bng_commands = self.command_queue.getvalue() # Generate BNGL file with open(self.bng_filename, 'w') as bng_file: output = '' if self.model and not reload_netfile: output += self.generator.get_content() if reload_netfile: filename = reload_netfile if \ isinstance(reload_netfile, str) \ else self.net_filename output += '\n readFile({file=>"%s",skip_actions=>%d})\n' \ % (filename, int(skip_file_actions)) output += bng_commands bng_file.write(output) lines = output.split('\n') line_number_format = 'L{{:0{}d}} {{}}'.format(int(numpy.ceil(numpy.log10(len(lines))))) output = '\n'.join(line_number_format.format(ln + 1, line) for ln, line in enumerate(lines)) self._logger.debug('BNG command file contents:\n' + output) # Reset the command queue, in case execute() is called again self.command_queue.close() self._init_command_queue() bng_exec_args = [pf.get_path('bng'), self.bng_filename] if not bng_exec_args[0].endswith('.bat'): bng_exec_args.insert(0, 'perl') p = subprocess.Popen(bng_exec_args, cwd=self.base_directory, stdout=subprocess.PIPE, stderr=subprocess.PIPE) # output lines as DEBUG, unless a warning or error is encountered capture_error = False captured_error_lines = [] for line in iter(p.stdout.readline, b''): line = line[:-1].decode('utf-8') if line.startswith('ERROR:'): capture_error = True if capture_error: captured_error_lines.append(line) self._logger.debug(line) # p_out is already consumed, so only get p_err (_, p_err) = p.communicate() p_err = p_err.decode('utf-8') if p.returncode or captured_error_lines: raise BngInterfaceError('\n'.join(captured_error_lines) + "\n" + p_err.rstrip())
[docs] def action(self, action, **kwargs): """ Generates a BNG action command and adds it to the command queue Parameters ---------- action : string The name of the BNG action function kwargs : kwargs, optional Arguments and values to supply to BNG """ # Process BNG arguments into a string action_args = self._format_action_args(**kwargs) # Add the command to the queue if action_args == '': self.command_queue.write('\t%s()\n' % action) else: self.command_queue.write('\t%s({%s})\n' % (action, action_args)) return
[docs] def set_parameter(self, name, value): """ Generates a BNG action command and adds it to the command queue Parameters ---------- name: string The name of the parameter to set value: float-like Value of parameter """ self.command_queue.write('\tsetParameter("%s", %g)\n' % (name, value))
[docs] def set_concentration(self, cplx_pat, value): """ Generates a BNG action command and adds it to the command queue Parameters ---------- cplx_pat: pysb.ComplexPattern or string Species ComplexPattern, or a BNG format string representation value: float-like Initial concentration """ if isinstance(cplx_pat, str): formatted_name = cplx_pat else: formatted_name = format_complexpattern( pysb.core.as_complex_pattern(cplx_pat) ) self.command_queue.write('\tsetConcentration("%s", %g)\n' % ( formatted_name, value))
def generate_hybrid_model(model, population_maps, additional_species=None, safe=False, verbose=False, output_dir=None, output_prefix=None, cleanup=True): with BngFileInterface(model, output_dir=output_dir, output_prefix=output_prefix, cleanup=cleanup, model_additional_species=additional_species, model_population_maps=population_maps) as bng: bng.action('generate_hybrid_model', verbose=verbose, safe=safe, suffix='hpp') bng.execute() with open(bng.base_filename + '_hpp.bngl', 'r') as f: return f.read()
[docs] def run_ssa(model, t_end=10, n_steps=100, param_values=None, output_dir=None, output_file_basename=None, cleanup=True, verbose=False, **additional_args): """ Simulate a model with BNG's SSA simulator and return the trajectories. Parameters ---------- model : Model Model to simulate. t_end : number, optional Final time point of the simulation. n_steps : int, optional Number of steps in the simulation. param_values : vector-like or dictionary, optional Values to use for every parameter in the model. Ordering is determined by the order of model.parameters. If not specified, parameter values will be taken directly from model.parameters. output_dir : string, optional Location for temporary files generated by BNG. If None (the default), uses a temporary directory provided by the system. A temporary directory with a random name is created within the supplied location. output_file_basename : string, optional This argument is used as a prefix for the temporary BNG output directory, rather than the individual files. cleanup : bool, optional If True (default), delete the temporary files after the simulation is finished. If False, leave them in place. Useful for debugging. verbose : bool or int, optional (default: False) Sets the verbosity level of the logger. See the logging levels and constants from Python's logging module for interpretation of integer values. False is equal to the PySB default level (currently WARNING), True is equal to DEBUG. additional_args: kwargs, optional Additional arguments to pass to BioNetGen """ bng_action_debug = verbose if isinstance(verbose, bool) else \ verbose <= EXTENDED_DEBUG additional_args['method'] = 'ssa' additional_args['t_end'] = t_end additional_args['n_steps'] = n_steps additional_args['verbose'] = additional_args.get('verbose', bng_action_debug) if param_values is not None: if len(param_values) != len(model.parameters): raise Exception("param_values must be the same length as " "model.parameters") for i in range(len(param_values)): model.parameters[i].value = param_values[i] with BngFileInterface(model, verbose=verbose, output_dir=output_dir, output_prefix=output_file_basename, cleanup=cleanup) as bngfile: bngfile.action('generate_network', overwrite=True, verbose=bng_action_debug) bngfile.action('simulate', **additional_args) bngfile.execute() output = bngfile.read_netfile() # Parse netfile (in case this hasn't already been done) if not model.species: _parse_netfile(model, iter(output.split('\n'))) yfull = bngfile.read_simulation_results() return yfull
[docs] def generate_network(model, cleanup=True, append_stdout=False, verbose=False, **kwargs): """ Return the output from BNG's generate_network function given a model. The output is a BNGL model definition with additional sections 'reactions' and 'groups', and the 'species' section expanded to contain all possible species. BNG refers to this as a 'net' file. Parameters ---------- model : Model Model to pass to generate_network. cleanup : bool, optional If True (default), delete the temporary files after the simulation is finished. If False, leave them in place (in `output_dir`). Useful for debugging. append_stdout : bool, optional This option is no longer supported and has been left here for API compatibility reasons. verbose : bool or int, optional (default: False) Sets the verbosity level of the logger. See the logging levels and constants from Python's logging module for interpretation of integer values. False is equal to the PySB default level (currently WARNING), True is equal to DEBUG. """ bng_action_debug = verbose if isinstance(verbose, bool) else \ verbose <= EXTENDED_DEBUG with BngFileInterface(model, verbose=verbose, cleanup=cleanup) as bngfile: bngfile._logger.info('Generating reaction network') bngfile.action('generate_network', overwrite=True, verbose=bng_action_debug, **kwargs) bngfile.execute() output = bngfile.read_netfile() return output
[docs] def load_equations(model, netfile): """ Load model equations from a specified netfile Useful for large models where BioNetGen network generation takes a long time - the .net file can be saved and reloaded using this function at a later date. Parameters ---------- model: pysb.Model PySB model file netfile: str BNG netfile """ if model.reactions: return with open(netfile, 'r') as f: _parse_netfile(model, f)
[docs] def generate_equations(model, cleanup=True, verbose=False, **kwargs): """ Generate math expressions for reaction rates and species in a model. This fills in the following pieces of the model: * species * reactions * reactions_bidirectional * observables (just `coefficients` and `species` fields for each element) Parameters ---------- model : Model Model to pass to generate_network. cleanup : bool, optional If True (default), delete the temporary files after the simulation is finished. If False, leave them in place (in `output_dir`). Useful for debugging. verbose : bool or int, optional (default: False) Sets the verbosity level of the logger. See the logging levels and constants from Python's logging module for interpretation of integer values. False is equal to the PySB default level (currently WARNING), True is equal to DEBUG. """ # only need to do this once # TODO track "dirty" state, i.e. "has model been modified?" # or, use a separate "math model" object to contain ODEs if model.reactions: return lines = iter(generate_network(model, cleanup=cleanup, verbose=verbose, **kwargs).split('\n')) _parse_netfile(model, lines)
def _parse_netfile(model, lines): """ Parse species, rxns and groups from a BNGL net file """ try: while 'begin parameters' not in next(lines): pass while True: line = next(lines) if 'end parameters' in line: break _parse_parameter(model, line) while 'begin species' not in next(lines): pass model.species = [] while True: line = next(lines) if 'end species' in line: break _parse_species(model, line) while 'begin reactions' not in next(lines): pass reaction_cache = {} while True: line = next(lines) if 'end reactions' in line: break _parse_reaction(model, line, reaction_cache=reaction_cache) # fix up reactions whose reverse version we saw first for r in model.reactions_bidirectional: if all(r['reverse']): r['reactants'], r['products'] = r['products'], r['reactants'] r['rate'] *= -1 # now the 'reverse' value is no longer needed del r['reverse'] while 'begin groups' not in next(lines): pass while True: line = next(lines) if 'end groups' in line: break _parse_group(model, line) except StopIteration as e: pass def _parse_parameter(model, line): _, pname, pval, _, ptype = line.strip().split() par_names = model.components.keys() if pname not in par_names: # Need to parse the value even for constants, since BNG considers some # expressions to be "Constant", e.g. "2^2" parsed_expr = parse_bngl_expr(pval, local_dict={ e.name: e for e in model.parameters | model.expressions}) if ptype == 'Constant' and pname not in model._derived_parameters.keys(): p = pysb.core.Parameter(pname, parsed_expr, _export=False) model._derived_parameters.add(p) elif ptype == 'ConstantExpression' and \ pname not in model._derived_expressions.keys(): p = pysb.core.Expression(pname, parsed_expr, _export=False) model._derived_expressions.add(p) else: raise ValueError('Unknown type {} for parameter {}'.format( ptype, pname)) def _parse_species(model, line): """Parse a 'species' line from a BNGL net file.""" index, species, value = line.strip().split() species_compartment_name, complex_string = \ re.match(r'(?:@(\w+)::)?(.*)', species).groups() species_compartment = model.compartments.get(species_compartment_name) monomer_strings = complex_string.split('.') monomer_patterns = [] for ms in monomer_strings: monomer_name, site_strings, monomer_compartment_name = \ re.match(r'\$?(\w+)\(([^)]*)\)(?:@(\w+))?', ms).groups() site_conditions = collections.defaultdict(list) if len(site_strings): for ss in site_strings.split(','): # FIXME this should probably be done with regular expressions if '!' in ss and '~' in ss: site_name, condition = ss.split('~') state, bond = condition.split('!') if bond == '?': bond = pysb.core.WILD elif bond == '!': bond = pysb.core.ANY else: bond = int(bond) condition = (state, bond) elif '!' in ss: site_name, condition = ss.split('!', 1) if '!' in condition: condition = [int(c) for c in condition.split('!')] else: condition = int(condition) elif '~' in ss: site_name, condition = ss.split('~') else: site_name, condition = ss, None site_conditions[site_name].append(condition) site_conditions = {k: v[0] if len(v) == 1 else pysb.core.MultiState(*v) for k, v in site_conditions.items()} monomer = model.monomers[monomer_name] monomer_compartment = model.compartments.get(monomer_compartment_name) # Compartment prefix notation in BNGL means "assign this compartment to # all molecules without their own explicit compartment". compartment = monomer_compartment or species_compartment mp = pysb.core.MonomerPattern(monomer, site_conditions, compartment) monomer_patterns.append(mp) cp = pysb.core.ComplexPattern(monomer_patterns, None) model.species.append(cp) def _parse_reaction(model, line, reaction_cache): """Parse a 'reaction' line from a BNGL net file.""" (number, reactants, products, rate, rule) = line.strip().split(' ', 4) # the -1 is to switch from one-based to zero-based indexing reactants = tuple(int(r) - 1 for r in reactants.split(',') if r != '0') products = tuple(int(p) - 1 for p in products.split(',') if p != '0') rate = rate.rsplit('*') (rule_list, unit_conversion) = re.match( r'#([\w,\(\)]+)(?: unit_conversion=(.*))?\s*$', rule).groups() rule_list = rule_list.split(',') # BNG lists all rules that generate a rxn # Support new (BNG 2.2.6-stable or greater) and old BNG naming convention # for reverse rules rule_name, is_reverse = zip(*[re.subn(r'^_reverse_|\(reverse\)$', '', r) for r in rule_list]) is_reverse = tuple(bool(i) for i in is_reverse) r_names = ['__s%d' % r for r in reactants] rate_param = [model.parameters.get(r) or model.expressions.get(r) or model._derived_parameters.get(r) or model._derived_expressions.get(r) or float(r) for r in rate] combined_rate = sympy.Mul(*[sympy.S(t) for t in r_names + rate_param]) reaction = { 'reactants': reactants, 'products': products, 'rate': combined_rate, 'rule': rule_name, 'reverse': is_reverse, } model.reactions.append(reaction) # bidirectional reactions key = (reactants, products) key_reverse = (products, reactants) if key in reaction_cache: reaction_bd = reaction_cache.get(key) reaction_bd['rate'] += combined_rate reaction_bd['rule'] += tuple(r for r in rule_name if r not in reaction_bd['rule']) elif key_reverse in reaction_cache: reaction_bd = reaction_cache.get(key_reverse) reaction_bd['reversible'] = True reaction_bd['rate'] -= combined_rate reaction_bd['rule'] += tuple(r for r in rule_name if r not in reaction_bd['rule']) else: # make a copy of the reaction dict reaction_bd = dict(reaction) # default to false until we find a matching reverse reaction reaction_bd['reversible'] = False reaction_cache[key] = reaction_bd model.reactions_bidirectional.append(reaction_bd) def _parse_group(model, line): """Parse a 'group' line from a BNGL net file.""" # values are number (which we ignore), name, and species list values = line.strip().split() obs = model.observables[values[1]] if len(values) == 3: # combination is a comma separated list of [coeff*]speciesnumber for product in values[2].split(','): terms = product.split('*') # if no coeff given (just species), insert a coeff of 1 if len(terms) == 1: terms.insert(0, 1) obs.coefficients.append(int(terms[0])) # -1 to change to 0-based indexing obs.species.append(int(terms[1]) - 1) def _convert_tokens(tokens, local_dict, global_dict): for pos, tok in enumerate(tokens): if tok == (tokenize.NAME, 'if'): tokens[pos] = (tokenize.NAME, 'BNGLIf') local_dict['BNGLIf'] = BNGLIf elif tok == (tokenize.OP, '^'): tokens[pos] = (tokenize.OP, '**') elif tok == (tokenize.NAME, 'and'): tokens[pos] = (tokenize.OP, '&') elif tok == (tokenize.NAME, 'or'): tokens[pos] = (tokenize.OP, '|') return tokens def _convert_comparative_operators(tokens, local_dict, global_dict): """ Transforms comparative operators to corresponding Piecewise functions. Supported operators are less-than and greater than (``<``, ``>``, ``<=``, ``>=``). Transformation ensure compatibility with other symbolic operators such as multiplication or division, which is supported in BNGL but not in sympy. """ res1 = sympy_parser._group_parentheses(_convert_comparative_operators)( tokens, local_dict, global_dict ) res2 = _apply_functions(res1, local_dict, global_dict) res3 = _revert_conversion(res2) result = _transform_operator_symbols(res3, local_dict, global_dict) return result def _revert_conversion(tokens): """ Reverts certain conversions of comparative operators to function calls. For settings where a comparative operator appears in an if statement, we actually want to return boolean values. This function reverts the conversion of comparative operators (with boolean return values) to function calls (with numerical return values) in the top level expression of the first argument of an if statement. """ # check if the expression contains an if statement and if so revert all # conversions. In this setting we actually want to return boolean values. # This check needs to happen after grouping of parentheses, as we only want # to check for the if statement in the (current) top level of the # expression. result = tokens revert_ops = [ func.__name__ for func in BNG_COMPARATIVE_OPERATORS.values() ] for pos, tok in enumerate(tokens): if not isinstance(tok, sympy_parser.AppliedFunction): continue if not tok.function == (tokenize.NAME, 'BNGLIf'): continue # we need to revert the conversions we just applied, but # only for the first argument of the if statement and only # at the top level of the expression for arg_pos, arg_tok in enumerate(tok.args): if arg_tok == (tokenize.OP, ','): break if isinstance(arg_tok, sympy_parser.AppliedFunction) and ( arg_tok.function[0] == tokenize.NAME and arg_tok.function[1] in revert_ops ): result[pos].args[arg_pos].function = ( tokenize.NAME, arg_tok.function[1].replace('BNGL', '') ) return result def _apply_functions(tokens, local_dict, global_dict): """Convert a NAME token + ParenthesisGroup into an AppliedFunction. Based on sympy_parser._apply_functions. NB: In contrast to sympy_parser._apply_functions, ParenthesisGroups, if not applied to any function, are _not_ converted back into lists of tokens. """ result = [] symbol = None for tok in tokens: if isinstance(tok, sympy_parser.ParenthesisGroup): if symbol and sympy_parser._token_callable(symbol, local_dict, global_dict): result[-1] = sympy_parser.AppliedFunction(symbol, tok) symbol = None else: result.append(tok) elif tok[0] == tokenize.NAME: symbol = tok result.append(tok) else: symbol = None result.append(tok) return result def _flatten(tokens, local_dict, global_dict): """Flatten AppliedFunctions and Parenthesis Groups into lists of tokens. Based on sympy_parser._flatten. NB: In contrast to sympy_parser._flatten, this function will also flatten ParenthesisGroups and flattening is done recursively. """ result = [] for tok in tokens: if isinstance(tok, sympy_parser.AppliedFunction): result.extend(_flatten(tok.expand(), local_dict, global_dict)) elif isinstance(tok, sympy_parser.ParenthesisGroup): result.extend(_flatten(tok, local_dict, global_dict)) else: result.append(tok) return result def _transform_operator_symbols(tokens, local_dict, global_dict): """Transforms the infix comparative operator symbols to function calls. This is a helper function for ``_convert_comparative_operators``. Works with expressions containing one comparative operator, no nesting and grouped parenthesis. Expressions like ``(1<2)<4`` will not work with this and should be used with ``_convert_comparative_operators``. Examples: 1<2 to BNGLessThan(1,2) 1*2<x to BNGLessThan(1*2, x) 1*(2<x) to 1*BNGLessThan(2, x) """ result = tokens for op, fun in BNG_COMPARATIVE_OPERATORS.items(): if (tokenize.OP, op) not in tokens: continue # enable parsing of the function implementing the operator local_dict[fun.__name__] = fun # find the operator op_idx = tokens.index((tokenize.OP, op)) # insert the function call, with the left and right tokens as arguments # NB: tokenization adds parenthesis based on operator precedence, so we # can assume that the arguments are the tokens immediately before and # after the operator left = tokens[op_idx-1] right = tokens[op_idx+1] # replace the operator and its arguments with the function call args = ParenthesisGroup([ (tokenize.OP, '('), left, (tokenize.OP, ','), right, (tokenize.OP, ')') ]) result = result[:op_idx-1] + [sympy_parser.AppliedFunction( (tokenize.NAME, fun.__name__), args )] + result[op_idx+2:] return result def _is_bool_expr(e): return isinstance(e, Boolean) and not isinstance(e, sympy.AtomicExpr)
[docs] def parse_bngl_expr(text, *args, **kwargs): """Convert a BNGL math expression string to a sympy Expr.""" # Translate a few operators with simple text replacement. text = text.replace('()', '') text = text.replace('==', '=') # Use sympy to parse the text into an Expr. trans = ( *sympy_parser.standard_transformations, sympy_parser.convert_equals_signs, # order is important here _convert_tokens, _convert_comparative_operators, _flatten, ) return sympy_parser.parse_expr( text, *args, transformations=trans, evaluate=False, **kwargs )
[docs] class NoInitialConditionsError(RuntimeError): """Model initial_conditions is empty.""" def __init__(self): RuntimeError.__init__(self, "Model has no initial conditions or " "zero-order synthesis rules")
[docs] class NoRulesError(RuntimeError): """Model rules is empty.""" def __init__(self): RuntimeError.__init__(self, "Model has no rules")