# blob: 51019062e2bdd8ad1acf2ffce0ea5f7f5d7411c0 [file] [log] [blame]
"""schema is a library for validating Python data structures, such as those
obtained from config-files, forms, external services or command-line
parsing, converted from JSON/YAML (or something else) to Python data-types."""
import re
# Version string of this module.
__version__ = '0.6.6'
# Public names exported by a star-import of this module: the Schema entry
# point, the combinators/helpers, and the exception classes.
__all__ = ['Schema',
'And', 'Or', 'Regex', 'Optional', 'Use',
'SchemaError',
'SchemaWrongKeyError',
'SchemaMissingKeyError',
'SchemaUnexpectedTypeError']
class SchemaError(Exception):
    """Raised when data fails validation against a schema.

    ``autos`` holds the automatically generated messages and ``errors`` the
    user-supplied ones.  Each is stored as a list; an argument that is not
    already a list is wrapped in a single-element list.
    """

    def __init__(self, autos, errors=None):
        if type(autos) is not list:
            autos = [autos]
        if type(errors) is not list:
            errors = [errors]
        self.autos = autos
        self.errors = errors
        Exception.__init__(self, self.code)

    @property
    def code(self):
        """Return the message text of this error.

        ``None`` entries and repeated entries are dropped (the first
        occurrence wins).  The user-supplied ``errors`` take precedence; the
        automatic messages are used only when no user error remains.  The
        surviving entries are joined with newlines.
        """
        def _dedupe(items):
            # Order-preserving removal of ``None`` and duplicate entries.
            kept = []
            seen = set()
            for item in items:
                if item is None or item in seen:
                    continue
                seen.add(item)
                kept.append(item)
            return kept

        auto_messages = _dedupe(self.autos)
        user_messages = _dedupe(self.errors)
        chosen = user_messages if user_messages else auto_messages
        return '\n'.join(chosen)
class SchemaWrongKeyError(SchemaError):
    """Raised when the data being validated contains an unexpected key."""
class SchemaMissingKeyError(SchemaError):
    """Raised when the data being validated lacks a mandatory key."""
class SchemaUnexpectedTypeError(SchemaError):
    """Raised when the data being validated is not of the expected type."""
class And(object):
    """Combine validation directives so data must satisfy every one of them.

    Positional arguments are the sub-schemas.  Recognised keyword arguments:

    :param error: custom error message handed to every sub-schema.
    :param schema: class used to wrap each sub-schema (defaults to
        ``Schema``), so that an inherited Schema class can be supplied.
    :param ignore_extra_keys: forwarded to every sub-schema; defaults to
        False.
    :raises TypeError: if any other keyword argument is given.
    """

    def __init__(self, *args, **kw):
        self._args = args
        # An explicit raise (rather than ``assert``) keeps the check active
        # when Python runs with -O, which strips assert statements.
        unknown = set(kw) - set(['error', 'schema', 'ignore_extra_keys'])
        if unknown:
            raise TypeError('Unknown keyword arguments %r' % sorted(unknown))
        self._error = kw.get('error')
        self._ignore_extra_keys = kw.get('ignore_extra_keys', False)
        # You can pass your inherited Schema class.
        self._schema = kw['schema'] if 'schema' in kw else Schema

    def __repr__(self):
        return '%s(%s)' % (self.__class__.__name__,
                           ', '.join(repr(a) for a in self._args))

    def validate(self, data):
        """
        Validate data against every sub-schema in turn.

        The output of each sub-schema is fed to the next one, so
        transforming sub-schemas are chained.

        :param data: data to be validated with the defined sub-schemas.
        :return: the data as returned by the last sub-schema (the input
            itself when there are no sub-schemas).
        """
        validators = [self._schema(arg, error=self._error,
                                   ignore_extra_keys=self._ignore_extra_keys)
                      for arg in self._args]
        for validator in validators:
            data = validator.validate(data)
        return data
class Or(And):
    """Combine validation directives so data must satisfy at least one."""

    def validate(self, data):
        """
        Return the result of the first sub-schema that accepts the data.

        :param data: data to be validated by the provided sub-schemas.
        :return: the validated data from the first sub-schema that succeeds.
        :raises SchemaError: if every sub-schema rejects the data; the
            messages of the last failure are appended to this error's own.
        """
        last_failure = SchemaError([], [])
        candidates = [self._schema(arg, error=self._error,
                                   ignore_extra_keys=self._ignore_extra_keys)
                      for arg in self._args]
        for candidate in candidates:
            try:
                return candidate.validate(data)
            except SchemaError as exc:
                # Remember only the most recent failure for the final report.
                last_failure = exc
        auto_message = '%r did not validate %r' % (self, data)
        own_error = self._error.format(data) if self._error else None
        raise SchemaError([auto_message] + last_failure.autos,
                          [own_error] + last_failure.errors)
