# -*- coding: UTF-8 -*-
##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory, National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
#
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes".
##############################################################################
"""
This module contains miscellaneous utility functions that are of general use
in IDAES models.
"""
import pyutilib.services
from pyomo.environ import Block, Param
__author__ = "Qi Chen, John Siirola, Andrew Lee"
def doNothing(*args, **kwargs):
    """Accept any arguments, ignore them, and return None.

    Handy as a fallback when a function should be called only if it exists,
    for example::

        getattr(unit, 'possibly_defined_function', doNothing)()

    Args:
        *args (anything): positional arguments, all ignored
        **kwargs (anything): keyword arguments, all ignored

    Returns:
        None
    """
    return None
def get_time(results):
    """Return the elapsed time reported by the solver.

    The attributes ``time``, ``wallclock_time`` and ``User time`` of
    ``results.solver`` are probed in that order; the first one that exists
    and is not None is returned.

    Args:
        results : object exposing a ``solver`` attribute

    Returns:
        The first non-None value found among the probed attributes.

    Raises:
        NotImplementedError: if none of the attributes yields a value.
    """
    for attr_name in ('time', 'wallclock_time', 'User time'):
        elapsed = getattr(results.solver, attr_name, None)
        if elapsed is not None:
            return elapsed
    raise NotImplementedError('Could not determine solver time')
def round_(n, *args, **kwargs):
    """Round ``n`` like the builtin ``round``, tolerating overflow.

    All arguments are forwarded to ``round``. If that call raises
    OverflowError (as happens for positive or negative infinity), ``n`` is
    returned unchanged.
    """
    try:
        rounded = round(n, *args, **kwargs)
    except OverflowError:
        # Nothing sensible to round to; hand the input back untouched
        return n
    else:
        return rounded
def smooth_abs(a, eps=1e-4):
    """Build an expression for a smooth approximation of the absolute value.

    The expression returned is ``(a**2 + eps**2)**0.5``.

    Args:
        a : term to get absolute value from (Pyomo component, float or int)
        eps : smoothing parameter (Param, float or int) (default=1e-4)

    Returns:
        An expression for the smoothed absolute value operation.

    Raises:
        TypeError: if eps is not a float, int or Pyomo Param, or if the
            expression cannot be built from the supplied arguments.
    """
    # Validate the smoothing parameter before touching the expression
    eps_is_valid = isinstance(eps, (float, int, Param))
    if not eps_is_valid:
        raise TypeError("smooth_abs eps argument must be a float, int or "
                        "Pyomo Param")

    # Build the expression, translating type failures into a clearer message
    try:
        return (a**2 + eps**2)**0.5
    except TypeError:
        raise TypeError("Unsupported argument type for smooth_abs. Must be "
                        "a Pyomo Var, Param or Expression, or a float or int.")
def smooth_minmax(a, b, eps=1e-4, sense='max'):
    """Build an expression for a smooth minimum or maximum of two terms.

    The expression returned is ``0.5*(a + b + sign*smooth_abs(a - b, eps))``
    where ``sign`` is +1 for a maximum and -1 for a minimum.

    Args:
        a : first term in min or max function (Pyomo component, float or int)
        b : second term in min or max function (Pyomo component, float or int)
        eps : smoothing parameter (Param, float or int) (default=1e-4)
        sense : 'min' or 'max' (default = 'max')

    Returns:
        An expression for the smoothed minimum or maximum operation.

    Raises:
        TypeError: if eps is not a float, int or Pyomo Param, or if the
            expression cannot be built from the supplied arguments.
        ValueError: if sense is neither 'min' nor 'max'.
    """
    # Validate the smoothing parameter
    if not isinstance(eps, (float, int, Param)):
        raise TypeError("Smooth {} eps argument must be a float, int or "
                        "Pyomo Param".format(sense))

    # Reject anything other than the two supported senses
    if sense not in ('max', 'min'):
        raise ValueError("Unrecognised sense argument to smooth_minmax. "
                         "Must be 'min' or 'max'.")
    sign = 1 if sense == 'max' else -1

    # Build the expression, translating type failures into a clearer message
    try:
        return 0.5*(a+b+sign*smooth_abs(a-b, eps))
    except TypeError:
        raise TypeError("Unsupported argument type for smooth_{}. Must be "
                        "a Pyomo Var, Param or Expression, or a float or int."
                        .format(sense))
def smooth_max(a, b, eps=1e-4):
    """Smooth maximum of two terms.

    Thin wrapper that calls ``smooth_minmax`` with ``sense='max'``.

    Args:
        a : first term in max function
        b : second term in max function
        eps : smoothing parameter (Param or float, default = 1e-4)

    Returns:
        An expression for the smoothed maximum operation.
    """
    return smooth_minmax(a, b, eps, sense='max')
def smooth_min(a, b, eps=1e-4):
    """Smooth minimum of two terms.

    Thin wrapper that calls ``smooth_minmax`` with ``sense='min'``.

    Args:
        a : first term in min function
        b : second term in min function
        eps : smoothing parameter (Param or float, default = 1e-4)

    Returns:
        An expression for the smoothed minimum operation.
    """
    return smooth_minmax(a, b, eps, sense='min')
