Skip to content
Snippets Groups Projects
Commit 99ee11bf authored by abaucher's avatar abaucher
Browse files

Added comments to parse dynamo and system, added error when time misssing

parent 527b3649
No related branches found
No related tags found
No related merge requests found
"""Parse DYNAMO special functions.
"""
import ast import ast
from astunparse import unparse from astunparse import unparse
from .parse_equations import reformat_eq from .parse_equations import reformat_eq
# List of special functions to handle
instance_fun_names = {'smooth', 'sample', 'dlinf3', 'delay3', 'tabhl', 'step'} instance_fun_names = {'smooth', 'sample', 'dlinf3', 'delay3', 'tabhl', 'step'}
def change_and_get_params(node, node_name): def change_and_get_params(node, node_name):
# Node_name is the name of the updated node """Get information needed to call the special function in the `node`, and also change the `node` to include appropriate parameter names.
Parameters
----------
node : ast.Module
Function call to handle.
node_name : str
Name of the node (variable or constant) which uses this function to be upated. This name is useful to determine the new name of the function (es: `tabhl_io` for the funciton `tabhl` and the updated variable `io`.
Returns
-------
dict
All useful information to generate the special function. Depends on the type of the function.
"""
name = node.func.id name = node.func.id
# For every type of special function, different treatments apply.
if name == 'tabhl': if name == 'tabhl':
params = {'table': node.args[0].id, params = {'table': node.args[0].id,
'val': unparse(node.args[1]), 'val': unparse(node.args[1]),
...@@ -18,7 +37,7 @@ def change_and_get_params(node, node_name): ...@@ -18,7 +37,7 @@ def change_and_get_params(node, node_name):
new_fun_name = f"tabhl_{node_name}" new_fun_name = f"tabhl_{node_name}"
params['fun'] = new_fun_name params['fun'] = new_fun_name
node.args = [node.args[1]] node.args = [node.args[1]]
# node.func.id = new_fun_name
elif name in {'smooth', 'dlinf3', 'delay3'}: elif name in {'smooth', 'dlinf3', 'delay3'}:
params = {'val': node.args[0].value.id, params = {'val': node.args[0].value.id,
...@@ -26,7 +45,7 @@ def change_and_get_params(node, node_name): ...@@ -26,7 +45,7 @@ def change_and_get_params(node, node_name):
new_fun_name = f"{name}_{node_name}" new_fun_name = f"{name}_{node_name}"
params['fun'] = new_fun_name params['fun'] = new_fun_name
node.args = [node.args[0], node.args[1], ast.Name('k')] node.args = [node.args[0], node.args[1], ast.Name('k')]
# node.func.id = new_fun_name
elif name == 'sample': elif name == 'sample':
params = {'fun': f'sample_{node_name}', params = {'fun': f'sample_{node_name}',
...@@ -42,7 +61,24 @@ def change_and_get_params(node, node_name): ...@@ -42,7 +61,24 @@ def change_and_get_params(node, node_name):
return params return params
def get_dynamo_fun_params(root, node_name): def get_dynamo_fun_params(root, node_name):
"""Get information needed to execute the equation `node`, and also change `node` with appropriate parameters.
Parameters
----------
node : ast.Module
Equation.
node_name : str
Name of the node (variable or constant) which uses this function to be upated. This name is useful to determine the new name of the function (es: `tabhl_io` for the funciton `tabhl` and the updated variable `io`.
Returns
-------
(ast.Module, dict)
The modified node and all useful information to generate the equation.
"""
root = root.body[0].value root = root.body[0].value
# The node is walked through to detect and change a special function call
# Only one special DYNAMO function is allowed in an equation ...
for node in ast.walk(root): for node in ast.walk(root):
if isinstance(node, ast.Call): if isinstance(node, ast.Call):
if node.func.id in instance_fun_names: if node.func.id in instance_fun_names:
......
"""Functions to parse an entire pydynamo code and generate a System object. Also defines every political changes.
"""
import ast import ast
import re import re
import inspect import inspect
...@@ -8,21 +10,41 @@ from .specials import step, clip ...@@ -8,21 +10,41 @@ from .specials import step, clip
from .system import System from .system import System
def get_comment(line): def get_comment(line):
"""Retrieve comment by removing '#' and additional spaces.
"""
if '#' in line: if '#' in line:
return line.split('#', 1)[1].strip() return line.split('#', 1)[1].strip()
return '' return ''
def get_nodes_eqs_dicts(lines): def get_nodes_eqs_dicts(lines):
"""
Parameters
----------
lines : iterable(str)
List of every equations.
Returns
-------
(dict, dict, dict)
Nodes (variable, constants and functions), equations (constant, update and initialisation) and comments parsed in the equations list.
"""
all_eqs = {name: dict() for name in ['cst', 'update', 'init']} all_eqs = {name: dict() for name in ['cst', 'update', 'init']}
all_nodes = {name: set() for name in ['cst', 'var', 'fun']} all_nodes = {name: set() for name in ['cst', 'var', 'fun']}
comments = {} comments = {}
# For each equation, try to detect its type and retrive informations.
for l in lines: for l in lines:
root = ast.parse(l) root = ast.parse(l)
type_found = False type_is_identified = False
for eq_type in all_eqs:
for eq_type in all_eqs.keys():
if is_eq_of_type(root, eq_type): if is_eq_of_type(root, eq_type):
type_found = True type_is_identified = True
# Get assigned node and equation arguments
try: try:
node, args = get_pars_eq(root, eq_type) node, args = get_pars_eq(root, eq_type)
args['raw_line'] = l.split('#')[0].strip() args['raw_line'] = l.split('#')[0].strip()
...@@ -56,46 +78,110 @@ def get_nodes_eqs_dicts(lines): ...@@ -56,46 +78,110 @@ def get_nodes_eqs_dicts(lines):
all_nodes['fun'].add(arg_node) all_nodes['fun'].add(arg_node)
if root.body and not type_is_identified:
if root.body and not type_found:
raise(SyntaxError(f"Invalid equation:\n {l}")) raise(SyntaxError(f"Invalid equation:\n {l}"))
return all_nodes, all_eqs, comments return all_nodes, all_eqs, comments
def system_from_nodes_eqs(nodes, eqs, comments, s=None, prepare=True): def system_from_nodes_eqs(nodes, eqs, comments, s=None, prepare=True):
"""Get a system from nodes, equations and comments dictionnaries. If a system is given, just add this equations, nodes and comments to it.
Parameters
----------
nodes : dict
Node (variables, constants or functions) names.
eqs : dict
Equations (constant, initialisation or updates).
comments : dict(str: str)
Comments of each node.
s : System
System to which we add changes.
prepare : bool
If True, set all constants, and functions (constants, update and init), but not special functions.
"""
# In case a System is given, eventually change variables to constants and vice-versa
if s: if s:
for n, args in eqs['cst'].items(): for n, args in eqs['cst'].items():
if n in s.eqs['update']: if n in s.eqs['update']:
change_var_to_cst(s, n) change_var_to_cst(s, n)
for n, args in eqs['update'].items(): for n, args in eqs['update'].items():
if n in s.eqs['cst']: if n in s.eqs['cst']:
change_cst_to_var(s, n) change_cst_to_var(s, n)
if not s: if not s:
s = System() s = System()
s.add_all_nodes(nodes) s.add_all_nodes(nodes)
s.add_all_eqs(eqs) s.add_all_eqs(eqs)
s.add_comments(comments) s.add_comments(comments)
if prepare: if prepare:
s.prepare() s.prepare()
return s return s
def system_from_lines(lines, s=None, prepare=True): def system_from_lines(lines, s=None, prepare=True):
"""Get a system from a list of pydynamo equations. If a system is given, just add this equations to it.
Parameters
----------
lines : iterable(str)
List of every pydynamo equations.
s : System
System to which we add changes.
prepare : bool
If True, set all constants, and functions (constants, update and init), but not special functions.
"""
nodes, eqs, comments = get_nodes_eqs_dicts(lines) nodes, eqs, comments = get_nodes_eqs_dicts(lines)
return system_from_nodes_eqs(nodes, eqs, comments, s, prepare) return system_from_nodes_eqs(nodes, eqs, comments, s, prepare)
def system_from_file(filename, s=None, prepare=True): def system_from_file(filename, s=None, prepare=True):
"""Get a system from a file with pydynamo equations. If a system is given, just add this equations to it.
Parameters
----------
filename : str
Files in which every pydynamo equations are written.
s : System
System to which we add changes.
prepare : bool
If True, set all constants, and functions (constants, update and init), but not special functions.
"""
check_file(filename) check_file(filename)
with open(filename, 'r') as f: with open(filename, 'r') as f:
return system_from_lines(f.readlines(), s, prepare) return system_from_lines(f.readlines(), s, prepare)
def system_from_fun(fun, s=None, prepare=True): def system_from_fun(fun, s=None, prepare=True):
"""Get a system from a function which lists pydynamo equations. If a system is given, just add this equations to it.
Parameters
----------
fun : function
Function in which every pydynamo equations are written.
s : System
System to which we add changes.
prepare : bool
If True, set all constants, and functions (constants, update and init), but not special functions.
Examples
--------
Just write pydynamo equations inside a function, and create a System with it:
>>> def custom_equations():
>>> pop.i = 100
>>> pop.k = pop.j /2 # Population
>>> custom_system = system_from_fun(custom_equations)
"""
lines =[l.strip() lines =[l.strip()
for l in inspect.getsource(fun).split('\n')[1:]] for l in inspect.getsource(fun).split('\n')[1:]]
return system_from_lines(lines, s, prepare) return system_from_lines(lines, s, prepare)
def check_file(filename): def check_file(filename):
"""Check if every line in the file can be parsed by the ast module. If not, raise an error.
Parameters
----------
filename : str
Files in which every pydynamo equations are written.
"""
with open(filename, 'r') as f: with open(filename, 'r') as f:
for i, l in enumerate(f.readlines()): for i, l in enumerate(f.readlines()):
try: try:
...@@ -105,43 +191,125 @@ def check_file(filename): ...@@ -105,43 +191,125 @@ def check_file(filename):
e.lineno = i + 1 e.lineno = i + 1
raise raise
def new_cst_politic(s, cst, year, val2): def new_system(raw, s=None):
"""Add new equations for cst, which becomes a variable changing from initval to endval during time interval from year""" """Create a new system from either a file, a list, or a function, including pydynamo equations.
Parameters
----------
raw : str, iterable(str) or function
If str, open the file named `raw` and read pydynamo equations inside (see parse_system.system_from_file`). If iterable(str), each element is a pydynamo equations (see `parse_system.system_from_lines`). If function, read pydynamo equations written inside (see `parse_system.system_from_fun`).
Returns
-------
System
System object.
"""
if callable(raw):
return system_from_fun(raw, s=s)
elif isinstance(raw, list):
return system_from_lines(raw, s=s)
elif isinstance(raw, str):
return system_from_file(raw, s=s)
def new_cst_politic(s, cst, date, new_value):
"""Add a new equations to `s` for the constant `cst`, which becomes a variable changing from its former value to `new_value` after the `date`.
Parameters
----------
s : System
The system to implement the new politic.
cst : str
Constant name.
date : float
Date of politic application.
new_value : float
The new value that `cst` will take after the date `date`.
"""
assert cst in s.eqs['cst'], f"{cst} is not a constant" assert cst in s.eqs['cst'], f"{cst} is not a constant"
initval= s.eqs['cst'][cst]['line'] initval= s.eqs['cst'][cst]['line']
del s.eqs['cst'][cst] del s.eqs['cst'][cst]
# New equations # New equations
eq_clip = f"{cst}.k = clip({cst}2, {cst}1, time.k, {cst}_date) # {s.get_comment(cst)} "
eq_clip = f"{cst}.k = clip({cst}2, {cst}1, time.k, {cst}_year) # {s.get_comment(cst)} " eq_date = f"{cst}_date = {date} # Date of {cst} change"
eq_year = f"{cst}_year = {year} # Date of {cst} change" eq_cst1 = f"{cst}1 = {initval} # {s.get_comment({cst})} before {cst}_date"
eq_cst1 = f"{cst}1 = {initval} # {s.get_comment({cst})} before {cst}_year" eq_cst2 = f"{cst}2 = {new_value} # {s.get_comment({cst})} after {cst}_date"
eq_cst2 = f"{cst}2 = {val2} # {s.get_comment({cst})} after {cst}_year"
# Change calls from cst to var # Change calls from cst to var
change_cst_to_var(s, cst) change_cst_to_var(s, cst)
system_from_lines([eq_clip, eq_year, eq_cst1, eq_cst2], s=s, prepare=True) # Insert new equations
system_from_lines([eq_clip, eq_date, eq_cst1, eq_cst2], s=s, prepare=True)
def new_table_politic(s, var, year, val2): def new_table_politic(s, var, date, new_value):
"""Add new equations to change table from year""" """Add a new equations to `s` for the table used by the variable `var`. The table changes from its former value to `new_value` after the `date`.
Parameters
----------
s : System
The system to implement the new politic.
var : str
Variable name.
date : float
Date of politic application.
new_value : list(float)
The new value that the table will take after the date `date`.
"""
assert f'tabhl' in s.eqs['update'][var]['args']['fun'], f"{var} hasn't tabhl function" assert f'tabhl' in s.eqs['update'][var]['args']['fun'], f"{var} hasn't tabhl function"
table_name = s.eqs['update'][var]['args']['fun']['tabhl']['table'] table_name = s.eqs['update'][var]['args']['fun']['tabhl']['table']
table_init_val = s.eqs['cst'][table_name]['line'] table_init_val = s.eqs['cst'][table_name]['line']
var_line = s.eqs['update'][var]['raw_line'] var_line = s.eqs['update'][var]['raw_line']
eq_table_1 = f"{table_name}1 = {table_init_val} # {s.get_comment(table_name)} before {year}" # Define new equations
eq_table_2 = f"{table_name}2 = {list(val2)} # {s.get_comment(table_name)} after {year}" eq_table_1 = f"{table_name}1 = {table_init_val} # {s.get_comment(table_name)} before {date}"
eq_table_2 = f"{table_name}2 = {list(new_value)} # {s.get_comment(table_name)} after {date}"
eq_var_1 = var_line.replace(f'{var}.k', f'{var}1.k').replace(table_name, table_name + '1') eq_var_1 = var_line.replace(f'{var}.k', f'{var}1.k').replace(table_name, table_name + '1')
eq_var_2 = var_line.replace(f'{var}.k', f'{var}2.k').replace(table_name, table_name + '2') eq_var_2 = var_line.replace(f'{var}.k', f'{var}2.k').replace(table_name, table_name + '2')
eq_var = f"{var}.k = clip({var}2.k, {var}1.k, time.k, {year}) # {s.get_comment(var)}" eq_var = f"{var}.k = clip({var}2.k, {var}1.k, time.k, {date}) # {s.get_comment(var)}"
# Insert new equations
system_from_lines([eq_table_1, eq_table_2, eq_var_1, eq_var_2, eq_var], s=s) system_from_lines([eq_table_1, eq_table_2, eq_var_1, eq_var_2, eq_var], s=s)
def new_var_politic(s, var, date, eq2):
"""Add a new equations to `s` for the variable `cst`, which changes from its former value to `new_value` after the `date`.
Parameters
----------
s : System
The system to implement the new politic.
var : str
Variable name.
date : float
Date of politic application.
new_value : str
The new value, written with the pydynamo syntax, that the variable `var` will take after the date `date`.
"""
assert var in s.eqs['update'], f"{var} is not a variable"
first_line = s.eqs['update'][var]['raw_line'].split('=')[1]
line_init = None
if var in s.eqs['init']:
line_init = s.eqs['init'][var]['raw_line'].split('=')[1]
del s.eqs['update'][var]
# Define new equations
eq_clip = f"{var}.k = clip({var}2.k, {var}1.k, time.k, {var}_date) # {s.get_comment(var)}"
eq_date = f"{var}_date = {date} # Date of {var} change"
eq_var1 = f"{var}1.k = {first_line} # {s.get_comment(var)} before {var}_date"
eq_var2 = f"{var}2.k = {eq2} # {s.get_comment(var)} after {var}_date"
eq_init1 = ''
eq_init2 = ''
if line_init:
eq_init1 = f"{var}1.i = {line_init}"
eq_init2 = f"{var}2.i = 0"
# Insert new equations
system_from_lines([eq_clip, eq_date, eq_var1, eq_var2, eq_init1, eq_init2], s=s, prepare=True)
def change_cst_to_var(s, cst): def change_cst_to_var(s, cst):
# TODO: FAIRE AVEC INIT # TODO: FAIRE AVEC INIT
"""Change a constant for a variable""" """Change a constant for a variable. TODO: Write documentation."""
s.nodes['cst'].remove(cst) s.nodes['cst'].remove(cst)
for v, args in s.eqs['update'].items(): for v, args in s.eqs['update'].items():
if cst in args['args']['cst']: if cst in args['args']['cst']:
...@@ -154,7 +322,7 @@ def change_cst_to_var(s, cst): ...@@ -154,7 +322,7 @@ def change_cst_to_var(s, cst):
def change_var_to_cst(s, var): def change_var_to_cst(s, var):
"""Change a variable for a constant""" """Change a variable for a constant. TODO: Write documentation."""
pass pass
# s.nodes['var'].remove(var) # s.nodes['var'].remove(var)
...@@ -166,34 +334,3 @@ def change_var_to_cst(s, var): ...@@ -166,34 +334,3 @@ def change_var_to_cst(s, var):
# new_raw_line = re.sub(f'(?<!\w){var}(?!\w)', f'{var}.k', args['raw_line']) # new_raw_line = re.sub(f'(?<!\w){var}(?!\w)', f'{var}.k', args['raw_line'])
# args['raw_line'] = new_raw_line # args['raw_line'] = new_raw_line
# args['line'] = new_line # args['line'] = new_line
def new_system(raw, s=None):
"""returns a new System object from the data included in raw"""
if callable(raw):
return system_from_fun(raw, s=s)
elif isinstance(raw, list):
return system_from_lines(raw, s=s)
elif isinstance(raw, str):
return system_from_file(raw, s=s)
def new_var_politic(s, var, year, eq2):
"""Add new equations for var, which becomes a variable changing from initval to endval during time interval from year"""
assert var in s.eqs['update'], f"{var} is not a variable"
first_line = s.eqs['update'][var]['raw_line'].split('=')[1]
line_init = None
if var in s.eqs['init']:
line_init = s.eqs['init'][var]['raw_line'].split('=')[1]
del s.eqs['update'][var]
# New equations
eq_clip = f"{var}.k = clip({var}2.k, {var}1.k, time.k, {var}_year) # {s.get_comment(var)}"
eq_year = f"{var}_year = {year} # Date of {var} change"
eq_var1 = f"{var}1.k = {first_line} # {s.get_comment(var)} before {var}_year"
eq_var2 = f"{var}2.k = {eq2} # {s.get_comment(var)} after {var}_year"
eq_init1 = ''
eq_init2 = ''
if line_init:
eq_init1 = f"{var}1.i = {line_init}"
eq_init2 = f"{var}2.i = 0"
system_from_lines([eq_clip, eq_year, eq_var1, eq_var2, eq_init1, eq_init2], s=s, prepare=True)
...@@ -182,8 +182,8 @@ def plot_world_03(s, title=None, with_legend=False): ...@@ -182,8 +182,8 @@ def plot_world_03(s, title=None, with_legend=False):
scales = [scale_03_state, scale_03_life, scale_03_indices] scales = [scale_03_state, scale_03_life, scale_03_indices]
try: try:
time = s.time time = s.time
except KeyError: except AttributeError as e:
raise Exception("System must be ran before plot !") raise Exception("System must be run before plot !") from e
if with_legend: if with_legend:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment