Source code for graviti.dataframe.sql.scalar

#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#

"""The implementation of the search related Scalar."""

from typing import Any, Callable, ClassVar, Dict, Type, TypeVar, Union

import pyarrow as pa

import graviti.portex as pt
from graviti.dataframe.sql.container import _E, ArrayContainer, ScalarContainer

[docs]NUMERICAL_PRIORITIES: Dict[Type[pt.PortexType], int] = { pt.int32: 0, pt.int64: 1, pt.float32: 2, pt.float64: 3, }
_LOM = TypeVar("_LOM", bound="LogicalOperatorsMixin") _EOM = TypeVar("_EOM", bound="EqualOperatorsMixin") _COM = TypeVar("_COM", bound="ComparisonOperatorsMixin") _AOM = TypeVar("_AOM", bound="ArithmeticOperatorsMixin") _NS = TypeVar("_NS", bound="NumberScalar") _ES = TypeVar("_ES", bound="EnumScalar") _TSB = TypeVar("_TSB", bound="TemporalScalarBase") # pylint: disable=invalid-name
[docs]class LogicalOperatorsMixin(ScalarContainer): """A mixin for dynamically implementing logical operators.""" _LOCICAL_OPERATORS: ClassVar[Dict[str, str]] = { "__and__": "and", "__or__": "or", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._LOCICAL_OPERATORS.items(): setattr(cls, meth, cls._get_logical_operator(opt)) @classmethod def _get_logical_operator(cls: Type[_LOM], opt: str) -> Callable[[_LOM, Any], "BooleanScalar"]: def func(self: _LOM, other: Any) -> "BooleanScalar": other_expr = other.expr if isinstance(other, ScalarContainer) else other expr = {f"${opt}": [self.expr, other_expr]} return BooleanScalar(expr) return func
[docs]class EqualOperatorsMixin(ScalarContainer): """A mixin for dynamically implementing equal operators.""" _EQUAL_OPERATORS: ClassVar[Dict[str, str]] = { "__eq__": "eq", "__ne__": "ne", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._EQUAL_OPERATORS.items(): setattr(cls, meth, cls._get_equal_operator(opt)) @classmethod def _get_equal_operator(cls: Type[_EOM], opt: str) -> Callable[[_EOM, Any], "BooleanScalar"]: def func(self: _EOM, other: Any) -> "BooleanScalar": other_expr = other.expr if isinstance(other, ScalarContainer) else other expr = {f"${opt}": [self.expr, other_expr]} return BooleanScalar(expr) return func
[docs]class ComparisonOperatorsMixin(ScalarContainer): """A mixin for dynamically implementing comparison operators.""" _COMPARISON_OPERATORS: ClassVar[Dict[str, str]] = { "__gt__": "gt", "__ge__": "gte", "__lt__": "lt", "__le__": "lte", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._COMPARISON_OPERATORS.items(): setattr(cls, meth, cls._get_comparison_operator(opt)) @classmethod def _get_comparison_operator( cls: Type[_COM], opt: str ) -> Callable[[_COM, Any], "BooleanScalar"]: def func(self: _COM, other: Any) -> "BooleanScalar": if isinstance(other, ScalarContainer): if not isinstance(other, type(self)): raise TypeError( f"Invalid '{opt}' operation between {self.schema} and {other.schema}" ) other_expr = other.expr else: other_expr = other expr = {f"${opt}": [self.expr, other_expr]} return BooleanScalar(expr) return func
[docs]class ArithmeticOperatorsMixin(ScalarContainer): """A mixin for dynamically implementing arithmetic operators.""" _ARITHMETIC_OPERATORS: ClassVar[Dict[str, str]] = { "__div__": "div", "__mod__": "mod", "__pow__": "pow", "__sub__": "sub", "__mul__": "mult", "__add__": "add", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._ARITHMETIC_OPERATORS.items(): setattr(cls, meth, cls._get_arithemtic_operator(opt)) @classmethod def _get_arithemtic_operator(cls: Type[_AOM], opt: str) -> Callable[[_AOM, Any], Any]: raise NotImplementedError
[docs]class NumberScalar( LogicalOperatorsMixin, EqualOperatorsMixin, ComparisonOperatorsMixin, ArithmeticOperatorsMixin ): """One-dimensional array for numerical portex builtin type.""" @classmethod def _get_arithemtic_operator(cls: Type[_NS], opt: str) -> Callable[[_NS, Any], _NS]: def func(self: _NS, other: Any) -> _NS: if isinstance(other, ScalarContainer): if not isinstance(other, type(self)): raise TypeError( f"Invalid '{opt}' operation between {self.schema} and {other.schema}" ) other_expr = other.expr schema = ( self.schema if NUMERICAL_PRIORITIES[self.schema.__class__] > NUMERICAL_PRIORITIES[other.schema.__class__] else other.schema ) else: other_expr = other schema = self.schema expr = {f"${opt}": [self.expr, other_expr]} return cls(expr, schema) return func
[docs]class BooleanScalar(LogicalOperatorsMixin, EqualOperatorsMixin): """One-dimensional array for portex builtin type boolean.""" def __init__(self, expr: _E) -> None: super().__init__(expr, pt.boolean())
[docs]class StringScalar(LogicalOperatorsMixin, EqualOperatorsMixin): """One-dimensional array for portex builtin type string."""
[docs]class EnumScalar(EqualOperatorsMixin): """One-dimensional array for portex builtin type enum.""" @classmethod def _get_equal_operator(cls: Type[_ES], opt: str) -> Callable[[_ES, Any], "BooleanScalar"]: def func(self: _ES, other: Any) -> "BooleanScalar": if isinstance(other, ScalarContainer): other_expr: Any = other.expr else: enum: pt.enum = self.schema.to_builtin() # type: ignore[assignment] other_expr = enum.values.value_to_index[other] expr = {f"${opt}": [self.expr, other_expr]} return BooleanScalar(expr) return func
[docs]class TemporalScalarBase(EqualOperatorsMixin, ComparisonOperatorsMixin): """One-dimensional array for portex builtin temporal type.""" def _time_to_int(self, value: Any) -> int: return pa.scalar(value, self.schema.to_pyarrow()).value # type: ignore[no-any-return] @classmethod def _get_equal_operator(cls: Type[_TSB], opt: str) -> Callable[[_TSB, Any], "BooleanScalar"]: def func(self: _TSB, other: Any) -> "BooleanScalar": if isinstance(other, ScalarContainer): other_expr: Any = other.expr else: other_expr = self._time_to_int(other) # pylint: disable=protected-access expr = {f"${opt}": [self.expr, other_expr]} return BooleanScalar(expr) return func @classmethod def _get_comparison_operator( cls: Type[_TSB], opt: str ) -> Callable[[_TSB, Any], "BooleanScalar"]: def func(self: _TSB, other: Any) -> "BooleanScalar": if isinstance(other, ScalarContainer): if not isinstance(other, type(self)): raise TypeError( f"Invalid '{opt}' operation between {self.schema} and {other.schema}" ) other_expr: Any = other.expr else: other_expr = self._time_to_int(other) # pylint: disable=protected-access expr = {f"${opt}": [self.expr, other_expr]} return BooleanScalar(expr) return func
[docs]class DateScalar(TemporalScalarBase): """One-dimensional array for portex builtin date type.""" def _time_to_int(self, value: Any) -> int: scalar = pa.scalar(value, self.schema.to_pyarrow()) return scalar.cast(pa.int32()).as_py() # type: ignore[no-any-return]
[docs]class TimeScalar(TemporalScalarBase): """One-dimensional array for portex builtin time type."""
[docs]class TimestampScalar(TemporalScalarBase): """One-dimensional array for portex builtin timestamp type."""
[docs]class TimedeltaScalar(TemporalScalarBase): """One-dimensional array for portex builtin timedelta type."""
[docs]class RowSeries(ScalarContainer): """The One-dimensional array for the search.""" schema: pt.PortexRecordBase def __init__(self, schema: pt.PortexRecordBase) -> None: super().__init__("$", schema) def __getitem__(self, key: str) -> Union[ArrayContainer, ScalarContainer]: field = self.schema[key] return field.search_container.item_container.from_upper(f"{self.expr}.{key}", field)