# Source code for idaes.dmf.validate

##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory,  National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
# 
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes".
##############################################################################
"""
XXX: This module is going away soon -dang 10/26/18
"""
# stdlib
import json
import logging
import os
import random
import re
import shutil
import tempfile
import textwrap
# third-party
from jinja2 import Environment, PackageLoader, select_autoescape
import jsonschema
import six
import yaml as _yaml
# package
from .util import get_logger, get_file

_log = get_logger('validate')

TEMPLATE_EXT = '.template'
SCHEMA_EXT = '.json'


class JsonSchemaValidator(object):
    """Validate JSON documents against schemas defined in this package.

    The schemas are in the "schemas/" directory of the package. They are
    first processed as Jinja2 templates, to allow for flexible re-use of
    common schema elements. The actual resulting schema is stored in a
    temporary directory that is removed when this class is deleted.

    Example usage::

        vdr = JsonSchemaValidator()
        # Validate document against the "foobar" schema.
        ok, msg = vdr.validate({'foo': '1', 'bar': 2}, 'foobar')
        if ok:
            print("Success!")
        else:
            print("Failed: {}".format(msg))
        # Validate input YAML file against the "config" schema
        ok, msg = vdr.validate('/path/to/my_config.yaml', 'config',
                               yaml=True)
        if ok:
            print("Success!")
        else:
            print("Failed: {}".format(msg))
    """

    def __init__(self, modpath='idaes.dmf', directory='schemas',
                 do_not_cache=False):
        """Constructor.

        Args:
            modpath (str): Package/module path
            directory (str): Directory at module path location
            do_not_cache (bool): If True, do not cache anything. This is
                equivalent to calling :meth:`reset` before every call
                to :meth:`validate`.
        """
        self._init_cache()
        # configure Jinja2
        self._jinja_env = Environment(
            loader=PackageLoader(modpath, directory),
            autoescape=select_autoescape(
                disabled_extensions=[TEMPLATE_EXT])
        )
        self._always_reset = do_not_cache

    def __del__(self):
        self._clear_cache()

    def validate(self, doc, schema, yaml=False):
        """Validate a JSON file against a schema.

        Args:
            doc (str|file|list|dict): Input filename or object. May be
                JSON or YAML. Also may be a list/dict, which is assumed
                to represent parsed JSON. `None` is treated as an empty
                dict.
            schema (str): Name of schema in this package. This will be
                the name, without the `.template` suffix, of a file in
                the 'schemas/' directory.
            yaml (bool): If true, use the YAML parser instead of the JSON
                parser on the input file.

        Returns:
            (bool, str) Pair whose first value is whether it validated
            and second is set to the error message if it did not.

        Raises:
            IOError if either file cannot be opened.
            ValueError if either file cannot be parsed.
        """
        if self._always_reset:
            self.reset()
        if doc is None:
            doc = {}
        elif not isinstance(doc, (list, dict)):
            doc = self._read_document(doc, yaml)
        sch = self._load_schema(schema)
        if _log.isEnabledFor(logging.DEBUG):
            # Only serialize the document when it will actually be logged
            preview = json.dumps(doc)
            if len(preview) > 155:
                preview = preview[:155] + '...'
            _log.debug('validating instance: "{}"'.format(preview))
        try:
            jsonschema.validate(doc, sch)
            result = (True, '')
        except jsonschema.ValidationError as err:
            errors = list(jsonschema.Draft4Validator(sch).iter_errors(doc))
            # If the Draft 4 re-check finds nothing, describe the error
            # that was actually raised instead of failing on an empty list.
            result = (False,
                      self._describe_validation_error(errors or [err]))
        return result

    def reset(self):
        """Clear cached schemas, so that changes in the base templates
        are picked up by the validation code.
        """
        self._clear_cache()
        self._init_cache()

    def get_schema(self, schema):
        """Load the schema and return it as a Python (dict) object.
        See :meth:`validate` for details.

        Args:
            schema (str): Schema name. Same as `schema` arg
                to :meth:`validate`

        Returns:
            dict: Parsed schema

        Raises:
            IOError if file cannot be opened.
            ValueError if file cannot be parsed.
        """
        return self._load_schema(schema)

    def _read_document(self, doc, yaml):
        """Open `doc` with `get_file` and parse it as YAML or JSON.

        Raises:
            IOError if the file cannot be opened.
            ValueError if the file cannot be parsed.
        """
        try:
            j_f = get_file(doc)
        except IOError as err:
            raise IOError('Cannot open JSON file to validate: {}'
                          .format(err))
        if yaml:
            try:
                # NOTE: this was `yaml.load()` with no Loader, which can
                # construct arbitrary Python objects and is rejected by
                # PyYAML >= 6. The documents here are plain data.
                return _yaml.safe_load(j_f)
            except _yaml.YAMLError as err:
                raise ValueError('Cannot parse YAML file to validate: {}'
                                 .format(err))
        try:
            return json.load(j_f)
        except ValueError as err:
            # json.JSONDecodeError subclasses ValueError and does not
            # exist on Python 2, so catch the base class.
            raise ValueError('Cannot parse JSON file to validate: {}'
                             .format(err))

    def _init_cache(self):
        """Create a fresh temporary cache directory and empty schema map."""
        self._cache_dir = tempfile.mkdtemp()
        self._cached_schemas = {}

    def _clear_cache(self):
        """Best-effort removal of the cache directory and cached schemas."""
        # __del__ may run on an instance whose __init__ never created
        # the cache directory, so do not assume the attribute exists.
        cache_dir = getattr(self, '_cache_dir', None)
        if cache_dir is not None:
            try:
                shutil.rmtree(cache_dir)
            except (IOError, OSError):
                # already removed or not removable; nothing more to do
                pass
        self._cached_schemas = {}

    def _load_schema(self, schema_name):
        """Return the parsed schema, rendering and caching it if needed.

        Raises:
            ValueError if the rendered schema is not valid JSON.
        """
        schema = self._cached_schemas.get(schema_name, None)
        if schema is None:
            schema_filename = self._schema_file(schema_name)
            if not os.path.exists(schema_filename):
                self._build_schema(schema_name)
            try:
                with open(schema_filename) as schema_f:
                    schema = json.load(schema_f)
            except ValueError as err:
                with open(schema_filename) as schema_f:
                    _log.debug('Failed schema:\n{}'.format(schema_f.read()))
                raise ValueError('Cannot parse JSON Schema: {}'.format(err))
            self._cached_schemas[schema_name] = schema
        return schema

    def _build_schema(self, name):
        """Render the Jinja2 template for `name` into the cache directory."""
        template = self._jinja_env.get_template(name + TEMPLATE_EXT)
        template.stream().dump(self._schema_file(name))

    def _schema_file(self, name):
        """Path of the rendered schema file for `name` in the cache dir."""
        return os.path.join(self._cache_dir, name + SCHEMA_EXT)

    @staticmethod
    def _describe_validation_error(errlist):
        """Format the best-matching validation error as a two-line message
        giving the document path and the wrapped error text.
        """
        e = jsonschema.exceptions.best_match(errlist)
        path_str = '/'.join([str(x) for x in e.absolute_path])
        err_str = textwrap.fill(e.message, 60)
        return 'Document path: /{}\nError: {}'.format(path_str, err_str)

    def instances(self, schema, param_file):
        """Create an instance generator for a schema.

        Args:
            schema (str): Schema name. Same as `schema` arg
                to :meth:`validate`
            param_file: YAML text or stream handed to the YAML parser;
                the parsed result becomes the generator's parameters.

        Returns:
            InstanceGenerator: Generator for the named schema
        """
        # NOTE: was `yaml.load()` with no Loader (unsafe; fails on
        # PyYAML >= 6). Parameters are expected to be plain data.
        params = _yaml.safe_load(param_file)
        gen = InstanceGenerator(self.get_schema(schema), params)
        return gen
