Source code for xmodels.models

import logging
from six import with_metaclass

from .fields import BaseField, WrappedObjectField, ValidationException, \
    RequiredAttribute, AttributeField
from .constraints import Stores
from .utils import CommonEqualityMixin, MsgRecord


# Module-level logger; the validation helpers in this module report their
# messages through it.
logger = logging.getLogger(__name__)


def error(logger_inst, message, **kwargs):
    """
    Record a validation problem and log it at ERROR level.

    :param logger_inst: logger the message is emitted on.
    :param message: the message (a string or a message record) to report.
    :param kwargs: validation context. When it carries an ``errors`` list,
        the message is appended to that list; all other keys are ignored.
    """
    errors = kwargs.get('errors')
    # A context without an ``errors`` collector must not turn a validation
    # message into a KeyError; the message is still logged below.
    if errors is not None:
        errors.append(message)
    logger_inst.error(message)


class SequenceElement(CommonEqualityMixin):
    """
    Container to store xml tag, min_occurs and max_occurs. The property
    required is True when min_occurs > 0.

    min_occurs and max_occurs are exposed as read-only properties.
    """

    def __init__(self, tag, min_occurs=0, max_occurs=0):
        """
        :param tag: the xml tag this element describes.
        :param min_occurs: minimum number of occurrences.
        :param max_occurs: maximum number of occurrences. A value of 0 is
            exempt from the consistency check below (presumably it means
            "no upper bound" -- confirm against the consumers of this class).
        :raises ValueError: if max_occurs is positive and smaller than
            min_occurs.
        """
        # Validate first so an inconsistent element is rejected with a
        # message that names the offending values.
        if 0 < max_occurs < min_occurs:
            raise ValueError(
                'max_occurs (%r) must not be smaller than min_occurs (%r) '
                'for tag %r' % (max_occurs, min_occurs, tag))
        self.tag = tag
        self._min_occurs = min_occurs
        self._max_occurs = max_occurs

    def __repr__(self):
        return '%s: %s (%d, %d)' % (self.__class__.__name__, self.tag,
                                    self.min_occurs, self.max_occurs)

    @property
    def min_occurs(self):
        """Minimum number of occurrences given at construction."""
        return self._min_occurs

    @property
    def max_occurs(self):
        """Maximum number of occurrences given at construction."""
        return self._max_occurs

    @property
    def required(self):
        """True when at least one occurrence is mandatory."""
        return self.min_occurs > 0


class Choice(CommonEqualityMixin):
    """
    The Choice class describes the options of the xsd:choice element.
    The options attributes is a list of choices. Each option may either
    be a single SequenceElement or a list of SequenceElement (xsd:sequence).
    If required is True, the default, the set of value keys (value_key_set)
    in self.match_choice_keys must match at least one set of required keys
    listed in any option. If required is False, the set of value keys may
    be empty.
    """

    def __init__(self, options, required=True, **kwargs):
        """
        :param options: list whose items are SequenceElement instances or
            lists of SequenceElement instances.
        :param required: whether an empty set of value keys is rejected.
        :param kwargs: accepted and ignored.
        """
        assert isinstance(options, list)
        self.options = list(options)
        self.required = required
        self._flat_options = self._flat_options_dict()
        self.all_keys_set = set(self._flat_options.keys())
        self.required_keys_sets = self.choice_to_key_sets(True)
        self.optional_keys_sets = self.choice_to_key_sets(False)

    def __str__(self):
        return 'Choice: %s' % self.choice_keys_str()

    def choice_keys_str(self):
        """
        Return a readable rendering of the options, e.g. ``(a | (b, c))``.
        """
        parts = []
        for option in self.options:
            if isinstance(option, list):
                parts.append('(%s)' % ','.join(item.tag for item in option))
            else:
                parts.append(option.tag)
        # Tokenise on whitespace, join the alternatives with ' | ' and only
        # then widen the commas inside the grouped options.
        tokens = ' '.join(parts).split()
        return '(%s)' % ' | '.join(tokens).replace(',', ', ')

    def _flat_options_dict(self):
        """
        Return a dict mapping tag -> SequenceElement for every element that
        appears in any option, whether single or nested in a list.
        """
        flattened = []
        # The check is for SequenceElement, so the message says so too.
        assert_msg = "Choice options may only be of type SequenceElement " \
                     "or list of SequenceElement"
        for option in self.options:
            if isinstance(option, list):
                for item in option:
                    assert isinstance(item, SequenceElement), assert_msg
                flattened.extend(option)
            else:
                assert isinstance(option, SequenceElement), assert_msg
                flattened.append(option)
        return dict((option.tag, option) for option in flattened)

    def choice_to_key_sets(self, required):
        """
        Return one set of tags per option (same order as self.options)
        holding the tags whose ``required`` flag equals the given value.
        """
        key_sets = []
        for option in self.options:
            if isinstance(option, SequenceElement):
                members = [option]
            elif isinstance(option, list):
                members = option
            else:
                members = []
            key_sets.append(set(member.tag for member in members
                                if member.required == required))
        return key_sets

    def match_choice_keys(self, value_key_set, **kwargs):
        """
        Match a set of present tags against the options of this choice.

        An option matches when the value keys contain all of its required
        tags and contain nothing beyond its required plus optional tags.
        The first matching option wins.

        :param value_key_set: set of tags present in the data.
        :param kwargs: validation context handed on to ``error``.
        :return: ``[]`` when the set is empty and the choice is optional;
            the list of matched tags when an option matches; otherwise
            (after reporting an error) the SequenceElement objects of the
            value keys known to this choice.
        """
        no_match_msg = "Could not match keys: %s with: choices: %s" % (
            ', '.join(value_key_set), self.choice_keys_str())
        if value_key_set == set([]) and not self.required:
            return []
        max_key_sets = [required_keys | optional_keys
                        for required_keys, optional_keys
                        in zip(self.required_keys_sets,
                               self.optional_keys_sets)]
        min_key_matches = [value_key_set >= min_keys
                           for min_keys in self.required_keys_sets]
        max_key_matches = [value_key_set <= max_keys
                           for max_keys in max_key_sets]
        if not any(min_key_matches):
            error(logger, no_match_msg, **kwargs)
        if not any(max_key_matches):
            error(logger, no_match_msg, **kwargs)
        matches = [index for index, (min_ok, max_ok)
                   in enumerate(zip(min_key_matches, max_key_matches))
                   if min_ok and max_ok]
        if matches:
            matched_option = self.options[matches[0]]
            if isinstance(matched_option, SequenceElement):
                matched_fields = [matched_option.tag]
            else:
                matched_fields = [field.tag for field in matched_option
                                  if field.tag in value_key_set]
            msg = "Matched keys: %s with option: %d" % \
                  (', '.join(value_key_set), matches[0])
            logger.debug(msg)
            return matched_fields
        if any(min_key_matches) and any(max_key_matches):
            # One option satisfied the minimum test and a different one the
            # maximum test, but no single option satisfied both. Report it
            # as a mismatch instead of failing on an empty match list.
            error(logger, no_match_msg, **kwargs)
        # NOTE(review): this fallback yields SequenceElement objects while
        # the matched path yields tag strings -- confirm that callers expect
        # the differing element type.
        return [self._flat_options[tag] for tag in value_key_set
                if tag in self._flat_options]


class Options(object):
    """
    Attribute bag for the meta properties of a model class.

    Four properties always exist (``key_to_source``, ``source_to_key``,
    ``allow_extra_elements`` and ``allow_extra_attributes``); every entry of
    the ``meta`` mapping is then stored as an attribute, overriding those
    defaults where the names collide.
    """
    def __init__(self, meta):
        state = self.__dict__
        state.update(
            key_to_source=None,
            source_to_key=None,
            allow_extra_elements=False,
            allow_extra_attributes=False,
        )
        # A falsy meta (None or an empty mapping) leaves the defaults alone.
        if meta:
            state.update(meta.items())