class Regex(object):
    """
    Enables schema.py to validate string using regular expressions.

    :param pattern_str: pattern source handed to ``re.compile``.
    :param flags: ``re`` flag bits handed to ``re.compile``; defaults to 0.
    :param error: optional custom error message; it is passed through
        ``str.format`` with the offending data when validation fails.
    """
    # Readable names for the nine flag bits, highest bit first.
    NAMES = ['re.ASCII', 're.DEBUG', 're.VERBOSE', 're.UNICODE', 're.DOTALL',
             're.MULTILINE', 're.LOCALE', 're.IGNORECASE', 're.TEMPLATE']

    def __init__(self, pattern_str, flags=0, error=None):
        self._pattern_str = pattern_str
        # Test each named bit explicitly so that an unnamed higher bit is
        # ignored in the repr instead of indexing past the end of NAMES.
        flag_bits = int(flags)
        top_bit = len(Regex.NAMES) - 1
        flags_list = [name for position, name in enumerate(Regex.NAMES)
                      if flag_bits & (1 << (top_bit - position))]
        if flags_list:
            self._flags_names = ', flags=' + '|'.join(flags_list)
        else:
            self._flags_names = ''
        self._pattern = re.compile(pattern_str, flags=flags)
        self._error = error

    def __repr__(self):
        return '%s(%r%s)' % (
            self.__class__.__name__, self._pattern_str, self._flags_names
        )

    def validate(self, data):
        """
        Validate data using the defined regex.

        :param data: data to be validated.
        :return: ``data`` unchanged when the pattern is found in it.
        :raises SchemaError: when the pattern is not found, or when ``data``
            is of a type the pattern cannot search.
        """
        e = self._error
        try:
            found = self._pattern.search(data)
        except TypeError:
            # ``(data,)`` keeps tuple data from being unpacked as several
            # format arguments, which would raise TypeError here.
            raise SchemaError('%r is not string nor buffer' % (data,),
                              e.format(data) if e else None)
        if found:
            return data
        raise SchemaError('%r does not match %r' % (self, data),
                          e.format(data) if e else None)
class Use(object):
    """
    For more general use cases, you can use the Use class to transform
    the data while it is being validated.

    :param callable_: callable applied to the data; its return value is the
        validation result.
    :param error: optional custom error message; it is passed through
        ``str.format`` with the offending data when the callable fails.
    :raises TypeError: if ``callable_`` is not callable.
    """

    def __init__(self, callable_, error=None):
        # An explicit raise (rather than ``assert``) keeps the check active
        # when Python runs with -O, which strips assert statements.
        if not callable(callable_):
            raise TypeError('Expected a callable, not %r' % (callable_,))
        self._callable = callable_
        self._error = error

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self._callable)

    def validate(self, data):
        """
        Apply the wrapped callable to ``data`` and return its result.

        :param data: data to be transformed.
        :return: whatever the callable returns for ``data``.
        :raises SchemaError: if the callable raises; a SchemaError raised by
            the callable has its messages carried over into the new error.
        """
        try:
            return self._callable(data)
        except SchemaError as x:
            raise SchemaError([None] + x.autos,
                              [self._error.format(data)
                               if self._error else None] + x.errors)
        # NOTE(review): BaseException also covers KeyboardInterrupt and
        # SystemExit; kept as-is because callers may rely on it.
        except BaseException as x:
            f = _callable_str(self._callable)
            raise SchemaError('%s(%r) raised %r' % (f, data, x),
                              self._error.format(data)
                              if self._error else None)
