Source code for graviti.portex.base

#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#
"""The base elements of Portex type."""


import json
from copy import deepcopy
from typing import TYPE_CHECKING, Any, ClassVar, Dict, Optional, Type, TypeVar

import pyarrow as pa
import yaml

from graviti.portex.package import Imports, Package
from graviti.portex.register import PyArrowConversionRegister
from graviti.utility import INDENT, PathLike, UserMutableMapping

if TYPE_CHECKING:
    from graviti.dataframe import Container
    from graviti.dataframe.sql.container import ArrayContainer
    from graviti.portex.builtin import PortexBuiltinType
    from graviti.portex.factory import ConnectedFieldsFactory
    from graviti.portex.field import ConnectedFields
    from graviti.portex.param import Params

# Mapping indexed by PyArrow type id, re-exported from the conversion register.
# ``PortexType.from_pyarrow`` looks up ``paarray.type.id`` in it to find the
# Portex type whose ``_from_pyarrow`` should handle the array.
PYARROW_TYPE_ID_TO_PORTEX_TYPE = PyArrowConversionRegister.PYARROW_TYPE_ID_TO_PORTEX_TYPE

# Type variable bound to ``PortexType`` so that the alternate constructors and
# ``copy`` are typed as returning the class they are invoked on.
_T = TypeVar("_T", bound="PortexType")
class PortexType:
    """Base class for Portex types.

    It provides the shared ``repr`` rendering, the import collection, the
    (de)serialization helpers for python objects, JSON and YAML, and ``copy``.
    ``_from_pyarrow``, ``to_pyarrow`` and ``to_builtin`` raise
    ``NotImplementedError`` here.
    """

    nullable: bool
    package: ClassVar[Package[Any]]
    params: ClassVar["Params"]
    container: ClassVar[Type["Container"]]
    element: ClassVar[Type[Any]]
    search_container: ClassVar[Type["ArrayContainer"]]

    def __repr__(self) -> str:
        # Top-level representation: start rendering at nesting level 0.
        return self._repr1(0)

    @classmethod
    def _from_pyarrow(cls: Type[_T], paarray: pa.Array) -> _T:
        """Build an instance from a PyArrow array; not implemented on the base class."""
        raise NotImplementedError

    def _repr1(self, level: int) -> str:
        """Render the type as ``ClassName(...)``, listing only non-default parameters.

        Arguments:
            level: The nesting level, which controls the indentation of the output.

        Returns:
            A single line ``ClassName()`` when every parameter equals its default,
            otherwise a multi-line string with one parameter per line.

        """
        indent = level * INDENT
        header = f"{self.__class__.__name__}("

        entries = []
        for name, parameter in self.params.items():
            value = getattr(self, name)
            if value != parameter.default:
                # Nested values that know how to render themselves get one more level.
                if hasattr(value, "_repr1"):
                    rendered = value._repr1(level + 1)  # pylint: disable=protected-access
                else:
                    rendered = repr(value)
                entries.append(f"{INDENT}{name}={rendered},")

        if not entries:
            return f"{header})"

        return f"\n{indent}".join([header, *entries, ")"])

    def _get_column_count(self) -> int:
        """Get the total column count of the portex type.

        Returns:
            The total column count, which is always 1 in this base implementation.

        """
        return 1

    @property
    def imports(self) -> Imports:
        """Get the PortexType imports.

        The result contains the class of this instance, keyed by its name, merged
        with the ``imports`` of every parameter value that exposes a non-empty one.

        Returns:
            The :class:`Imports` instance of this PortexType.

        """
        collected = Imports()
        own_class = self.__class__
        collected[own_class.__name__] = own_class

        for param_name in self.params:
            nested = getattr(getattr(self, param_name), "imports", None)
            if nested:
                collected.update(nested)

        return collected

    @classmethod
    def from_pyobj(
        cls: Type[_T], content: Dict[str, Any], _imports: Optional["Imports"] = None
    ) -> _T:
        """Create Portex type instance from python dict.

        Arguments:
            content: A python dict representing a Portex type.
            _imports: The imports used to resolve ``content["type"]``. When it is
                ``None``, they are built from ``content["imports"]`` (an empty list
                if that key is missing).

        Returns:
            A Portex type instance created from the input python dict.

        """
        imports = Imports.from_pyobj(content.get("imports", [])) if _imports is None else _imports

        target: Type[_T] = imports[content["type"]]  # type: ignore[assignment]

        assert issubclass(target, cls)

        arguments = {}
        for name, param in target.params.items():
            # Ellipsis is the "key is absent" sentinel, so an explicit None is still loaded.
            raw = content.get(name, Ellipsis)
            if raw is Ellipsis:
                continue
            arguments[name] = param.load(raw, imports)

        return target(**arguments)

    @classmethod
    def from_pyarrow(cls: Type[_T], paarray: pa.Array) -> _T:
        """Create Portex type instance from PyArrow type.

        Arguments:
            paarray: The PyArrow array.

        Raises:
            TypeError: When the PyArrow type is not supported.

        Returns:
            The created Portex type instance.

        """
        arrow_type = paarray.type
        try:
            matched = PYARROW_TYPE_ID_TO_PORTEX_TYPE[arrow_type.id]
        except KeyError:
            raise TypeError(f'Not supported PyArrow type "{arrow_type}"') from None
        else:
            # pylint: disable=protected-access
            return matched._from_pyarrow(paarray)  # type: ignore[return-value]

    @classmethod
    def from_json(cls: Type[_T], content: str) -> _T:
        """Create Portex type instance from JSON string.

        Arguments:
            content: A JSON string representing a Portex type.

        Returns:
            A Portex type instance created from the input JSON string.

        """
        pyobj = json.loads(content)
        return cls.from_pyobj(pyobj)

    @classmethod
    def from_yaml(cls: Type[_T], content: str) -> _T:
        """Create Portex type instance from YAML string.

        Arguments:
            content: A YAML string representing a Portex type.

        Returns:
            A Portex type instance created from the input YAML string.

        """
        # NOTE(review): ``yaml.Loader`` can construct arbitrary Python objects from
        # tagged YAML, so this must only be fed trusted content; confirm whether
        # ``yaml.SafeLoader`` would be sufficient for Portex documents.
        pyobj = yaml.load(content, yaml.Loader)
        return cls.from_pyobj(pyobj)

    def to_pyobj(self, _with_imports: bool = True) -> Dict[str, Any]:
        """Dump the instance to a python dict.

        Arguments:
            _with_imports: Whether to add an ``"imports"`` entry; it is only added
                when the dumped imports are non-empty.

        Returns:
            A python dict representation of the Portex type.

        """
        dumped: Dict[str, Any] = {}

        # Insertion order is kept on purpose: "imports", then "type", then parameters.
        if _with_imports:
            dumped_imports = self.imports.to_pyobj()
            if dumped_imports:
                dumped["imports"] = dumped_imports

        dumped["type"] = self.__class__.__name__

        for name, parameter in self.params.items():
            value = getattr(self, name)
            # Parameters left at their default are omitted from the output.
            if value != parameter.default:
                dumped[name] = parameter.dump(value)

        return dumped

    def to_json(self) -> str:
        """Dump the instance to a JSON string.

        Returns:
            A JSON representation of the Portex type.

        """
        pyobj = self.to_pyobj()
        return json.dumps(pyobj, ensure_ascii=False)

    def to_yaml(self) -> str:
        """Dump the instance to a YAML string.

        Returns:
            A YAML representation of the Portex type.

        """
        pyobj = self.to_pyobj()
        # ``sort_keys=False`` keeps the key order produced by ``to_pyobj``.
        return yaml.dump(pyobj, sort_keys=False, allow_unicode=True)  # type: ignore[no-any-return]

    def to_pyarrow(self, *, _to_backend: bool = False) -> pa.DataType:
        """Convert the Portex type to the corresponding builtin PyArrow DataType.

        Arguments:
            _to_backend: Unused by the base class.
                NOTE(review): its meaning is defined by the subclasses; confirm there.

        Raises:
            NotImplementedError: The method of the base class should not be called.

        Returns:
            The corresponding builtin PyArrow DataType.

        """
        raise NotImplementedError

    def to_builtin(self) -> "PortexBuiltinType":
        """Expand the top level type to Portex builtin type.

        Raises:
            NotImplementedError: The method of the base class should not be called.

        """
        raise NotImplementedError

    def copy(self: _T) -> _T:
        """Get a copy of the portex type.

        Returns:
            A deep copy of the portex type.

        """
        duplicate = deepcopy(self)
        return duplicate