class ModelType(type):
    """Creates the metaclass for Model. The main function of this metaclass
    is to move all of fields into the _clsfields variable on the class and to
    combine/update the class variables of the inner class Meta into an Options
    instance which is stored under _meta.
    """

    def __new__(cls, name, bases, attrs):
        """
        Build a model class.

        Classes without a ModelType base are created unchanged. For every
        other class the BaseField attributes are collected in ``_clsfields``,
        their non-None defaults in ``_defaults``, the Meta found on the bases
        is combined with the class' own Meta into ``_meta`` and the remaining
        attributes are set on the new class.
        """
        super_new = super(ModelType, cls).__new__

        if not any(isinstance(base, ModelType) for base in bases):
            return super_new(cls, name, bases, attrs)

        namespace = {'__module__': attrs.pop('__module__')}
        # On Python 3 the compiler stores the cell backing zero-argument
        # super() / __class__ under '__classcell__'. It has to reach
        # type.__new__, otherwise creating a class whose methods use those
        # constructs fails; it must not be set as an ordinary attribute.
        classcell = attrs.pop('__classcell__', None)
        if classcell is not None:
            namespace['__classcell__'] = classcell
        new_class = super_new(cls, name, bases, namespace)
        new_class._clsfields = {}
        new_class._extra = None
        new_class._data = None
        new_class._non_empty_fields = None
        new_class._defaults = {}
        for key, value in attrs.items():
            if isinstance(value, BaseField):
                new_class._clsfields[key] = value
        # Fields live in _clsfields only; they are not class attributes.
        for key in new_class._clsfields:
            attrs.pop(key, None)
        for key, field in new_class._clsfields.items():
            if field.default is not None:
                new_class._defaults[key] = field.default
        # The class' own Meta is still in attrs at this point, so this
        # lookup resolves to the Meta visible through the bases.
        base_meta = getattr(new_class, 'Meta')
        attr_meta = attrs.pop('Meta', None)
        options = Options(base_meta.__dict__)
        if attr_meta:
            for key, value in attr_meta.__dict__.items():
                if not key.startswith('__'):
                    setattr(options, key, value)
        new_class._meta = options
        # Add all attributes to the class.
        for obj_name, obj in attrs.items():
            setattr(new_class, obj_name, obj)
        return new_class


