##############################################################################
# Institute for the Design of Advanced Energy Systems Process Systems
# Engineering Framework (IDAES PSE Framework) Copyright (c) 2018, by the
# software owners: The Regents of the University of California, through
# Lawrence Berkeley National Laboratory, National Technology & Engineering
# Solutions of Sandia, LLC, Carnegie Mellon University, West Virginia
# University Research Corporation, et al. All rights reserved.
#
# Please see the files COPYRIGHT.txt and LICENSE.txt for full copyright and
# license information, respectively. Both files are also available online
# at the URL "https://github.com/IDAES/idaes".
##############################################################################
"""
XXX: This module is going away soon -dang 10/26/18
"""
# stdlib
import json
import logging
import os
import random
import re
import shutil
import tempfile
import textwrap
# third-party
from jinja2 import Environment, PackageLoader, select_autoescape
import jsonschema
import six
import yaml as _yaml
# package
from .util import get_logger, get_file
# Module-level logger, obtained from this package's `get_logger` helper.
_log = get_logger('validate')
# Suffix appended to a schema name to locate its Jinja2 template.
TEMPLATE_EXT = '.template'
# Suffix of the rendered schema files written into the cache directory.
SCHEMA_EXT = '.json'
class JsonSchemaValidator(object):
    """Validate JSON documents against schemas defined in this package.

    The schemas are in the "schemas/" directory of the package.
    They are first processed as Jinja2 templates, to allow for flexible re-use
    of common schema elements. The actual resulting schema is stored
    in a temporary directory that is removed when this class is
    deleted.

    Example usage::

        vdr = JsonSchemaValidator()

        # Validate document against the "foobar" schema.
        ok, msg = vdr.validate({'foo': '1', 'bar': 2}, 'foobar')
        if ok:
            print("Success!")
        else:
            print("Failed: {}".format(msg))

        # Validate input YAML file against the "config" schema
        ok, msg = vdr.validate('/path/to/my_config.yaml', 'config', yaml=True)
        if ok:
            print("Success!")
        else:
            print("Failed: {}".format(msg))
    """

    def __init__(self, modpath='idaes.dmf', directory='schemas',
                 do_not_cache=False):
        """Constructor.

        Args:
            modpath (str): Package/module path
            directory (str): Directory at module path location
            do_not_cache (bool): If True, do not cache anything. This is
                equivalent to calling :meth:`reset` before every call to
                :meth:`validate`.
        """
        self._init_cache()
        # configure Jinja2
        self._jinja_env = Environment(
            loader=PackageLoader(modpath, directory),
            autoescape=select_autoescape(
                disabled_extensions=[TEMPLATE_EXT])
        )
        self._always_reset = do_not_cache

    def __del__(self):
        # Remove the temporary cache directory along with the object.
        self._clear_cache()

    def validate(self, doc, schema, yaml=False):
        """Validate a JSON file against a schema.

        Args:
            doc (str|file|list|dict): Input filename or object. May be JSON
                or YAML. Also may be a list/dict, which is
                assumed to represent parsed JSON. `None` is treated
                as an empty document (``{}``).
            schema (str): Name of schema in this package.
                This will be the name, without the
                `.template` suffix, of a file in the
                'schemas/' directory.
            yaml (bool): If true, use the YAML parser instead of the JSON
                parser on the input file.

        Returns:
            (bool, str) Pair whose first value is whether it validated and
            second is set to the error message if it did not.

        Raises:
            IOError if either file cannot be opened.
            ValueError if either file cannot be parsed.
        """
        if self._always_reset:
            self.reset()
        if doc is None:
            doc = {}
        elif not isinstance(doc, (list, dict)):
            doc = self._parse_document(doc, yaml)
        sch = self._load_schema(schema)
        if _log.isEnabledFor(logging.DEBUG):
            # default=str keeps the preview from failing on values that
            # are not JSON-serializable (e.g. parsed from YAML).
            preview = json.dumps(doc, default=str)
            if len(preview) > 155:
                preview = preview[:155] + '...'
            _log.debug('validating instance: "{}"'.format(preview))
        try:
            jsonschema.validate(doc, sch)
        except jsonschema.ValidationError as err:
            # Collect all errors to pick the most relevant one; fall back
            # to the raised error if the Draft 4 re-check reports nothing.
            errlist = list(jsonschema.Draft4Validator(sch).iter_errors(doc))
            return False, self._describe_validation_error(errlist or [err])
        return True, ''

    @staticmethod
    def _parse_document(doc, yaml):
        """Open `doc` with `get_file` and parse it as YAML or JSON.

        Raises:
            IOError if the input cannot be opened.
            ValueError if the input cannot be parsed.
        """
        try:
            j_f = get_file(doc)
        except IOError as err:
            raise IOError('Cannot open JSON file to validate: {}'
                          .format(err))
        if yaml:
            try:
                # safe_load: never construct arbitrary Python objects from
                # the input; a bare load() without a Loader is also
                # rejected by recent PyYAML versions.
                return _yaml.safe_load(j_f)
            except _yaml.YAMLError as err:
                raise ValueError('Cannot parse YAML file to validate: {}'
                                 .format(err))
        try:
            return json.load(j_f)
        except ValueError as err:  # includes json.JSONDecodeError
            raise ValueError('Cannot parse JSON file to validate: {}'
                             .format(err))

    def reset(self):
        """Clear cached schemas, so that changes in the base templates
        are picked up by the validation code.
        """
        self._clear_cache()
        self._init_cache()

    def get_schema(self, schema):
        """Load the schema and return it as a Python (dict) object.
        See :meth:`validate` for details.

        Args:
            schema (str): Schema name. Same as `schema` arg to :meth:`validate`

        Returns:
            dict: Parsed schema

        Raises:
            IOError if file cannot be opened.
            ValueError if file cannot be parsed.
        """
        return self._load_schema(schema)

    def _init_cache(self):
        """Create a fresh temporary cache directory and an empty
        in-memory schema cache."""
        self._cache_dir = tempfile.mkdtemp()
        self._cached_schemas = {}

    def _clear_cache(self):
        """Delete the cache directory (best effort) and forget all
        parsed schemas."""
        # This runs from __del__, so tolerate a partially-constructed
        # object and a directory that is already gone.
        cache_dir = getattr(self, '_cache_dir', None)
        if cache_dir is not None:
            try:
                shutil.rmtree(cache_dir)
            except (IOError, OSError):
                pass
        self._cached_schemas = {}

    def _load_schema(self, schema_name):
        """Return the parsed schema, rendering its template into the cache
        directory and parsing the result on first use.

        Raises:
            ValueError if the rendered schema is not valid JSON.
        """
        schema = self._cached_schemas.get(schema_name, None)
        if schema is not None:
            return schema
        schema_filename = self._schema_file(schema_name)
        if not os.path.exists(schema_filename):
            self._build_schema(schema_name)
        # Read once, inside a context manager, so the handle is always
        # closed and the text is available for the debug message.
        with open(schema_filename) as schema_file:
            schema_text = schema_file.read()
        try:
            schema = json.loads(schema_text)
        except ValueError as err:  # includes json.JSONDecodeError
            _log.debug('Failed schema:\n{}'.format(schema_text))
            raise ValueError('Cannot parse JSON Schema: {}'.format(err))
        self._cached_schemas[schema_name] = schema
        return schema

    def _build_schema(self, name):
        """Render the Jinja2 template for `name` into the cache directory."""
        template = self._jinja_env.get_template(name + TEMPLATE_EXT)
        template.stream().dump(self._schema_file(name))

    def _schema_file(self, name):
        """Path of the rendered schema file for `name` in the cache dir."""
        return os.path.join(self._cache_dir, name + SCHEMA_EXT)

    @staticmethod
    def _describe_validation_error(errlist):
        """Format the best-matching error from `errlist` as a two-line
        message giving the document path and the wrapped error text."""
        e = jsonschema.exceptions.best_match(errlist)
        path_str = '/'.join([str(x) for x in e.absolute_path])
        err_str = textwrap.fill(e.message, 60)
        return 'Document path: /{}\nError: {}'.format(path_str, err_str)

    def instances(self, schema, param_file):
        """Create an :class:`InstanceGenerator` for a schema.

        Args:
            schema (str): Schema name. Same as `schema` arg to :meth:`validate`
            param_file: YAML input (stream or text) with the parameters
                passed on to the generator.

        Returns:
            InstanceGenerator: Generator built from the parsed schema
            and parameters.
        """
        # safe_load: do not construct arbitrary Python objects from input.
        params = _yaml.safe_load(param_file)
        return InstanceGenerator(self.get_schema(schema), params)