class InstanceGenerator(object):
    """Generate a Jinja2 instance template, and a matching Python script,
    from a parsed JSON schema (dict).
    """
    # `indent` and `keywords` are not referenced inside this class.
    indent = 2
    keywords = '$schema', 'id', 'definitions'
    # Variable name used for the document when the schema root is an array
    root_var = 'root'
    # Number of elements generated for arrays with a single `items` schema
    default_arr_len = 1
    # Marker line separating user-editable code from generated boilerplate
    bplate_div = 'DO NOT MODIFY BEYOND THIS POINT'

    def __init__(self, schema, params=None):
        """Constructor.

        Args:
            schema (dict): Parsed JSON schema
            params (dict): Optional parameters; defaults to empty dict
        """
        self._s, self._p = schema, params or {}

    def create_script(self, output_file, preserve_old=True, **kwargs):
        """Write the generator script to `output_file`.

        Args:
            output_file (str): Path of the script to write
            preserve_old (bool): If True and the file already exists, keep
                everything in it above the boilerplate divider line and
                regenerate only the boilerplate.
            **kwargs: Passed through to :meth:`get_script`
        """
        user, bplate = self.get_script(**kwargs)
        if os.path.exists(output_file) and preserve_old:
            # Set 'user' section to the one in the existing file,
            # thus preserving the existing content.
            user_lines = []
            with open(output_file, 'r') as f:
                for line in f:
                    if self.bplate_div in line:
                        break
                    # Strip only the newline: a final line without one
                    # must not lose its last character.
                    user_lines.append(line.rstrip('\n'))
            user = '\n'.join(user_lines)
        with open(output_file, 'w') as f:
            f.write(user)
            f.write('\n')
            f.write(bplate)

    def get_script(self, n=1, output_files='/tmp/file{i}.json'):
        """Code to load & generate `n` instances, as Python source text.

        Args:
            n (int): Value for `num_to_generate` in the script
            output_files (str): Output filename pattern; the script calls
                `.format(i=i)` on it for the i-th instance.

        Returns:
            (str, str): Pair of strings, first is user-modifiable part and
            second is boilerplate with the template data. This allows
            separate modification of these 2 sections.
        """
        divider = '#' * 78
        user = [
            '#!/usr/bin/env python',
            'from jinja2 import Template',
            'output_files = "{pattern}"',
            'num_to_generate = {num}',
            divider,
            '# Set dynamic values for i-th instance',
            divider,
            'def modify(v, i):',
            ' # modify dict "v" for i-th',
            ' return v',
            divider,
            '# Set static values for all instances',
            divider,
            'template_vars = {variables}',
            divider,
            ''  # keep
        ]
        bplate = [
            '# ' + self.bplate_div,
            divider,
            'template = Template("""{template}""")',
            divider,
            'for i in range(num_to_generate):',
            ' cur_vars = modify(template_vars.copy(), i)',
            ' instance = template.render(**cur_vars)',
            ' f = open(output_files.format(i=i), "w")',
            ' f.write(instance)',
            '',
            'print("Wrote {{n:d}} to {{p}}".format(n=num_to_generate,'
            ' p=output_files))',
            ''
        ]
        user_str = '\n'.join(user).format(variables=self.get_variables(),
                                          pattern=output_files, num=n)
        bplate_str = '\n'.join(bplate).format(template=self.get_template())
        return user_str, bplate_str

    def get_variables(self, commented=True):
        """Render default template variables as Python literal text.

        Args:
            commented (bool): If True, prefix every line except the first
                and last with '# ', so only the outer delimiters remain
                active.

        Returns:
            str: Lines joined with newlines
        """
        results = self._var_visit(self._s, depth=0)
        if results[0].startswith('['):
            # Array at the root: wrap it in a dict keyed by `root_var`
            results[0] = "{{ '{}': [".format(self.root_var)
            results[-1] = ']}'
        if commented:
            results = [results[0]] + \
                      ['# {}'.format(r) for r in results[1:-1]] + \
                      [results[-1]]
        return '\n'.join(results)

    def get_template(self):
        """Generate a new template for the instance.

        Returns:
            str: JSON of the instance
        """
        return self._visit(self._s, path='', depth=0)

    def _visit(self, val, path=None, depth=None):
        """Recursively build the Jinja2/JSON template text for a schema node.

        Args:
            val (dict): Schema node
            path (str): Dotted/indexed variable path of this node
            depth (int): Nesting depth; 0 is the schema root

        Raises:
            ValueError if the node has an unknown type.
        """
        type_, val = self._get_type(val)
        simple, compound = None, None
        if type_ == 'string':
            simple = '"str"'
        elif type_ == 'number':
            simple = '{:.2f}'.format(random.random())
        elif type_ == 'integer':
            simple = str(random.randint(0, 100))
        elif type_ == 'boolean':
            simple = 'False'
        elif type_ == 'object':
            lines = []
            if 'properties' in val:
                # Distinct loop name: do not rebind `val` mid-iteration
                for key, prop in val['properties'].items():
                    name = path + '.' + key if path else key
                    v = self._visit(prop, path=name, depth=depth + 1)
                    lines.append('"{}": {}'.format(key, v))
            compound = '{{{o}}}'.format(o=', '.join(lines))
        elif type_ == 'array':
            n = self.default_arr_len  # TODO: determine dynamically
            lines = []
            name_pfx = self.root_var if depth == 0 else path
            items = val['items']
            if isinstance(items, list):
                # One schema per position
                for i, item in enumerate(items):
                    name = '{}[{}]'.format(name_pfx, i)
                    lines.append(self._visit(item, name, depth + 1))
            else:
                # Single schema repeated `n` times
                for i in range(n):
                    name = '{}[{}]'.format(name_pfx, i)
                    lines.append(self._visit(items, name, depth + 1))
            arr = ','.join(lines)
            # Surround lines with if/else to avoid overriding empty
            # arrays set by the user.
            f1 = '{{%- if {name} is defined and {name}|length > 0 %}}'\
                .format(name=name_pfx)
            f2 = '{% endif %}'
            compound = '[{if_}{a}{endif}]'.format(a=arr, if_=f1, endif=f2)
        else:
            raise ValueError('Unknown value type: {}'.format(type_))
        if simple:
            quotes = '"' if type_ == 'string' else ''
            simple = '{q}{{{{ {p} if ({c}) else {d} }}}}{q}'.format(
                q=quotes, p=path, d=simple,
                c=self._variable_comp_expr(path))
            return simple
        return compound

    def _var_visit(self, val, name=None, depth=0):
        """Recursively build lines of Python literal text giving a default
        value for a schema node.

        Returns:
            list of str: Output lines for this node

        Raises:
            ValueError if the node has an unknown type.
        """
        idt = ' ' * depth
        idt_m1 = ' ' * max(depth - 1, 0)
        type_, val = self._get_type(val)
        if type_ == 'string':
            variable = ["'str'"]
        elif type_ == 'number':
            variable = ['0.0']
        elif type_ == 'integer':
            variable = ['0']
        elif type_ == 'boolean':
            variable = ['False']
        elif type_ == 'object':
            if 'properties' in val:
                variable = ['{']
                for key, prop in val['properties'].items():
                    v = self._var_visit(prop, name=key, depth=depth + 1)
                    if len(v) > 1:
                        # Multi-line value: key goes on its first line,
                        # trailing comma on its last line.
                        variable.append("{}'{}': {}".format(idt, key, v[0]))
                        variable.extend(v[1:-1])
                        variable.append(v[-1] + ',')
                    else:
                        variable.append("{}'{}': {},".format(idt, key, v[0]))
                variable.append('{}}}'.format(idt))
            else:
                variable = ['{}']
        elif type_ == 'array':
            variable = []
            if isinstance(val['items'], list):
                variable.append('[')
                for item in val['items']:
                    r = self._var_visit(item, depth=depth + 1)
                    variable.append('{}{}'.format(idt, r[0]))
                    variable.extend(r[1:])
                    variable[-1] += ','
                # Close the list on the last emitted line
                variable[-1] = variable[-1].strip(',') + ']'
            else:
                v = self._var_visit(val['items'], depth=depth + 1)
                variable.append('[{}'.format(v[0]))
                if len(v) > 1:
                    variable.extend(v[1:])
                    variable.append('{}]'.format(idt_m1))
                else:
                    variable[-1] = variable[-1] + ']'
        else:
            raise ValueError('Unknown value type: {}'.format(type_))
        return variable

    def _get_type(self, val):
        """Return `(type, node)` for a schema node, resolving one `$ref`.

        A node with neither 'type' nor '$ref' is treated as 'string'.
        """
        try:
            type_ = val['type']
        except KeyError:
            if '$ref' in val:
                # swap value with definition contents, and continue
                val = self._fetch_ref(val['$ref'])
                type_ = val['type']  # no nested $refs
            else:
                type_ = 'string'
        return type_, val

    @staticmethod
    def _variable_comp_expr(v):
        """Make an expression out of the components of a dotted variable
        name (with possible array references), so that in Jinja2,
        `if <this expression>` will evaluate to true only if the "leaf"
        variable is defined.

        This is to deal with the fact that 'X[0].name|default()' will
        throw an error if X or X[0] is not defined.

        Args:
            v: Full variable path, with dots and array pieces.

        Returns:
            str: Expression "<part> and <part> and <part> .."
        """
        # Escape '[' inside the set: a bare '[[' makes `re` emit
        # "FutureWarning: Possible nested set".
        comp = re.split(r'[\[.]', v)
        paths = []
        for c in comp:
            if c.endswith(']'):
                # index piece such as '0]': re-attach the '[' split off
                variable = paths[-1] + '[' + c
            elif paths:
                variable = paths[-1] + '.' + c
            else:
                variable = c
            paths.append(variable)
        return ' and '.join(paths)

    def _fetch_ref(self, ref):
        """Look up the last path component of `ref` in the schema's
        top-level "definitions" section.

        Raises:
            ValueError if the schema has no "definitions" section.
            KeyError if the definition is not found.
        """
        if 'definitions' not in self._s:
            raise ValueError('Definitions must be in a '
                             'section called "definitions". '
                             'Cannot parse ref: {}'.format(ref))
        defn = ref.split('/')[-1]
        if defn not in self._s['definitions']:
            raise KeyError('Could not find "{}" from ref "{}" '
                           'in definitions section'.format(defn, ref))
        return self._s['definitions'][defn]