class Model(with_metaclass(ModelType)):
    """
    The Model is the main component of xmodels.models. It is the base class
    for AttributeModel and SequenceModel implementing common logic.

    Usually one defines a number of fields as class variables. Fields may
    have default values. All fields can be assigned and read. A read of a
    field which has not been set previously returns the default value if one
    exists or None.

    The validate method validates all fields defined as class variables.
    Instance variables may be created in addition to the fields specified as
    class variables if Meta.allow_extra_elements is True. Otherwise the model
    validation fails. The validation results are stored in a logger
    instance.
    """

    class Meta:
        allow_extra_elements = False
        allow_extra_attributes = False
        value_key = 'value'
        required_attributes = None
        initial = None
        sequence = None

    def __init__(self):
        self._extra = {}
        self._data = {}
        self._path = ''
        self._non_empty_fields = set([])

    def __str__(self):
        return '%s(%s): %s' % (
            self.__class__.__name__,
            self.__class__.__base__.__name__,
            ', '.join(["'%s': %s" % (k, v)
                       for k, v in sorted(self._fields.items())]))

    def __repr__(self):
        return self.__str__()

    def __getattr__(self, key):
        # Only reached when normal attribute lookup fails. The value is
        # looked up in the field data, then the extras, then the defaults;
        # an unknown name yields None rather than raising.
        data = self._data.get(key)
        if data is None:
            data = self._extra.get(key)
        if data is None:
            data = self._defaults.get(key)
        return data

    def __setattr__(self, key, value):
        """
        Store a value under ``key``.

        Declared fields go to the field data, names starting with an
        underscore become plain instance attributes, and any other name is
        kept as an extra only if Meta allows extras of that kind ('@' names
        are attributes, all others elements).

        :raises AttributeError: if the name is neither a field nor an
            allowed extra.
        """
        if key in self._clsfields:
            self._data[key] = value
            if value is not None:
                self._non_empty_fields.add(key)
        elif key.startswith('_'):
            self.__dict__[key] = value
        elif key[0] == '@' and self._meta.allow_extra_attributes:
            self._extra[key] = value
        elif key[0] != '@' and self._meta.allow_extra_elements:
            self._extra[key] = value
        else:
            kind = 'attributes' if key[0] == '@' else 'elements'
            raise AttributeError(
                '%s has no field %r and extra %s are not allowed'
                % (self.__class__.__name__, key, kind))

    @classmethod
    def from_dict(cls, raw_data, **kwargs):
        """
        This factory for :class:`Model` creates a Model from a dict object.

        The new instance is populated from ``raw_data`` and then validated;
        ``kwargs`` is passed to both steps.
        """
        instance = cls()
        instance.populate(raw_data, **kwargs)
        instance.validate(**kwargs)
        return instance

    def _gen_key_to_from_source(self, name_spaces):
        """
        Rebuild the source <-> key lookup tables stored on ``self._meta``.

        :param name_spaces: mapping of name space to prefix, or None.
        """
        self._meta.source_to_key = {}
        default_prefix = ''
        # Meta.name_space is optional; a model without one simply gets no
        # default prefix instead of failing on the attribute lookup.
        name_space = getattr(self._meta, 'name_space', None)
        if name_spaces and name_space in name_spaces:
            default_prefix = ''.join([name_spaces[name_space], ':'])
        for key, field in self._clsfields.items():
            source = field.get_source(key, name_spaces, default_prefix)
            self._meta.source_to_key[source] = key
        self._meta.key_to_source = dict(
            [(value, key) for key, value in self._meta.source_to_key.items()])

    def _find_field(self, name):
        """Return the field key for a source name, or None if unknown."""
        if name in self._meta.source_to_key:
            key = self._meta.source_to_key[name]
            if key in self._fields:
                return key

    def _build_path(self, **kwargs):
        """
        Return the path of this instance: the ``path`` from kwargs (if any)
        followed by the class name and, when ``instance_index`` is given,
        the index in square brackets.
        """
        path = kwargs.get('path', '')
        index = kwargs.get('instance_index')
        if path:
            base_path = ''.join([path, '.', self.__class__.__name__])
        else:
            base_path = self.__class__.__name__
        if index is not None:
            base_path = ''.join([base_path, '[', str(index), ']'])
        return base_path

    def populate(self, data, **kwargs):
        """
        Fill the model from a mapping of source name -> value.

        Values whose name maps to a field are stored (through the field's
        own populate for wrapped object fields) unless the value is None
        and the field does not accept None. Values with unknown names are
        kept as extras.
        """
        name_spaces = kwargs.get('name_spaces')
        self._gen_key_to_from_source(name_spaces)
        for name, value in data.items():
            key = self._find_field(name)
            if key:
                field = self._clsfields[key]
                if value is not None or field.accept_none:
                    self._non_empty_fields.add(key)
                    if isinstance(field, WrappedObjectField):
                        self._data[key] = field.populate(value, **kwargs)
                    else:
                        self._data[key] = value
            else:
                self._extra[name] = value

    def validate(self, **kwargs):
        """
        Validate every field that holds data and check the extras against
        Meta.allow_extra_attributes / Meta.allow_extra_elements. Problems
        are reported through ``error``.

        :return: self
        """
        self._path = self._build_path(**kwargs)
        for key, field in self._clsfields.items():
            data = self._data.get(key)
            if data is not None:
                try:
                    kwargs['path'] = self._path
                    self._data[key] = field.validate(data, **kwargs)
                except ValidationException as e:
                    msg_rec = MsgRecord(path=self._path, field=key, msg=e.msg)
                    error(logger, msg_rec, **kwargs)
        if self._extra:
            extra_attributes = [key for key in self._extra.keys()
                                if key.startswith('@')]
            extra_elements = [key for key in self._extra.keys()
                              if key not in extra_attributes]
            if extra_attributes and not self._meta.allow_extra_attributes:
                attrs_str = ','.join(extra_attributes)
                msg = 'Found extra attribute fields: %s' % attrs_str
                msg_rec = MsgRecord(path=self._path, field='_extra', msg=msg)
                error(logger, msg_rec, **kwargs)
            if extra_elements and not self._meta.allow_extra_elements:
                els_str = ','.join(extra_elements)
                msg = 'Found extra element fields: %s' % els_str
                msg_rec = MsgRecord(path=self._path, field='_extra', msg=msg)
                error(logger, msg_rec, **kwargs)
        return self

    def deserialize(self, **kwargs):
        """
        Replace each stored value by the result of its field's deserialize.
        Validation exceptions are reported through ``error``.

        :return: self
        """
        for key, field in self._fields.items():
            data = self._data.get(key)
            if data is not None:
                try:
                    kwargs['path'] = self._path
                    self._data[key] = field.deserialize(data, **kwargs)
                except ValidationException as e:
                    msg_rec = MsgRecord(path=self._path, field=key, msg=e.msg)
                    error(logger, msg_rec, **kwargs)
        return self

    def serialize(self, **kwargs):
        """
        Return a mapping of source name -> serialized value for all stored
        values that are not None, followed by the extras.

        The mapping type is taken from the ``dict_constructor`` keyword
        (default ``dict``). A field that serializes to ``{}`` is emitted as
        None.
        """
        name_spaces = kwargs.get('name_spaces')
        dict_constructor = kwargs.get('dict_constructor', dict)
        self._gen_key_to_from_source(name_spaces)
        result = dict_constructor()
        for key, value in self._get_fields_items():
            field = self._fields[key]
            if value is not None:
                try:
                    kwargs['path'] = self._path
                    serialized_key = self._meta.key_to_source[key]
                    serialized_data = field.serialize(value, **kwargs)
                    if serialized_data == {}:
                        result[serialized_key] = None
                    else:
                        result[serialized_key] = serialized_data
                except ValidationException as e:
                    msg_rec = MsgRecord(path=self._path, field=key, msg=e.msg)
                    error(logger, msg_rec, **kwargs)
        result.update(self._extra)
        return result

    @property
    def _fields(self):
        """Class fields merged with the extras (extras win on a clash)."""
        if self._extra:
            return dict(self._clsfields, **self._extra)
        return dict(self._clsfields)

    def _get_fields_items(self):
        """Return the stored (key, value) pairs as a list."""
        return list(self._data.items())