COMPARABLE, CALLABLE, VALIDATOR, TYPE, DICT, ITERABLE = range(6)
def _priority(s):
"""Return priority for a given object."""
if type(s) in (list, tuple, set, frozenset):
return ITERABLE
if type(s) is dict:
return DICT
if issubclass(type(s), type):
return TYPE
if hasattr(s, 'validate'):
return VALIDATOR
if callable(s):
return CALLABLE
else:
return COMPARABLE
class Schema(object):
    """
    Entry point of the library, use this class to instantiate validation
    schema for the data that will be validated.

    :param schema: the schema to validate against: a list/tuple/set/frozenset
        of alternatives, a dict, a type, an object with a ``validate``
        method, a callable, or a plain value compared with ``==``.
    :param error: optional custom error message; it is passed through
        ``str.format`` with the offending data whenever it is reported.
    :param ignore_extra_keys: when true, dict data may contain keys that no
        schema key matches.
    """

    def __init__(self, schema, error=None, ignore_extra_keys=False):
        self._schema = schema
        self._error = error
        self._ignore_extra_keys = ignore_extra_keys

    def __repr__(self):
        return '%s(%r)' % (self.__class__.__name__, self._schema)

    @staticmethod
    def _dict_key_priority(s):
        """Return priority for a given key object."""
        if isinstance(s, Optional):
            # Optional keys sort just after plain keys of the same flavor.
            return _priority(s._schema) + 0.5
        return _priority(s)

    def validate(self, data):
        """
        Validate ``data`` against the wrapped schema.

        :param data: data to be validated.
        :return: the validated (possibly transformed) data.
        :raises SchemaError: when ``data`` does not conform; the subclasses
            SchemaMissingKeyError, SchemaWrongKeyError and
            SchemaUnexpectedTypeError are raised for missing dict keys,
            unmatched dict keys and type mismatches respectively.
        """
        # Nested schemas are built with the caller's own (sub)class.
        Schema = self.__class__
        s = self._schema
        e = self._error
        i = self._ignore_extra_keys

        def user_error():
            # Custom error formatted with the data; evaluated only on
            # failure, and identically in every failure path.
            return e.format(data) if e else None

        flavor = _priority(s)
        if flavor == ITERABLE:
            data = Schema(type(s), error=e).validate(data)
            # Every element must match at least one of the alternatives.
            o = Or(*s, error=e, schema=Schema, ignore_extra_keys=i)
            return type(data)(o.validate(d) for d in data)
        if flavor == DICT:
            data = Schema(dict, error=e).validate(data)
            new = type(data)()  # new - is a dict of the validated values
            coverage = set()  # matched schema keys
            # for each key and value find a schema entry matching them, if any
            sorted_skeys = sorted(s, key=self._dict_key_priority)
            for key, value in data.items():
                for skey in sorted_skeys:
                    svalue = s[skey]
                    try:
                        nkey = Schema(skey, error=e).validate(key)
                    except SchemaError:
                        # This schema key does not match; try the next one.
                        continue
                    try:
                        nvalue = Schema(svalue, error=e,
                                        ignore_extra_keys=i).validate(value)
                    except SchemaError as x:
                        # ``(nkey,)`` keeps a tuple key from being unpacked
                        # as several format arguments.
                        k = "Key '%s' error:" % (nkey,)
                        raise SchemaError([k] + x.autos,
                                          [user_error()] + x.errors)
                    new[nkey] = nvalue
                    coverage.add(skey)
                    break
            # isinstance matches the test used by _dict_key_priority, so
            # subclasses of Optional are treated as optional here too.
            required = set(k for k in s if not isinstance(k, Optional))
            if not required.issubset(coverage):
                missing_keys = required - coverage
                s_missing_keys = \
                    ', '.join(repr(k) for k in sorted(missing_keys, key=repr))
                raise SchemaMissingKeyError(
                    'Missing keys: ' + s_missing_keys, user_error())
            if not i and (len(new) != len(data)):
                wrong_keys = set(data.keys()) - set(new.keys())
                s_wrong_keys = \
                    ', '.join(repr(k) for k in sorted(wrong_keys, key=repr))
                raise SchemaWrongKeyError(
                    'Wrong keys %s in %r' % (s_wrong_keys, data),
                    user_error())
            # Apply default-having optionals that haven't been used:
            defaults = set(k for k in s if isinstance(k, Optional) and
                           hasattr(k, 'default')) - coverage
            for default in defaults:
                new[default.key] = default.default
            return new
        if flavor == TYPE:
            if isinstance(data, s):
                return data
            raise SchemaUnexpectedTypeError(
                '%r should be instance of %r' % (data, s.__name__),
                user_error())
        if flavor == VALIDATOR:
            try:
                return s.validate(data)
            except SchemaError as x:
                raise SchemaError([None] + x.autos,
                                  [user_error()] + x.errors)
            except BaseException as x:
                raise SchemaError(
                    '%r.validate(%r) raised %r' % (s, data, x),
                    user_error())
        if flavor == CALLABLE:
            f = _callable_str(s)
            try:
                if s(data):
                    return data
            except SchemaError as x:
                raise SchemaError([None] + x.autos,
                                  [user_error()] + x.errors)
            except BaseException as x:
                raise SchemaError(
                    '%s(%r) raised %r' % (f, data, x),
                    user_error())
            raise SchemaError('%s(%r) should evaluate to True' % (f, data),
                              user_error())
        # Remaining flavor: plain value compared by equality.
        if s == data:
            return data
        raise SchemaError('%r does not match %r' % (s, data), user_error())
class Optional(Schema):
    """Marker for an optional part of the validation Schema.

    Takes the same arguments as ``Schema`` plus an optional ``default``
    keyword.  When a default is given, the wrapped schema must be a plain
    comparable value; ``default`` and ``key`` attributes are then set.
    """
    # Sentinel that distinguishes "no default given" from ``default=None``.
    _MARKER = object()

    def __init__(self, *args, **kwargs):
        default = kwargs.pop('default', self._MARKER)
        super(Optional, self).__init__(*args, **kwargs)
        if default is self._MARKER:
            return
        # A default needs a static key to be stored under, so only plain
        # comparable schemas are accepted here.
        if _priority(self._schema) != COMPARABLE:
            raise TypeError(
                'Optional keys with defaults must have simple, '
                'predictable values, like literal strings or ints. '
                '"%r" is too complex.' % (self._schema,))
        self.default = default
        self.key = self._schema
def _callable_str(callable_):
if hasattr(callable_, '__name__'):
return callable_.__name__
return str(callable_)