def category(*args):
    """Decorator factory enabling tiered testing.

    Suggested categories:

    1. frequent
    2. nightly
    3. expensive
    4. research

    The active categories come from the ``idaes_test_level`` environment
    variable: an integer N activates the first N suggested categories, any
    other string activates exactly that one category, and when the variable
    is unset only 'frequent' is active. The ``idaes_test_exclude``
    environment variable, if set, names a single category to exclude;
    exclusion takes precedence over activation.

    Args:
        *args (tuple of strings): categories to which the test belongs

    Returns:
        function: a decorator that either returns the test unchanged or
        marks it as skipped via ``unittest.skip``
    """
    import os
    import unittest

    # Work out which categories are currently active
    level = os.environ.get('idaes_test_level')
    if level is None:
        active = ('frequent',)
    else:
        level = level.strip()
        try:
            # A numeric level runs every category up to and including it
            count = int(level)
            active = ('frequent', 'nightly', 'expensive', 'research')[:count]
        except ValueError:
            # For now, only a single named category is supported here
            active = (level,)

    # Work out which categories are excluded (at most one)
    excluded_name = os.environ.get('idaes_test_exclude')
    excluded = () if excluded_name is None else (excluded_name.strip(),)

    if any(cat in args for cat in excluded):
        return unittest.skip(
            'Test categories {} contains one of excluded categories "{}"'
            .format(sorted(args), sorted(excluded)))

    if any(cat in args for cat in active):
        def passthrough(func):
            return func
        return passthrough

    return unittest.skip(
        'Test categories {} do not match active categories "{}"'.format(
            sorted(args), sorted(active)))
def requires_solver(solver):
    """Decorator factory that skips a test when a solver is unavailable.

    Args:
        solver : solver name handed to Pyomo's ``SolverFactory``

    Returns:
        function: a decorator returning the test unchanged when
        ``SolverFactory(solver).available()`` is truthy, otherwise a
        ``unittest.skip`` decorator.
    """
    from pyomo.opt import SolverFactory
    import unittest

    if SolverFactory(solver).available():
        def passthrough(func):
            return func
        return passthrough

    return unittest.skip(
        'Required solver {} is not available.'.format(solver))
def get_pyomo_tmp_files():
    """
    Make Pyomo write its temporary files to the current working directory.

    Useful for checking nl, sol, and log files for ASL solvers without
    needing to track down the temporary file location. Works by setting
    ``tempdir`` on ``pyutilib.services.TempfileManager`` to ``'./'``.
    """
    manager = pyutilib.services.TempfileManager
    manager.tempdir = './'