class PortexRecordBase(
    PortexType, UserMutableMapping[str, PortexType]
):  # pylint: disable=abstract-method
    """Base class for record-like Portex types.

    The mapping data is produced by ``_fields_factory`` from the current values
    of the type's parameters, and the field operations are delegated to it.
    """

    _fields_factory: "ConnectedFieldsFactory"

    @property
    def _data(self) -> "ConnectedFields":  # type: ignore[override]
        # Rebuilt on every access from the current parameter values.
        factory = self._fields_factory
        arguments = {}
        for name in self.params:
            arguments[name] = getattr(self, name)
        return factory(arguments)

    def _get_column_count(self) -> int:
        """Get the total column count of the record base type.

        Returns:
            The sum of the column counts of all the fields.

        """
        total = 0
        for field_type in self._data.values():
            total += field_type._get_column_count()  # pylint: disable=protected-access
        return total

    def insert(self, index: int, name: str, portex_type: PortexType) -> None:
        """Insert the name and portex_type at the index.

        Arguments:
            index: The index to insert the field.
            name: The name of the field to be inserted.
            portex_type: The portex_type of the field to be inserted.

        """
        fields = self._data
        fields.insert(index, name, portex_type)

    def astype(self, name: str, portex_type: PortexType) -> None:
        """Convert the type of the field with the given name to the new PortexType.

        Arguments:
            name: The name of the field to convert.
            portex_type: The new PortexType of the field to convert to.

        """
        fields = self._data
        fields.astype(name, portex_type)

    def rename(self, old_name: str, new_name: str) -> None:
        """Rename the name of a field.

        Arguments:
            old_name: The current name of the field to be renamed.
            new_name: The new name of the field to assign.

        """
        fields = self._data
        fields.rename(old_name, new_name)

    def to_pyarrow(self, *, _to_backend: bool = False) -> pa.StructType:
        """Convert the Portex type to the corresponding builtin PyArrow StructType.

        Arguments:
            _to_backend: Forwarded unchanged to the ``to_pyarrow`` of every field.

        Returns:
            The corresponding builtin PyArrow StructType.

        """
        build_struct = pa.struct
        struct_fields = []
        for key, value in self._data.items():
            field_type = value.to_pyarrow(_to_backend=_to_backend)
            struct_fields.append(pa.field(key, field_type))
        return build_struct(struct_fields)
def read_yaml(path: PathLike) -> PortexType:
    """Read a yaml file into Portex type.

    The file is opened as UTF-8 text.

    Arguments:
        path: The path of the yaml file.

    Returns:
        A Portex type instance created from the input yaml file.

    """
    with open(path, encoding="utf-8") as stream:
        # NOTE(review): ``yaml.Loader`` can construct arbitrary Python objects from
        # tagged YAML, so only trusted files should be read this way; confirm
        # whether ``yaml.SafeLoader`` would be sufficient for Portex documents.
        pyobj = yaml.load(stream, yaml.Loader)
        portex_type = PortexType.from_pyobj(pyobj)
    return portex_type
def read_json(path: PathLike) -> PortexType:
    """Read a json file into Portex type.

    The file is opened as UTF-8 text.

    Arguments:
        path: The path of the json file.

    Returns:
        A Portex type instance created from the input json file.

    """
    with open(path, encoding="utf-8") as stream:
        pyobj = json.load(stream)
        portex_type = PortexType.from_pyobj(pyobj)
    return portex_type