class AttributeModel(Model):
    """
    Used to describe elements with attributes, an optional text value and no
    children. The key value used for the xml text #text is controlled by
    Meta.value_key.
    """

    class Meta:
        value_key = 'value'
        required_attributes = None

    def __init__(self):
        super(AttributeModel, self).__init__()
        meta = self._meta
        if meta.required_attributes is None:
            meta.required_attributes = []
        wrapped = {}
        for name, field in self._clsfields.items():
            if name != meta.value_key:
                # Every field but the value field is an xml attribute;
                # the required ones get the stricter wrapper.
                if name in meta.required_attributes:
                    wrapped[name] = RequiredAttribute(field)
                else:
                    wrapped[name] = AttributeField(field)
                continue
            # The value field is kept as is and defaults to the xml text.
            if not field.source:
                field.source = '#text'
            wrapped[name] = field
        # The leading underscore makes this an instance attribute that
        # shadows the class level field table.
        self._clsfields = wrapped


class SequenceModel(Model):
    """
    The SequenceModel is used to describe xml elements using xsd:sequence.
    The sequence is described with a list of SequenceElement in
    Meta.sequence. The validation method checks if all required attributes
    and min_occurs > 0 elements are not None. The initial class variable is
    used for context initialization for identity constraints checking.
    """

    class Meta:
        initial = None
        sequence = None

    def __init__(self):
        super(SequenceModel, self).__init__()
        # Ordered element tags; filled in by validate().
        self._data_sequence = None

    def validate(self, **kwargs):
        """
        Run the base validation, then match the non-empty element fields
        against Meta.sequence and remember the resulting order.

        :return: self, like the base class validate.
        """
        self._path = self._build_path(**kwargs)
        if self._meta.initial is not None:
            if kwargs.get('stores') is None:
                kwargs['stores'] = Stores()
            self._meta.initial.add_keys(path=self._path,
                                        stores=kwargs['stores'])
        super(SequenceModel, self).validate(**kwargs)
        element_tags = []
        for tag in self._non_empty_fields:
            if tag in self._fields and self._data[tag] is not None:
                field = self._fields[tag]
                if not field.isAttribute:
                    element_tags.append(tag)
        self._data_sequence = self.match_sequence(element_tags, **kwargs)
        return self

    def match_sequence(self, value_tags, **kwargs):
        """
        Order the given tags according to Meta.sequence.

        Missing required elements and tags that match nothing in the
        sequence are reported through ``error``.

        :param value_tags: tags of the element fields that hold data.
        :return: the matched entries in sequence order.
        """
        result_sequence = []
        path = kwargs.get('path', '')
        # A model without Meta.sequence has nothing to match against; every
        # tag is then reported as unmatched below.
        for field in self._meta.sequence or []:
            if isinstance(field, SequenceElement):
                if field.tag in value_tags:
                    result_sequence.append(field.tag)
                elif field.required:
                    msg = "Missing required key: %s %s" % (field.tag, path)
                    msg_rec = MsgRecord(path=self._path, field=field.tag,
                                        msg=msg)
                    error(logger, msg_rec, **kwargs)
            elif isinstance(field, Choice):
                choice_keys_set = set(value_tags) & field.all_keys_set
                cs = field.match_choice_keys(choice_keys_set, **kwargs)
                if cs:
                    result_sequence.extend(cs)
        extra_tags = [tag for tag in value_tags
                      if tag not in result_sequence]
        if extra_tags:
            msg = "Could not match tag(s): %s" % ', '.join(extra_tags)
            msg_rec = MsgRecord(path=self._path, field='_extra', msg=msg)
            error(logger, msg_rec, **kwargs)
        return result_sequence

    def _get_fields_items(self):
        """
        Return the stored (key, value) pairs: attributes first, then the
        elements in validated sequence order.
        """
        if self._data_sequence is None:
            # Not validated yet, so no element order is known; use the
            # base class ordering instead of failing.
            return super(SequenceModel, self)._get_fields_items()
        attribute_keys = set(self._data.keys()) - set(self._data_sequence)
        attributes = [(key, self._data[key]) for key in attribute_keys]
        elements = [(key, self._data[key]) for key in self._data_sequence]
        return attributes + elements

    def from_xml(self, raw_data, **kwargs):
        """
        Populate the model from a parsed xml document.

        ``raw_data`` is a mapping whose first value is the root element
        mapping. Name space declarations (``@xmlns...``) and the xsi
        schemaLocation attribute are removed from the root element, and the
        declared prefixes are recorded in ``name_spaces`` before populating.

        :return: the result of populate.
        """
        name_spaces = kwargs.get('name_spaces', {})
        keys_to_delete = []
        root_data = next(iter(raw_data.values()))
        for key, value in root_data.items():
            if key.startswith('@xmlns'):
                parts = key.split(':')
                # A default name space declaration ('@xmlns') has no
                # prefix to record; it is only removed.
                # NOTE(review): confirm that leaving the default name space
                # unregistered suits field.get_source.
                if len(parts) > 1:
                    name_spaces[value] = parts[1]
                keys_to_delete.append(key)
        if 'http://www.w3.org/2001/XMLSchema-instance' in name_spaces:
            xsi = name_spaces['http://www.w3.org/2001/XMLSchema-instance']
            schema_location_attr = '@%s:schemaLocation' % xsi
            if schema_location_attr in root_data:
                keys_to_delete.append(schema_location_attr)
        for key in keys_to_delete:
            del root_data[key]
        kwargs['errors'] = []
        kwargs['name_spaces'] = name_spaces
        return self.populate(root_data, **kwargs)

    def serialize(self, **kwargs):
        # FIXME generate sequence
        return super(SequenceModel, self).serialize(**kwargs)