Source code for goflow.models.murray

# @Author: Felix Kramer <kramer>
# @Date:   23-06-2021
# @Email:  kramer@mpi-cbg.de
# @Project: phd_network_remodelling
# @Last modified by:   felix
# @Last modified time: 2022-07-02T13:15:02+02:00

import numpy as np
from dataclasses import dataclass, field
# custom
from .base import model


[docs]@dataclass class murray(model): """ The network adaptation model according to Murray, based on flow dissipation-volume minimization as published in: Murray, The Physiological Principle of Minimum Work, PNAS, 1926 The system's cost function is given as:\n .. math:: \\Gamma = \\sum_{e}\\left(\\frac{f_e^2}{C_e} + ar_e^2\\right) For minimizing this cost one iteratively computes the flow f_e, pressure \\Delta p_e landscapes. The system's derived gradient descent dynamics are given as:\n .. math:: \\frac{dr_e}{dt} \\propto \\left[\\alpha_1\\left( \\frac{\\Delta p_er_e}{L_e}\\right)^2-\\alpha_0\\right] r_e Attributes: pars (dict):\n The specific model parameters \\alpha_0, \\alpha_1. \n ivp_options (dict):\n Information to generate the internal solver_options. Providing t0,t1 number of evaluation and x0.\n model_args (list):\n Model specific paramerets need to evaluate the update rules of the DS \n solver_options (dict):\n Specifying runtime and evaluation marks. \n events (dict):\n Events to consier, here in gernal flatlining events which allow for early termination of stiff simulations. \n null_decimals (in):\n description \n Methods: init() The model post_init function. update_event_func() Update the event function and ensures that solver_options are set. flatlining_default(t, x_0, *args) The default flatlining function for determining the terminal event of the dynamical system. flatlining_dynamic(t, x_0, *args) The dynamic flatlining function for determining the terminal event of the dynamical system. set_model_parameters(model_pars) Set internal model arguments array. set_solver_options(solv_opt) Set internal solver_options and update event function. calc_update_stimuli(t, x_0, *args) The dynamic system's temporal update rule, computing the gradient -dF for dx/dt. calc_cost_stimuli(t, x_0, *args) Computes the dynamic system's Lyapunov function and its gradient. get_stimuli_pars(self, flow, x_0) Update flow & pressure landscapes, recompute conductivity as well as squared quantities. """ pars: dict = field(default_factory=dict, init=True, repr=True) def __post_init__(self): self.init() self.model_args = [1., 1.] self.solver_options.update({'events': 'default'}) if self.pars: self.set_model_parameters(self.pars)
[docs] def set_model_parameters(self, model_pars): for k, v in model_pars.items(): if 'alpha_0' == k: self.model_args[0] = v if 'alpha_1' == k: self.model_args[1] = v
[docs] def set_solver_options(self, solv_opt): for k, v in solv_opt.items(): self.solver_options[k] = v self.update_event_func()
[docs] def calc_update_stimuli(self, t, x_0, flow, alpha_0, alpha_1): x_sq, p_sq = self.get_stimuli_pars(flow, x_0) s1 = 2.*alpha_1*np.multiply(p_sq, x_sq) s2 = alpha_0*np.ones(len(x_0)) ds = np.subtract(s1, s2) dx = 2*np.multiply(ds, x_0) return dx
[docs] def calc_cost_stimuli(self, t, x_0, flow, alpha_0, alpha_1): x_sq, p_sq = self.get_stimuli_pars(flow, x_0) f1 = alpha_1*np.multiply(p_sq, np.power(x_sq, 2)) f2 = alpha_0 * x_sq F = np.sum(np.add(f1, f2)) dF = -self.calc_update_stimuli(t, x_0, flow, alpha_0, alpha_1) return F, dF
[docs] def get_stimuli_pars(self, flow, x_0): """ Update flow & pressure landscapes, recompute conductivity as well as squared quantities. Invokes the internal flow circuit and recalcs flows and pressures as well as conductivities according to Hagen-Poiseuille's law of flows in cylindrical pipes. Args: flow (flow):\n A hailhydro flow object, describing the current flow landscape in the transport network.\n x_0 (array):\n Current state vector of the dynmic system.\n Returns: iterable: \n The squared state vector as well as the squared pressure landscape x_sq, p_sq """ k = flow.circuit.scales['conductance'] src = flow.circuit.nodes['source'] x_sq = np.power(x_0, 2) conductivity = flow.calc_conductivity_from_cross_section(x_sq, k) p_sq, q_sq = flow.calc_sq_flow(conductivity, src) return x_sq, p_sq