Source code for graviti.dataframe.sql.array

#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#

"""The implementation of the search related array."""

from typing import Any, Callable, ClassVar, Dict, Type, TypeVar

import pyarrow as pa

import graviti.portex as pt
from graviti.dataframe.sql.container import _E, ArrayContainer, SearchContainerRegister
from graviti.dataframe.sql.scalar import (
    NUMERICAL_PRIORITIES,
    BooleanScalar,
    DateScalar,
    EnumScalar,
    NumberScalar,
    RowSeries,
    StringScalar,
    TimedeltaScalar,
    TimeScalar,
    TimestampScalar,
)

_LOM = TypeVar("_LOM", bound="LogicalOperatorsMixin")
_EOM = TypeVar("_EOM", bound="EqualOperatorsMixin")
_COM = TypeVar("_COM", bound="ComparisonOperatorsMixin")
_AOM = TypeVar("_AOM", bound="ArithmeticOperatorsMixin")
_NA = TypeVar("_NA", bound="NumberArray")
_EA = TypeVar("_EA", bound="EnumArray")
_TAB = TypeVar("_TAB", bound="TemporalArrayBase")  # pylint: disable=invalid-name


[docs]class LogicalOperatorsMixin(ArrayContainer): """A mixin for dynamically implementing logical operators.""" _LOCICAL_OPERATORS: ClassVar[Dict[str, str]] = { "__and__": "and", "__or__": "or", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._LOCICAL_OPERATORS.items(): setattr(cls, meth, cls._get_logical_operator(opt)) @classmethod def _get_logical_operator(cls: Type[_LOM], opt: str) -> Callable[[_LOM, Any], "BooleanArray"]: def func(self: _LOM, other: Any) -> "BooleanArray": other_expr = other.expr if isinstance(other, ArrayContainer) else other expr = {f"${opt}": [self.expr, other_expr]} return BooleanArray(expr, pt.boolean(), self.upper_expr) return func
[docs]class EqualOperatorsMixin(ArrayContainer): """A mixin for dynamically implementing euqal operators.""" _EQUAL_OPERATORS: ClassVar[Dict[str, str]] = { "__eq__": "eq", "__ne__": "ne", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._EQUAL_OPERATORS.items(): setattr(cls, meth, cls._get_equal_operator(opt)) @classmethod def _get_equal_operator(cls: Type[_EOM], opt: str) -> Callable[[_EOM, Any], "BooleanArray"]: def func(self: _EOM, other: Any) -> "BooleanArray": other_expr = other.expr if isinstance(other, ArrayContainer) else other expr = {f"${opt}": [self.expr, other_expr]} return BooleanArray(expr, pt.boolean(), self.upper_expr) return func
[docs]class ComparisonOperatorsMixin(ArrayContainer): """A mixin for dynamically implementing comparison operators.""" _COMPARISON_OPERATORS: ClassVar[Dict[str, str]] = { "__gt__": "gt", "__ge__": "gte", "__lt__": "lt", "__le__": "lte", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._COMPARISON_OPERATORS.items(): setattr(cls, meth, cls._get_comparison_operator(opt)) @classmethod def _get_comparison_operator( cls: Type[_COM], opt: str ) -> Callable[[_COM, Any], "BooleanArray"]: def func(self: _COM, other: Any) -> "BooleanArray": if isinstance(other, ArrayContainer): if not isinstance(other, type(self)): raise TypeError( f"Invalid '{opt}' operation between {self.schema} and {other.schema}" ) other_expr = other.expr else: other_expr = other expr = {f"${opt}": [self.expr, other_expr]} return BooleanArray(expr, pt.boolean(), self.upper_expr) return func
[docs]class ArithmeticOperatorsMixin(ArrayContainer): """A mixin for dynamically implementing arithmetic operators.""" _ARITHMETIC_OPERATORS: ClassVar[Dict[str, str]] = { "__div__": "div", "__mod__": "mod", "__pow__": "pow", "__sub__": "sub", "__mul__": "mult", "__add__": "add", } def __init_subclass__(cls) -> None: super().__init_subclass__() for meth, opt in cls._ARITHMETIC_OPERATORS.items(): setattr(cls, meth, cls._get_arithmetic_operator(opt)) @classmethod def _get_arithmetic_operator(cls: Type[_AOM], opt: str) -> Callable[[_AOM, Any], Any]: raise NotImplementedError
[docs]class Array(ArrayContainer): """One-dimensional array for portex builtin type array.""" prefix = "$."
[docs] def query(self, func: Callable[[Any], Any]) -> "Array": """Query the data of an ArraySeries with a lambda function. Arguments: func: The query function. Returns: The ArraySeries with the query expression. """ item_container = self.schema.search_container.item_container expr = { "$filter": [ self.upper_expr, func(item_container.from_upper(self.expr, self.schema)).expr, ] } return Array(expr, self.schema, self.upper_expr)
[docs] def any(self) -> BooleanScalar: """Whether any element is True. Returns: The BooleanSeries with the any expression. """ expr = {"$any_match": [self.upper_expr, self.expr]} return BooleanScalar(expr)
[docs] def all(self) -> BooleanScalar: """Whether all elements are True. Returns: The BooleanSeries with the all expression. """ expr = {"$all_match": [self.upper_expr, self.expr]} return BooleanScalar(expr)
@SearchContainerRegister(pt.boolean)
[docs]class BooleanArray(Array, LogicalOperatorsMixin, EqualOperatorsMixin): """One-dimensional array for portex builtin type array with the boolean items.""" item_container = BooleanScalar
@SearchContainerRegister(pt.string)
[docs]class StringArray(Array, LogicalOperatorsMixin, EqualOperatorsMixin): """One-dimensional array for portex builtin type array with the string and enum items.""" item_container = StringScalar
@SearchContainerRegister(pt.enum)
[docs]class EnumArray(Array, EqualOperatorsMixin): """One-dimensional array for portex builtin type array with the string and enum items.""" item_container = EnumScalar @classmethod def _get_equal_operator(cls: Type[_EA], opt: str) -> Callable[[_EA, Any], "BooleanArray"]: def func(self: _EA, other: Any) -> "BooleanArray": if isinstance(other, ArrayContainer): other_expr: Any = other.expr else: enum: pt.enum = self.schema.to_builtin() # type: ignore[assignment] other_expr = enum.values.value_to_index[other] expr = {f"${opt}": [self.expr, other_expr]} return BooleanArray(expr, pt.boolean(), self.upper_expr) return func
[docs]class TemporalArrayBase(Array, EqualOperatorsMixin, ComparisonOperatorsMixin): """One-dimensional array for portex builtin temporal types.""" def _time_to_int(self, value: Any) -> int: return pa.scalar(value, self.schema.to_pyarrow()).value # type: ignore[no-any-return] @classmethod def _get_equal_operator(cls: Type[_TAB], opt: str) -> Callable[[_TAB, Any], "BooleanArray"]: def func(self: _TAB, other: Any) -> "BooleanArray": if isinstance(other, ArrayContainer): other_expr: Any = other.expr else: other_expr = self._time_to_int(other) # pylint: disable=protected-access expr = {f"${opt}": [self.expr, other_expr]} return BooleanArray(expr, pt.boolean(), self.upper_expr) return func @classmethod def _get_comparison_operator( cls: Type[_TAB], opt: str ) -> Callable[[_TAB, Any], "BooleanArray"]: def func(self: _TAB, other: Any) -> "BooleanArray": if isinstance(other, ArrayContainer): if not isinstance(other, type(self)): raise TypeError( f"Invalid '{opt}' operation between {self.schema} and {other.schema}" ) other_expr: Any = other.expr else: other_expr = self._time_to_int(other) # pylint: disable=protected-access expr = {f"${opt}": [self.expr, other_expr]} return BooleanArray(expr, pt.boolean(), self.upper_expr) return func
@SearchContainerRegister(pt.date)
[docs]class DateArray(TemporalArrayBase): """One-dimensional array for portex builtin date type.""" item_container = DateScalar def _time_to_int(self, value: Any) -> int: scalar = pa.scalar(value, self.schema.to_pyarrow()) return scalar.cast(pa.int32()).as_py() # type: ignore[no-any-return]
@SearchContainerRegister(pt.time)
[docs]class TimeArray(TemporalArrayBase): """One-dimensional array for portex builtin time type.""" item_container = TimeScalar
@SearchContainerRegister(pt.timestamp)
[docs]class TimestampArray(TemporalArrayBase): """One-dimensional array for portex builtin timestamp type.""" item_container = TimestampScalar
@SearchContainerRegister(pt.timedelta)
[docs]class TimedeltaArray(TemporalArrayBase): """One-dimensional array for portex builtin timedelta type.""" item_container = TimedeltaScalar
@SearchContainerRegister( pt.float32, pt.float64, pt.int32, pt.int64, )
[docs]class NumberArray(Array, ComparisonOperatorsMixin, ArithmeticOperatorsMixin): """One-dimensional array for portex builtin type array with the numerical items.""" item_container = NumberScalar @classmethod def _get_arithmetic_operator(cls: Type[_NA], opt: str) -> Callable[[_NA, Any], _NA]: def func(self: _NA, other: Any) -> _NA: if isinstance(other, ArrayContainer): if not isinstance(other, type(self)): raise TypeError( f"Invalid '{opt}' operation between {self.schema} and {other.schema}" ) other_expr = other.expr schema = ( self.schema if NUMERICAL_PRIORITIES[self.schema.__class__] > NUMERICAL_PRIORITIES[other.schema.__class__] else other.schema ) else: other_expr = other schema = self.schema expr = {f"${opt}": [self.expr, other_expr]} return cls(expr, schema, self.upper_expr) return func def _check_upper_expr(self, opt: str) -> None: if self.expr != self.prefix: raise TypeError(f"{opt} operation only support for numerical array.")
[docs] def size(self) -> NumberScalar: """Get the length of array series. Returns: The NumberScalar with the size expression. """ self._check_upper_expr("size") return NumberScalar({"$size": [self.upper_expr]}, self.schema)
[docs] def max(self) -> "NumberScalar": """Get the max value of array series. Returns: The NumberScalar with the max expression. """ self._check_upper_expr("max") return NumberScalar({"$max": [self.upper_expr]}, self.schema)
[docs] def min(self) -> NumberScalar: """Get the min value of array series. Returns: The NumberScalar with the min expression. """ self._check_upper_expr("min") return NumberScalar({"$min": [self.upper_expr]}, self.schema)
[docs] def sum(self) -> NumberScalar: """Get the sum of array series. Returns: The NumberScalar with the sum expression. """ self._check_upper_expr("sum") return NumberScalar({"$sum": [self.upper_expr]}, self.schema)
@SearchContainerRegister(pt.record)
[docs]class DataFrame(ArrayContainer): """The Two-dimensional array for the search.""" prefix = "$" item_container = RowSeries schema: pt.PortexRecordBase def __getitem__(self, key: str) -> ArrayContainer: field = self.schema[key] return field.search_container(f"{self.expr}.{key}", field, self.upper_expr)
[docs] def query(self, func: Callable[[Any], Any]) -> "DataFrame": """Query the data of an ArraySeries with a lambda function. Arguments: func: The query function. Returns: The DataFrame with the query expression. """ expr = {"$filter": [self.upper_expr, func(RowSeries(self.schema)).expr]} return DataFrame(expr, self.schema, self.upper_expr)
[docs]class ArrayDistributor(ArrayContainer): """A distributor to instance DataFrame, ArrayScalar by different array items.""" @classmethod
[docs] def from_upper(cls, expr: _E, schema: pt.PortexType) -> "ArrayContainer": """Instantiate a Search object from the upper level. Arguments: expr: The expression of the search. schema: The schema of the series. Returns: The loaded object. """ items: pt.PortexType = schema.to_builtin().items # type: ignore[attr-defined] return items.search_container.from_upper(expr, items)
@SearchContainerRegister(pt.array)
[docs]class ArraySeries(ArrayContainer): """The One-dimensional array for the search.""" item_container = ArrayDistributor