Source code for idaes.dmf.resourcedb

##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory,  National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
# 
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes".
##############################################################################
"""
Resource database.
"""
# system
from datetime import datetime
# third party
import pendulum
import six
from tinydb import TinyDB, Query
# local
from .util import get_logger
from . import errors
from .resource import Resource
from .resource import Triple

# Module-level logger for this file, obtained from the package's
# `get_logger` helper (imported from `.util`) under the name 'resourcedb'.
_log = get_logger('resourcedb')


class ResourceDB(object):
    """A database interface to all the resources within a given
    DMF workspace.

    Thin wrapper around a TinyDB instance: resources are stored as
    documents, and the TinyDB element id (`eid`) doubles as the
    resource identifier.
    """

    def __init__(self, dbfile=None, connection=None):
        """Initialize from DMF and given configuration field.

        Args:
            dbfile (str): DB location
            connection: If non-empty, this is an existing connection
                        that should be re-used, instead of trying to
                        connect to the location in `dbfile`.

        Raises:
            ValueError: if `dbfile` and `connection` are both None
            errors.FileError: if the file at `dbfile` cannot be opened
        """
        self._db = None
        self._gr = None
        if connection is not None:
            # An existing connection takes precedence over `dbfile`.
            self._db = connection
        elif dbfile is not None:
            try:
                db = TinyDB(dbfile)
            except IOError:
                raise errors.FileError('Cannot open resource DB "{}"'
                                       .format(dbfile))
            self._db = db
        else:
            # Fail here, as documented, instead of leaving `_db` as None
            # and failing later with an unrelated error on first use.
            raise ValueError('Either "dbfile" or "connection" must be '
                             'provided')

    def __len__(self):
        """Number of documents in the underlying database."""
        return len(self._db)

    def find(self, filter_dict, id_only=False):
        """Find and yield records based on the provided filter.

        This is a generator: the search, including any error raised while
        building the filter expression, happens when iteration starts.

        Args:
            filter_dict (dict): Search filter. For syntax, see docs in
                                :meth:`.dmf.DMF.find`. An empty or None
                                filter matches every record.
            id_only (bool): If true, yield only the identifier of each
                            resource; otherwise a Resource object is
                            yielded.

        Yields:
            int|Resource: Depending on the value of `id_only`
        """
        def as_resource(_r):
            # Store the TinyDB element id in the record itself, so the
            # Resource built from it carries its identifier.
            _r[Resource.ID_FIELD] = _r.eid
            return Resource.from_dict(_r)

        if not filter_dict:
            # with no filter, do a find-all
            records = self._db.all()
        else:
            filter_expr = self._create_filter_expr(filter_dict)
            _log.debug('Find resources matching: {}'.format(filter_expr))
            records = self._db.search(filter_expr)
        for r in records:
            if id_only:
                yield r.eid
            else:
                yield as_resource(r)

    @classmethod
    def _create_filter_expr(cls, filter_dict):
        """Convert a filter dictionary into a TinyDB query expression.

        All conditions are combined with logical AND.

        Args:
            filter_dict (dict): Mapping of field name to one of:

                - a list: match if the record's list value contains any
                  of these values; if the key has a '!' appended, require
                  all of the values instead.
                - a dict of `{op: value, ...}` inequalities, see
                  :meth:`_op_cond` for the operators.
                - any other value: equality test.

        Returns:
            The combined query expression, or None if no conditions
            were generated.

        Raises:
            ValueError: for an unknown inequality operator
        """
        q = Query()
        conditions = []
        for k, v in six.iteritems(filter_dict):
            if isinstance(v, list):
                # A list value means find these value(s) in the
                # list value of the record.
                if k.endswith('!'):
                    conditions.append(q[k[:-1]].all(tuple(v)))
                else:
                    conditions.append(q[k].any(tuple(v)))
            elif isinstance(v, dict):
                # Inequalities are {key: {op: value, ...}}, where the
                # operators are "$<shell test op>", such as "$lt",
                # just like in MongoDB.
                for op_key, op_value in six.iteritems(v):
                    tv = cls._value_transform(op_value)
                    conditions.append(cls._op_cond(q, k, op_key, tv))
            else:
                # Equality is just {key: value}
                conditions.append(q[k] == cls._value_transform(v))
        # AND the conditions together, left to right.
        filter_expr = None
        for cond in conditions:
            filter_expr = cond if filter_expr is None else filter_expr & cond
        return filter_expr

    @staticmethod
    def _value_transform(v):
        """Convert date/time values to a timestamp for comparison.

        Args:
            v: Any filter value.

        Returns:
            The result of `timestamp()` for datetime and Pendulum
            values; any other value is returned unchanged.
        """
        # Pendulum objects are also `datetime` instances, so they must be
        # tested for first; otherwise they would be needlessly rebuilt
        # from their time zone abbreviation.
        if isinstance(v, pendulum.Pendulum):
            return v.timestamp()
        if isinstance(v, datetime):
            pv = pendulum.create(v.year, v.month, v.day, v.hour, v.minute,
                                 v.second, v.microsecond, v.tzname())
            return pv.timestamp()
        return v

    @staticmethod
    def _op_cond(query, key, op, value):
        """Build the query condition for one inequality operator.

        Args:
            query: Query object, indexed by field name.
            key (str): Field name.
            op (str): One of '$gt', '$ge', '$lt', '$le', '$ne'.
            value: Value to compare the field against.

        Returns:
            The condition produced by comparing `query[key]` with `value`.

        Raises:
            ValueError: if `op` is not a known operator
        """
        if op == '$gt':
            return query[key] > value
        if op == '$ge':
            return query[key] >= value
        if op == '$lt':
            return query[key] < value
        if op == '$le':
            return query[key] <= value
        if op == '$ne':
            return query[key] != value
        raise ValueError('Unexpected operator: {}'.format(op))

    def get(self, identifier):
        """Get a single resource by its identifier.

        Args:
            identifier (int): Identifier of the resource.

        Returns:
            Resource: The resource, with its `id_` set to `identifier`,
            or None if there is no such record.
        """
        item = self._db.get(eid=identifier)
        if item is None:
            return None
        r = Resource.from_dict(item)
        r.id_ = identifier
        return r

    def put(self, resource):
        """Insert a resource into the database.

        As a side effect, the `id_` of the resource is set to the
        identifier assigned by the database.

        Args:
            resource (Resource): The resource to add.
        """
        eid = self._db.insert(resource.as_dict())
        # important: set resource's ID to be the document id
        resource.id_ = eid

    def delete(self, id_=None, idlist=None, filter_dict=None):
        """Delete one or more resources with given identifiers.

        The arguments are considered in this order of precedence:
        `filter_dict`, then `id_`, then `idlist`.

        Args:
            id_ (int): If given, delete this id.
            idlist (list): If given, delete ids in this list
            filter_dict (dict): If given, perform a search and
                                delete ids it finds.

        Returns:
            Whatever the database's `remove()` returns for the list of
            identifiers passed to it.
        """
        if filter_dict is not None:
            # `find()` is a lazy generator; run the search to completion
            # before anything is removed, and pass on a real list.
            idlist = list(self.find(filter_dict, id_only=True))
        elif id_ is not None:
            idlist = [id_]
        return self._db.remove(eids=idlist)

    def update(self, id_, new_dict):
        """Update the identified resource with new values.

        Only keys whose value differs from the stored one, which includes
        keys not stored yet, are written. Keys missing from `new_dict`
        are left as they are.

        Args:
            id_ (int): Identifier of resource to update
            new_dict (dict): New dictionary of resource values, e.g.
                             result of `Resource.as_dict()`.

        Returns:
            int: The id_ of the resource, or None if it was not updated.

        Raises:
            ValueError: If new resource is of wrong type
        """
        old_dict = self._db.get(eid=id_)
        if old_dict is None:
            return None
        type_ = Resource.TYPE_FIELD
        # A missing type in the new values means "same type as before".
        new_type = new_dict.get(type_, old_dict[type_])
        if new_type != old_dict[type_]:
            raise ValueError('New resource type="{}" does not '
                             'match current resource type "{}"'
                             .format(new_type, old_dict[type_]))
        changed = {k: v for k, v in six.iteritems(new_dict)
                   if old_dict.get(k, None) != v}
        self._db.update(changed, eids=[id_])
        return id_