Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debug logging for hidden validation and other exceptions e.g. when #199

Open
github-actions bot opened this issue Sep 16, 2024 · 0 comments
Open
Labels

Comments

@github-actions
Copy link

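Add debug logging for validation errors and other exceptions that are currently swallowed silently, e.g. when concatenating `Model[int](123) + '234.'`. That expression raises `TypeError: unsupported operand type(s) for +: 'Model[int]' and 'str'`, while the underlying validation error that explains the failure is hidden.

By contrast, `Model[int](123) + '234'` works fine and returns `Model[int]('357')`.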

# TODO: Add debug logging for hidden validation and other exceptions e.g. when

        else:
            if attr in ['contents']:
                contents_prop = getattr(self.__class__, attr)
                old_contents_id = id(contents_prop.__get__(self))
                is_new_contents = id(value) != old_contents_id

                if is_new_contents:
                    contents_prop.__set__(self, value)

                    if self.config.interactive_mode and self.has_snapshot():
                        self.snapshot_holder.schedule_deepcopy_content_ids_for_deletion(
                            old_contents_id)
            else:
                if self._is_non_omnipy_pydantic_model():
                    self._special_method(
                        '__setattr__',
                        MethodInfo(state_changing=True, returns_same_type=YesNoMaybe.NO),
                        attr,
                        value)
                else:
                    raise RuntimeError('Model does not allow setting of extra attributes')

    def _special_method(  # noqa: C901
            self, name: str, info: MethodInfo, *args: object, **kwargs: object) -> object:
        """Run the special method `name` against the contents and post-process the result.

        Three paths exist:

        * State-changing methods in interactive mode run inside the reset solution
          context, after which the contents are validated and set again.
        * `__iter__` on an iterable model returns a generator produced by
          `_get_convert_full_element_model_generator`.
        * Everything else is called directly; state-changing methods are followed by
          `validate_contents()`.

        `NotImplemented` is passed straight back to the caller. Any other result that is
        not the model itself is handed to `_convert_to_model_if_reasonable` when
        `info.returns_same_type` is truthy.
        """
        if info.state_changing and self.config.interactive_mode:
            reset_solution = self._prepare_reset_solution_take_snapshot_if_needed().reset_solution
            with reset_solution:
                ret = self._call_special_method(name, *args, **kwargs)

                # In-place operators (e.g. `model += 1`) hand back the contents object
                # itself; the caller should then receive the model instead.
                if ret is self.contents:
                    ret = self

                if ret is NotImplemented:
                    return ret

                self._validate_and_set_value(
                    new_contents=self.contents,
                    reset_solution=reset_solution,
                )

        elif name == '__iter__' and isinstance(self, Iterable):
            element_model_generator = self._get_convert_full_element_model_generator(
                cast(Iterable, self.contents),
                level_up_type_arg_idx=0,
            )
            return element_model_generator()
        else:
            ret = self._call_special_method(name, *args, **kwargs)
            if ret is NotImplemented:
                return ret

            if info.state_changing:
                self.validate_contents()

        if ret is self or not info.returns_same_type:
            return ret

        # Only `__getitem__` with a non-slice argument returns a single element, which
        # has to be matched against the element type ("level up") rather than the
        # type of the model itself.
        if name == '__getitem__':
            assert len(args) == 1
            level_up = not isinstance(args[0], slice)
        else:
            level_up = False

        # We can do this with some ease of mind as all the methods except '__getitem__' with
        # integer argument are supposed to possibly return a result of the same type.
        return self._convert_to_model_if_reasonable(
            ret,
            level_up=level_up,
            level_up_arg_idx=-1,
            raise_validation_errors=(info.returns_same_type == YesNoMaybe.YES),
        )

    def _call_special_method(  # noqa: C901
            self,
            name: str,
            *args: object,
            **kwargs: object,
    ) -> object:
        contents = self._get_real_contents()
        has_add_method = hasattr(contents, '__add__')
        has_radd_method = hasattr(contents, '__radd__')
        has_iadd_method = hasattr(contents, '__iadd__')

        if name == '__add__' and has_add_method:

            def _add(other):
                # try:
                #     return contents.__add__(self.__class__(other).contents)
                # except ValidationError:
                return contents.__add__(other)

            # return _add_new_other_model(*args, **kwargs)
            method = _add
            return self._call_single_arg_method_with_model_converted_other_first(
                name, method, *args, **kwargs)

        elif name == '__radd__' and (has_radd_method or has_add_method):

            def _radd(other):
                if has_radd_method:
                    return contents.__radd__(other)
                else:
                    return contents.__add__(other)

            def _radd_model_converted_other(other):
                if has_radd_method:
                    return contents.__radd__(self.__class__(other).contents)
                else:
                    return self.__class__(other).contents.__add__(self.contents)

            method = _radd
            model_converted_other_method = _radd_model_converted_other
            return self._call_single_arg_method_with_model_converted_other_first(
                name,
                method,
                *args,
                model_converted_other_method=model_converted_other_method,
                **kwargs,
            )

        elif name == '__iadd__' and (has_iadd_method or has_add_method):

            def _iadd(other):
                if has_iadd_method:
                    return contents.__iadd__(other)
                else:
                    return contents.__add__(other)

            method = _iadd
            return self._call_single_arg_method_with_model_converted_other_first(
                name, method, *args, **kwargs)
        else:
            try:
                method = cast(Callable, self._getattr_from_contents_obj(name))
            except AttributeError as e:
                if name in ('__int__', '__float__', '__complex__'):
                    raise ValueError from e
                if name == '__len__':
                    raise TypeError(f"object of type '{self.__class__.__name__}' has no len()")
                else:
                    return NotImplemented

            return self._call_method_with_unconverted_args_first(method, *args, **kwargs)

    def _call_single_arg_method_with_model_converted_other_first(
        self,
        name: str,
        method: Callable,
        *args: object,
        model_converted_other_method: Callable | None = None,
        **kwargs: object,
    ):
        if len(args) != 1:
            raise TypeError(f'expected 1 argument, got {len(args)}')

        if len(kwargs) > 0:
            raise TypeError(f'method {name} takes no keyword arguments')

        arg = args[0]

        try:
            try:
                if model_converted_other_method:
                    return model_converted_other_method(arg)
                else:
                    return method(self.__class__(arg).contents, **kwargs)
            except ValidationError:
                # TODO: Add debug logging for hidden validation and other exceptions e.g. when
                #       concatenating `Model[int](123) + '234.'` (gives TypeError:
                #       unsupported operand type(s) for +: 'Model[int]' and 'str'). ?
                #       `Model[int](123) + '234'` works fine, returns `Model[int]('357')`.
                return method(arg)
        except TypeError:
            return NotImplemented

    def _call_method_with_unconverted_args_first(
        self,
        method: Callable,
        *args: object,
        **kwargs: object,
    ):
        """Call `method` with the raw arguments, retrying with model-converted ones.

        The first attempt runs with `config.dynamically_convert_elements_to_models`
        set to False inside `hold_and_reset_prev_attrib_value`. If it raises
        `TypeError`, or returns `NotImplemented`, the call is repeated through
        `_call_method_with_model_converted_args`.

        Raises:
            TypeError: the error from the first attempt, when the retry fails with
                `ValidationError`.
        """
        try:
            with hold_and_reset_prev_attrib_value(
                    self.config,
                    'dynamically_convert_elements_to_models',
            ):
                self.config.dynamically_convert_elements_to_models = False
                result = method(*args, **kwargs)
        except TypeError as unconverted_error:
            try:
                result = self._call_method_with_model_converted_args(method, *args, **kwargs)
            except ValidationError:
                # The arguments cannot be converted, so the original complaint stands.
                raise unconverted_error

        if result is not NotImplemented:
            return result

        try:
            return self._call_method_with_model_converted_args(method, *args, **kwargs)
        except ValidationError:
            # Conversion is not possible; report NotImplemented as received.
            return result

    def _call_method_with_model_converted_args(
        self,
        method: Callable,
        *args: object,
        **kwargs: object,
    ):
        model_args = [self.__class__(arg).contents for arg in args]
        return method(*model_args, **kwargs)

    def _get_convert_full_element_model_generator(
            self, elements: Iterable | None,
            level_up_type_arg_idx: int | slice) -> Callable[..., Generator]:
        def _convert_full_element_model_generator(elements=elements):
            for el in elements:
                yield self._convert_to_model_if_reasonable(
                    el,
                    level_up=True,
                    level_up_arg_idx=level_up_type_arg_idx,
                )

        return _convert_full_element_model_generator

    def _get_convert_element_value_model_generator(
            self, elements: Iterable | None) -> Callable[..., Generator]:
        def _convert_element_value_model_generator(elements=elements):
            for el in elements:
                yield (
                    el[0],
                    self._convert_to_model_if_reasonable(
                        el[1],
                        level_up=True,
                        level_up_arg_idx=1,
                    ),
                )

        return _convert_element_value_model_generator

    def _convert_to_model_if_reasonable(  # noqa: C901
        self,
        ret: Mapping[_KeyT, _ValT] | Iterable[_ValT] | _ReturnT | _RootT,
        level_up: bool = False,
        level_up_arg_idx: int | slice = 1,
        raise_validation_errors: bool = False,
    ) -> ('Model[_KeyT] | Model[_ValT] | Model[tuple[_KeyT, _ValT]] '
          '| Model[_ReturnT] | Model[_RootT] | _ReturnT'):
        """Wrap `ret` in a model if it matches a type variant of this model's outer type.

        Args:
            ret: The value to possibly convert.
            level_up: If False, `ret` is compared against the variants of the outer type
                and wrapped with `self.__class__`. If True, `ret` is compared against
                the variants of a type argument of each outer type variant and wrapped
                with `Model[<matching type>]`.
            level_up_arg_idx: Index (or slice) into the type arguments, selecting what to
                compare against when `level_up` is True.
            raise_validation_errors: If True, a `ValidationError` raised while
                constructing the model is re-raised instead of being skipped.

        Returns:
            The first successfully constructed model, or `ret` unchanged when no variant
            matches, when `ret` is already a model instance, or when `level_up` is
            requested while `config.dynamically_convert_elements_to_models` is off.
        """

        if level_up and not self.config.dynamically_convert_elements_to_models:
            # Element conversion is switched off: fall through and return `ret` as is.
            ...
        elif is_model_instance(ret):
            # Already a model: fall through and return `ret` as is.
            ...
        else:
            outer_type = self.outer_type(with_args=True)
            # For double Models, e.g. Model[Model[int]], where _get_real_contents() have already
            # skipped the outer Model to get the `ret`, we need to do the same to compare the value
            # with the corresponding type.
            if lenient_issubclass(ensure_plain_type(outer_type), Model):
                outer_type = cast(Model, outer_type).outer_type(with_args=True)

            for type_to_check in all_type_variants(outer_type):
                plain_type_to_check = ensure_plain_type(type_to_check)
                # Variants whose plain type is ForwardRef, TypeVar or None are skipped.
                if plain_type_to_check in (ForwardRef, TypeVar, None):
                    continue

                if level_up:
                    type_args = get_args(type_to_check)
                    # A variant without type arguments is treated as its own only argument.
                    if len(type_args) == 0:
                        type_args = (type_to_check,)
                    # NOTE(review): `type_args` is always non-empty at this point, so this
                    # check never fails.
                    if type_args:
                        # NOTE(review): an integer `level_up_arg_idx` outside the range of
                        # `type_args` would raise IndexError here; it is not caught in this
                        # method. Confirm that callers cannot trigger this.
                        for level_up_type_to_check in all_type_variants(
                                type_args[level_up_arg_idx]):
                            level_up_type_to_check = self._fix_tuple_type_from_args(
                                level_up_type_to_check)
                            if self._is_instance_or_literal(
                                    ret,
                                    ensure_plain_type(level_up_type_to_check),
                                    level_up_type_to_check,
                            ):
                                try:
                                    return Model[level_up_type_to_check](ret)  # type: ignore
                                except ValidationError:
                                    # Skipped unless requested; the next variant is tried.
                                    if raise_validation_errors:
                                        raise
                                except TypeError:
                                    # Skipped; the next variant is tried.
                                    pass

                else:
                    if self._is_instance_or_literal(
                            ret,
                            plain_type_to_check,
                            type_to_check,
                    ):
                        try:
                            return self.__class__(ret)
                        except ValidationError:
                            # Skipped unless requested; the next variant is tried.
                            if raise_validation_errors:
                                raise
                        except TypeError:
                            # Skipped; the next variant is tried.
                            pass

        # No conversion took place: hand back the value unchanged.
        return cast(_ReturnT, ret)

    @staticmethod
    def _is_instance_or_literal(obj: object, plain_type: type, raw_type: type | GenericAlias):
        if plain_type is Literal:
            args = get_args(raw_type)
            for arg in args:
                if obj == arg:
                    return True
            return False
        else:
            return lenient_isinstance(obj, plain_type)

    def _fix_tuple_type_from_args(
        self, level_up_type_to_check: type | GenericAlias | tuple[type | GenericAlias, ...]
    ) -> type | GenericAlias:
        if isinstance(level_up_type_to_check, tuple):
            match len(level_up_type_to_check):
                case 1:
                    return level_up_type_to_check[0]
                case _:
                    return tuple[level_up_type_to_check]  # type: ignore[valid-type]
        else:
            return level_up_type_to_check

    def __getattr__(self, attr: str) -> Any:
        """Look up `attr` on the real contents object, wrapping routines with callbacks.

        Routines are wrapped through `add_callback_after_call` with a callback that calls
        `_validate_and_set_value` on the contents. The attributes `values` and `items`
        are additionally wrapped with a generator function from the
        `_get_convert_*_model_generator` helpers.

        Raises:
            AttributeError: propagated from `getattr` on the contents object when the
                attribute does not exist there.
        """
        # For non-omnipy pydantic contents that have the attribute, the contents are
        # validated and set before the attribute is read.
        if self._is_non_omnipy_pydantic_model() and self._contents_obj_hasattr(attr):
            self._validate_and_set_value(self.contents)

        contents_attr = self._getattr_from_contents_obj(attr)

        if inspect.isroutine(contents_attr):
            reset_solution = self._prepare_reset_solution_take_snapshot_if_needed().reset_solution
            # The attribute is fetched again after preparing the reset solution.
            # NOTE(review): presumably because preparing it may replace the contents
            # object - confirm.
            new_contents_attr = self._getattr_from_contents_obj(attr)

            def _validate_contents(ret: Any):
                # Validates and sets the contents, then passes the result through untouched.
                self._validate_and_set_value(self.contents, reset_solution=reset_solution)
                return ret

            # NOTE(review): judging by its name, `add_callback_after_call` runs the callback
            # on the routine's return value inside the given context - confirm.
            contents_attr = add_callback_after_call(new_contents_attr,
                                                    _validate_contents,
                                                    reset_solution)

        if attr in ('values', 'items'):
            # The generator functions are created with `None` as their default elements;
            # NOTE(review): the actual elements are presumably supplied by
            # `add_callback_after_call` as the return value of the wrapped call - confirm.
            match attr:
                case 'values':
                    _model_generator = self._get_convert_full_element_model_generator(
                        None,
                        level_up_type_arg_idx=1,
                    )
                case 'items':
                    _model_generator = self._get_convert_element_value_model_generator(None,)

            contents_attr = add_callback_after_call(contents_attr, _model_generator, no_context)

        return contents_attr

    def _is_non_omnipy_pydantic_model(self) -> bool:
        """Return the result of `is_non_omnipy_pydantic_model` for the real contents."""
        real_contents = self._get_real_contents()
        return is_non_omnipy_pydantic_model(real_contents)

    def _contents_obj_hasattr(self, attr) -> object:
        return hasattr(self._get_real_contents(), attr)

    def _getattr_from_contents_obj(self, attr) -> object:
        return getattr(self._get_real_contents(), attr)

    def _getattr_from_contents_cls(self, attr) -> object:
        return getattr(self._get_real_contents().__class__, attr)

    def _get_real_contents(self) -> object:
        if is_model_instance(self.contents):
            return self.contents.contents
        else:
@github-actions github-actions bot added the todo label Sep 16, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

0 participants