class InstanceGenerator(object):
    """Generate instance templates, default variables and a driver script
    from a parsed JSON schema.

    The schema is walked recursively. For every scalar leaf a Jinja2
    expression is emitted that uses the user-supplied variable when it is
    defined and a placeholder default otherwise.
    """

    indent = 2
    keywords = '$schema', 'id', 'definitions'
    # Variable name used for the top-level value when it is an array
    root_var = 'root'
    # Number of elements generated for arrays with a single item schema
    default_arr_len = 1
    # Marker line separating the user-editable part of a generated
    # script from the regenerated boilerplate
    bplate_div = 'DO NOT MODIFY BEYOND THIS POINT'

    def __init__(self, schema, params=None):
        """Constructor.

        Args:
            schema (dict): Parsed JSON schema
            params (dict): Optional parameters; defaults to an empty dict
        """
        self._s, self._p = schema, params or {}

    def create_script(self, output_file, preserve_old=True, **kwargs):
        """Write the generator script to a file.

        Args:
            output_file (str): Path of the script to write
            preserve_old (bool): If True and `output_file` already exists,
                keep everything in it that precedes the boilerplate
                divider line and only regenerate the boilerplate.
            **kwargs: Passed through to :meth:`get_script`
        """
        user, bplate = self.get_script(**kwargs)
        if os.path.exists(output_file) and preserve_old:
            # Set 'user' section to the one in the existing file,
            # thus preserving the existing content.
            user_lines = []
            with open(output_file, 'r') as f:
                for line in f:
                    if self.bplate_div in line:
                        break
                    # Strip only the newline: a final line without one
                    # must not lose its last character.
                    user_lines.append(line.rstrip('\n'))
            user = '\n'.join(user_lines)
        with open(output_file, 'w') as f:
            f.write(user)
            f.write('\n')
            f.write(bplate)

    def get_script(self, n=1, output_files='/tmp/file{i}.json'):
        """Code to load & generate `n` schemas as a Python string template
        with the spot for the variables as '{variables}'.

        Args:
            n (int): Number of instances the script will generate
            output_files (str): Output file pattern written into the script

        Returns:
            (str, str): Pair of strings, first is user-modifiable part and
            second is boilerplate with the template data.
            This allows separate modification of these 2 sections.
        """
        divider = '#' * 78
        user = [
            '#!/usr/bin/env python',
            'from jinja2 import Template',
            'output_files = "{pattern}"',
            'num_to_generate = {num}',
            divider,
            '# Set dynamic values for i-th instance',
            divider,
            'def modify(v, i):',
            ' # modify dict "v" for i-th',
            ' return v',
            divider,
            '# Set static values for all instances',
            divider,
            'template_vars = {variables}',
            divider,
            ''  # keep
        ]
        bplate = [
            '# ' + self.bplate_div,
            divider,
            'template = Template("""{template}""")',
            divider,
            'for i in range(num_to_generate):',
            ' cur_vars = modify(template_vars.copy(), i)',
            ' instance = template.render(**cur_vars)',
            ' f = open(output_files.format(i=i), "w")',
            ' f.write(instance)',
            '',
            'print("Wrote {{n:d}} to {{p}}".format(n=num_to_generate,'
            ' p=output_files))',
            ''
        ]
        user_str = '\n'.join(user).format(variables=self.get_variables(),
                                          pattern=output_files, num=n)
        bplate_str = '\n'.join(bplate).format(template=self.get_template())
        return user_str, bplate_str

    def get_variables(self, commented=True):
        """Build the text of a Python literal with default variable values.

        Args:
            commented (bool): If True, prefix every line except the first
                and last with '# '.

        Returns:
            str: Lines of the literal joined with newlines. When the
            schema root is an array, the value is wrapped in a dict keyed
            by `root_var`.
        """
        results = self._var_visit(self._s, depth=0)
        if len(results) == 1:
            # A one-line value has no interior lines to comment out and
            # no separate closing line to rewrite; return it once,
            # wrapping a root array under the root variable.
            only = results[0]
            if only.startswith('['):
                return "{{ '{}': {}}}".format(self.root_var, only)
            return only
        if results[0].startswith('['):
            results[0] = '{{ \'{}\': ['.format(self.root_var)
            results[-1] = ']}'
        if commented:
            results = [results[0]] + \
                      ['# {}'.format(r) for r in results[1:-1]] + \
                      [results[-1]]
        return '\n'.join(results)

    def get_template(self):
        """Generate a new template for the instance.

        Returns:
            str: JSON of the instance
        """
        return self._visit(self._s, path='', depth=0)

    def _visit(self, val, path=None, depth=None):
        """Recursively build the Jinja2 template text for one schema node.

        Args:
            val (dict): Schema node
            path (str): Variable path of this node, e.g. ``a.b[0]``
            depth (int): Nesting depth; 0 is the schema root

        Returns:
            str: Template text for the node

        Raises:
            ValueError: if the node has an unknown type
        """
        type_, val = self._get_type(val)
        if type_ == 'object':
            members = []
            for key, sub_schema in val.get('properties', {}).items():
                name = path + '.' + key if path else key
                v = self._visit(sub_schema, path=name, depth=depth + 1)
                members.append('"{}": {}'.format(key, v))
            return '{{{o}}}'.format(o=', '.join(members))
        if type_ == 'array':
            n = self.default_arr_len  # TODO: determine dynamically
            name_pfx = self.root_var if depth == 0 else path
            items = val['items']
            if isinstance(items, list):
                # one element per positional item schema
                item_schemas = items
            else:
                # single item schema repeated `n` times
                item_schemas = [items] * n
            elements = [
                self._visit(item, '{}[{}]'.format(name_pfx, i), depth + 1)
                for i, item in enumerate(item_schemas)
            ]
            arr = ','.join(elements)
            # Surround lines with if/else to avoid overriding empty
            # arrays set by the user.
            f1 = '{{%- if {name} is defined and {name}|length > 0 %}}'\
                .format(name=name_pfx)
            f2 = '{% endif %}'
            return '[{if_}{a}{endif}]'.format(a=arr, if_=f1, endif=f2)
        # Scalars: placeholder default used when the variable is unset
        if type_ == 'string':
            simple = '"str"'
        elif type_ == 'number':
            simple = '{:.2f}'.format(random.random())
        elif type_ == 'integer':
            simple = str(random.randint(0, 100))
        elif type_ == 'boolean':
            simple = 'False'
        else:
            raise ValueError('Unknown value type: {}'.format(type_))
        quotes = '"' if type_ == 'string' else ''
        return '{q}{{{{ {p} if ({c}) else {d} }}}}{q}'.format(
            q=quotes, p=path, d=simple, c=self._variable_comp_expr(path))

    def _var_visit(self, val, name=None, depth=0):
        """Recursively build the lines of the default-variables literal
        for one schema node.

        Args:
            val (dict): Schema node
            name (str): Key of this node in its parent (unused)
            depth (int): Nesting depth, which sets the line indent

        Returns:
            list of str: Lines of Python literal text

        Raises:
            ValueError: if the node has an unknown type
        """
        idt = ' ' * depth
        idt_m1 = ' ' * max(depth - 1, 0)
        type_, val = self._get_type(val)
        if type_ == 'string':
            variable = ["'str'"]
        elif type_ == 'number':
            variable = ['0.0']
        elif type_ == 'integer':
            variable = ['0']
        elif type_ == 'boolean':
            variable = ['False']
        elif type_ == 'object':
            if 'properties' in val:
                variable = ['{']
                for key, sub_schema in val['properties'].items():
                    v = self._var_visit(sub_schema, name=key,
                                        depth=depth + 1)
                    if len(v) > 1:
                        # multi-line value: key goes on the opening line,
                        # trailing comma on the closing line
                        variable.append("{}'{}': {}".format(idt, key, v[0]))
                        variable.extend(v[1:-1])
                        variable.append(v[-1] + ',')
                    else:
                        variable.append("{}'{}': {},".format(idt, key, v[0]))
                variable.append('{}}}'.format(idt))
            else:
                variable = ['{}']
        elif type_ == 'array':
            variable = []
            if isinstance(val['items'], list):
                variable.append('[')
                for item in val['items']:
                    r = self._var_visit(item, depth=depth + 1)
                    variable.append('{}{}'.format(idt, r[0]))
                    variable.extend(r[1:])
                    variable[-1] += ','
                # replace the last separator with the closing bracket
                variable[-1] = variable[-1].strip(',') + ']'
            else:
                v = self._var_visit(val['items'], depth=depth + 1)
                variable.append('[{}'.format(v[0]))
                if len(v) > 1:
                    variable.extend(v[1:])
                    variable.append('{}]'.format(idt_m1))
                else:
                    variable[-1] = variable[-1] + ']'
        else:
            raise ValueError('Unknown value type: {}'.format(type_))
        return variable

    def _get_type(self, val):
        """Determine the type of a schema node, following one `$ref`.

        Returns:
            (str, dict): The type name and the node it was read from,
            which is the referenced definition if `val` was a `$ref`.
            Nodes with neither 'type' nor '$ref' are treated as 'string'.
        """
        try:
            type_ = val['type']
        except KeyError:
            if '$ref' in val:
                defined_val = self._fetch_ref(val['$ref'])
                # swap value with definition contents, and continue
                val = defined_val
                type_ = val['type']  # no nested $refs
            else:
                type_ = 'string'
        return type_, val

    @staticmethod
    def _variable_comp_expr(v):
        """Make an expression out of the components of a dotted
        variable name (with possible array references), so that
        in Jinja2, if <this expression> will evaluate to true only
        if the "leaf" variable is defined. This is to deal with the
        fact that 'X[0].name|default()' will throw an error if
        X or X[0] is not defined.

        Args:
            v: Full variable path, with dots and array pieces.

        Returns:
            str: Expression "<part> and <part> and <part> .."
        """
        # Split on '[' or '.'; the bracket is escaped so the pattern is
        # not mistaken for a nested set by newer `re` versions.
        comp = re.split(r'[\[.]', v)
        paths = []
        for c in comp:
            if c.endswith(']'):
                # array index: re-attach the '[' consumed by the split
                variable = paths[-1] + '[' + c
            elif paths:
                variable = paths[-1] + '.' + c
            else:
                variable = c
            paths.append(variable)
        return ' and '.join(paths)

    def _fetch_ref(self, ref):
        """Look up a `$ref` in the schema's "definitions" section.

        Only the last '/'-separated component of `ref` is used as the
        definition name.

        Raises:
            ValueError: if the schema has no "definitions" section
            KeyError: if the name is not in the "definitions" section
        """
        if 'definitions' not in self._s:
            raise ValueError('Definitions must be in a '
                             'section called "definitions". '
                             'Cannot parse ref: {}'.format(ref))
        defn = ref.split('/')[-1]
        if defn not in self._s['definitions']:
            raise KeyError('Could not find "{}" from ref "{}" '
                           'in definitions section'.format(defn, ref))
        return self._s['definitions'][defn]