def hhmmss(sec_in):
    """
    Convert elapsed time in seconds to a human-readable string.

    This is nice for things that take a long time. The result has one of
    three shapes depending on the number of whole hours:

    * under 24 hours: ``hh:mm:ss.ss``
    * 24 to 47 hours: ``1 day hh:mm:ss.ss``
    * 48 hours or more: ``d days hh:mm:ss.ss``

    Args:
        sec_in : elapsed time in seconds (int or float)

    Returns:
        str: the formatted elapsed time
    """
    h = int(sec_in // 3600)
    m = int(sec_in % 3600 // 60)
    # Round the seconds to the displayed precision *before* formatting, so a
    # value such as 59.999 carries into the minutes instead of being printed
    # as "60.00".
    s = float("{0:.2f}".format(sec_in % 3600 % 60))
    if s >= 60:
        s -= 60
        m += 1
        if m >= 60:
            m -= 60
            h += 1
    if h < 24:
        hstr = "{0:0>2}".format(h)
    elif h < 48:
        hstr = "1 day {0:0>2}".format(h % 24)
    else:
        hstr = "{0} days {1:0>2}".format(h // 24, h % 24)
    return "{0}:{1:0>2}:{2:0>5.2f}".format(hstr, m, s)
def dict_set(v, d, pre_idx=None, post_idx=None, fix=False):
    """
    Set the values of an indexed variable from a dictionary.

    For every ``key`` in ``d`` the element of ``v`` selected by that key has
    its ``value`` attribute set to ``d[key]``. When ``pre_idx`` and/or
    ``post_idx`` are given, the element index is built as
    ``pre_idx + key + post_idx`` (each part converted to a tuple if it is not
    one already), which allows setting a slice of a variable with more
    indexes than the dictionary keys carry. For example, with ``pre_idx=0``
    and ``post_idx='Liq'`` the key ``'H2O'`` addresses ``v[0, 'H2O', 'Liq']``.

    Arguments:
        v: Indexed Pyomo variable
        d: dictionary to set the variable values from, keys should match a
            subset of Pyomo variable indexes.
        pre_idx: fixed indexes before elements to be set or None
        post_idx: fixed indexes after elements to be set or None
        fix: bool, fix the variables (optional)

    Returns:
        None
    """
    if pre_idx is None and post_idx is None:
        # Dictionary keys are complete indexes; use them as they are
        for key in d:
            v[key].value = d[key]
            if fix:
                v[key].fixed = fix
        return

    # Normalise the fixed leading/trailing index parts to tuples
    if pre_idx is None:
        pre_idx = ()
    elif not isinstance(pre_idx, tuple):
        pre_idx = (pre_idx,)
    if post_idx is None:
        post_idx = ()
    elif not isinstance(post_idx, tuple):
        post_idx = (post_idx,)

    for key in d:
        middle = key if isinstance(key, tuple) else (key,)
        element = v[pre_idx + middle + post_idx]
        element.value = d[key]
        if fix:
            element.fixed = fix
# HACK, courtesy of J. Siirola
def solve_indexed_blocks(solver, blocks, **kwds):
    """
    This method allows for solving of Indexed Block components as if they were
    a single Block. A temporary Block object is created which is populated with
    the contents of the objects in the blocks argument and then solved.

    Args:
        solver : a Pyomo solver object to use when solving the Indexed Block
        blocks : an object which inherits from Block, or a list of Blocks
        kwds : keyword arguments passed through to ``solver.solve``

    Returns:
        A Pyomo solver results object

    Raises:
        TypeError: if any element of blocks is not a Block
        Exception: if registering a block on the temporary Block fails
    """
    # Check blocks argument, and convert to a list of Blocks
    if isinstance(blocks, Block):
        blocks = [blocks]

    # Create the temporary Block before entering the try statement so that
    # the cleanup in the finally clause never refers to an unbound name.
    tmp = Block(concrete=True)

    try:
        nBlocks = len(blocks)

        # Iterate over indexed objects
        for i, b in enumerate(blocks):
            # Check that object is a Block
            if not isinstance(b, Block):
                raise TypeError("Trying to apply solve_indexed_blocks to "
                                "object containing non-Block objects")

            # Register the block directly in the temporary Block's private
            # declaration tables. Each _decl_order entry pairs the block
            # with i+1, or None for the last block.
            # NOTE(review): presumably the second tuple item is the index of
            # the next declaration of the same type - confirm against the
            # Pyomo version in use.
            try:
                tmp._decl["block_%s" % i] = i
                tmp._decl_order.append((b, i+1 if i < nBlocks-1 else None))
            except Exception:
                # Catch Exception rather than everything so that
                # KeyboardInterrupt and SystemExit still propagate.
                raise Exception("solve_indexed_blocks method failed adding "
                                "components to temporary block.")

        # Set ctypes on temporary Block
        # NOTE(review): the list looks like [first index, last index, count]
        # - confirm against Pyomo internals.
        tmp._ctypes[Block] = [0, nBlocks-1, nBlocks]

        # Solve temporary Block
        results = solver.solve(tmp, **kwds)

    finally:
        # Clean up temporary Block contents so they are not removed when Block
        # is garbage collected.
        tmp._decl = {}
        tmp._decl_order = []
        tmp._ctypes = {}

    # Return results
    return results
def add_object_ref(local_block, local_name, external_component):
    """
    Attach a reference to a non-local Pyomo component onto a Block.

    This is used when one Block needs to make use of a component in another
    Block as if it were part of the local block. The reference is stored
    with ``object.__setattr__``, so any ``__setattr__`` defined by the
    block's own class is not invoked.

    Args:
        local_block : Block in which to add reference
        local_name : str name for referenced object to use in local_block
        external_component : external component being referenced

    Returns:
        None

    Raises:
        AttributeError: if local_block already has an attribute called
            local_name
    """
    # Refuse to overwrite an attribute that already exists
    name_taken = hasattr(local_block, local_name)
    if name_taken:
        message = ("Cannot add object reference. Object {} already "
                   "has an attribute {}."
                   .format(local_block, local_name))
        raise AttributeError(message)

    # Name is free, so attach the reference to local_block
    object.__setattr__(local_block, local_name, external_component)
def fix_port(port, var, comp=None, value=None, port_idx=None):
    """
    Fix a variable held in a Port.

    Args:
        port : Port object in which to fix Vars
        var : variable name to be fixed (as str)
        comp : index of var to be fixed (if applicable, default = None)
        value : value to use when fixing var (default = None); when None,
                ``fix()`` is called without an argument
        port_idx : list of Port elements at which to fix var. Must be list
                   of valid indices. When None, ``port[...]`` is used.

    Returns:
        None
    """
    # With no explicit indices, address the port once through Ellipsis
    keys = (Ellipsis,) if port_idx is None else port_idx

    for k in keys:
        target = port[k].vars[var]
        if comp is not None:
            target = target[comp]

        if value is None:
            target.fix()
        else:
            target.fix(value)
def unfix_port(port, var, comp=None, port_idx=None):
    """
    Unfix a variable held in a Port.

    Args:
        port : Port object in which to unfix Vars
        var : variable name to be unfixed (as str)
        comp : index of var to be unfixed (if applicable, default = None)
        port_idx : list of Port elements at which to unfix var. Must be
                   list of valid indices. When None, ``port[...]`` is used.

    Returns:
        None
    """
    # With no explicit indices, address the port once through Ellipsis
    keys = (Ellipsis,) if port_idx is None else port_idx

    for k in keys:
        target = port[k].vars[var]
        if comp is not None:
            target = target[comp]
        target.unfix()