Source code for graviti.portex.field

#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#
"""Portex record field related classes."""


from collections import ChainMap
from itertools import chain
from typing import Any
from typing import ChainMap as ChainMapType
from typing import (
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    MutableMapping,
    Optional,
    Set,
    Tuple,
    TypeVar,
    Union,
)

import pyarrow as pa

from graviti.portex.base import PortexType
from graviti.portex.package import Imports
from graviti.utility import INDENT, FrozenNameOrderedDict, NameOrderedDict

# Type variable bound to ``Fields``; ``Fields.copy`` annotates ``self`` and its return
# value with it so that the copy is typed as the caller's own (sub)class.
_T = TypeVar("_T", bound="Fields")


class FrozenFields(FrozenNameOrderedDict[PortexType]):
    """A fields dict whose content cannot be modified through its public interface.

    Every public mutating entry point (item assignment, item deletion, ``insert``,
    ``astype`` and ``rename``) raises ``TypeError``.

    """

    def __repr__(self) -> str:
        # The top-level representation starts at nesting level 0.
        return self._repr1(0)

    def __setitem__(self, key: Union[int, str], value: PortexType) -> None:
        owner = self.__class__.__name__
        raise TypeError(f"Cannot set item '{key}' in {owner}")

    def __delitem__(self, key: Union[int, str]) -> None:
        owner = self.__class__.__name__
        raise TypeError(f"Cannot delete item '{key}' in {owner}")

    @classmethod
    def _check_value(cls, value: PortexType) -> None:
        """Ensure the value is a PortexType instance.

        Arguments:
            value: The value to be checked.

        Raises:
            TypeError: When the value is not a PortexType instance.

        """
        if isinstance(value, PortexType):
            return

        raise TypeError(f'The value in "{cls.__name__}" should be a PortexType')

    def _setitem(self, key: str, value: PortexType) -> None:
        # Validate the value first, then delegate the actual storage to the parent class.
        self._check_value(value)
        super()._setitem(key, value)

    def _repr1(self, level: int) -> str:
        """Build the multi-line representation of the fields at the given nesting level.

        Arguments:
            level: The nesting level, every line after the first one is prefixed
                with ``level`` indents.

        Returns:
            The representation string of the fields.

        """
        entries = [
            # pylint: disable=protected-access
            f"{INDENT}'{field_name}': {field_type._repr1(level + 1)},"
            for field_name, field_type in self.items()
        ]
        separator = f"\n{level * INDENT}"
        return separator.join(["{", *entries, "}"])

    def insert(self, index: int, name: str, portex_type: PortexType) -> None:
        """Insert the name and portex_type at the index, always rejected for FrozenFields.

        Arguments:
            index: The index to insert the field.
            name: The name of the field to be inserted.
            portex_type: The portex_type of the field to be inserted.

        Raises:
            TypeError: When calling this method of FrozenFields.

        """
        owner = self.__class__.__name__
        raise TypeError(f"Cannot insert in {owner}")

    def astype(self, name: str, portex_type: PortexType) -> None:
        """Convert the type of the named field, always rejected for FrozenFields.

        Arguments:
            name: The name of the field to convert.
            portex_type: The new PortexType of the field to convert to.

        Raises:
            TypeError: When calling this method of FrozenFields.

        """
        owner = self.__class__.__name__
        raise TypeError(f"Cannot change the type of '{name}' in {owner}")

    def rename(self, old_name: str, new_name: str) -> None:
        """Rename a field, always rejected for FrozenFields.

        Arguments:
            old_name: The current name of the field to be renamed.
            new_name: The new name of the field to assign.

        Raises:
            TypeError: When calling this method of FrozenFields.

        """
        owner = self.__class__.__name__
        raise TypeError(f"Cannot rename '{old_name}' in {owner}")
class Fields(NameOrderedDict[PortexType], FrozenFields):  # type: ignore[misc]
    """Represents a Portex ``record`` fields dict.

    Arguments:
        fields: The initial fields, either an iterable of ``(name, portex_type)`` pairs
            or a mapping from name to portex_type. It is forwarded unchanged to the
            parent constructor.

    """

    def __init__(
        self, fields: Union[Iterable[Tuple[str, PortexType]], Mapping[str, PortexType], None] = None
    ):
        super().__init__(fields)

    @property
    def imports(self) -> Imports:
        """Get the Fields imports.

        Returns:
            The :class:`Imports` instance of this Fields, built by merging the imports
            of every field type.

        """
        imports = Imports()
        for portex_type in self._data.values():
            imports.update(portex_type.imports)

        return imports

    def insert(self, index: int, name: str, portex_type: PortexType) -> None:
        """Insert the name and portex_type at the index.

        Arguments:
            index: The index to insert the field.
            name: The name of the field to be inserted.
            portex_type: The portex_type of the field to be inserted.

        Raises:
            KeyError: When the name already exists in the Fields.

        """
        self._check_key(name)
        self._check_value(portex_type)

        if name in self:
            raise KeyError(f'"{name}" already exists in the Fields')

        self._keys.insert(index, name)
        self._data[name] = portex_type

    def astype(self, name: str, portex_type: PortexType) -> None:
        """Convert the type of the field with the given name to the new PortexType.

        Arguments:
            name: The name of the field to convert.
            portex_type: The new PortexType of the field to convert to.

        Raises:
            KeyError: When the name does not exist in the Fields.

        """
        self._check_value(portex_type)

        if name not in self:
            raise KeyError(f'"{name}" does not exist in the Fields')

        self._data[name] = portex_type

    def rename(self, old_name: str, new_name: str) -> None:
        """Rename the name of a field.

        Arguments:
            old_name: The current name of the field to be renamed.
            new_name: The new name of the field to assign.

        Raises:
            KeyError: When the new name is already used by another field.

        """
        self._check_key(new_name)

        # Renaming onto another existing field would overwrite that field's type and
        # leave the same name twice in the key order, so reject it like ``insert`` does.
        # Renaming a field to its current name stays a harmless no-op.
        if new_name != old_name and new_name in self:
            raise KeyError(f'"{new_name}" already exists in the Fields')

        self._data[new_name] = self._data.pop(old_name)
        self._keys[self._keys.index(old_name)] = new_name

    @classmethod
    def from_pyobj(
        cls, content: List[Dict[str, Any]], imports: Optional[Imports] = None
    ) -> "Fields":
        """Create Portex fields dict instance from python list.

        Arguments:
            content: A python list representing a Portex fields dict, every item needs
                a ``"name"`` key and is passed as a whole to ``PortexType.from_pyobj``.
            imports: The imports of the Portex fields dict.

        Returns:
            A Portex fields dict instance created from the input python list.

        """
        # Build through ``cls`` rather than a hard-coded ``Fields`` so that calling this
        # constructor on a subclass returns that subclass, the same way ``copy`` does.
        return cls((item["name"], PortexType.from_pyobj(item, imports)) for item in content)

    def to_pyobj(self) -> List[Dict[str, Any]]:
        """Dump the instance to a python list.

        Returns:
            A Python List representation of the fields dict, one dict per field holding
            its ``"name"`` merged with the field type's own ``to_pyobj(False)`` output.

        """
        return [{"name": name, **portex_type.to_pyobj(False)} for name, portex_type in self.items()]

    def to_pyarrow(self, *, _to_backend: bool = False) -> List[pa.Field]:
        """Convert the fields to a list of PyArrow Field.

        Arguments:
            _to_backend: Passed through unchanged to ``to_pyarrow`` of every field type.

        Returns:
            A list of PyArrow Field representing the fields of Portex record.

        """
        return [
            pa.field(key, value.to_pyarrow(_to_backend=_to_backend)) for key, value in self.items()
        ]

    def copy(self: _T) -> _T:
        """Get a copy of the fields.

        Returns:
            A new instance of the same class whose field types are copies of the
            original ones.

        """
        return self.__class__((name, portex_type.copy()) for name, portex_type in self.items())
# Either kind of fields dict: a FrozenFields (rejects modification) or a mutable Fields.
UnionFields = Union[FrozenFields, Fields]
class ConnectedFields(MutableMapping[str, PortexType]):
    """Fields composed of FrozenFields and Fields.

    The given fields dicts are presented as one mapping: names are looked up across all
    of them and integer indices address the fields in concatenation order.

    Arguments:
        multi_fields: The FrozenFields and Fields.

    Raises:
        ValueError: When there are repeated field names.

    """

    def __init__(self, multi_fields: Iterable[UnionFields]) -> None:
        multi_fields = list(multi_fields)
        field_keys: Set[str] = set()
        for fields in multi_fields:
            keys = fields.keys()
            repeated_keys = field_keys & keys
            if repeated_keys:
                raise ValueError(f"Repeated field names '{repeated_keys}'")

            field_keys.update(keys)

        # The ChainMap and the sequence share the very same list object, so name based
        # and index based access always see the same segments.
        self._mapping: ChainMapType[str, PortexType] = ChainMap()
        self._mapping.maps = multi_fields  # type: ignore[assignment]
        self._sequence = multi_fields

    def __len__(self) -> int:
        return sum(map(len, self._sequence))

    def __getitem__(self, key: Union[int, str]) -> PortexType:
        if isinstance(key, int):
            i, j = self._get_indices(key)
            return self._sequence[i][j]

        return self._mapping[key]

    def __setitem__(self, key: Union[int, str], value: PortexType) -> None:
        if isinstance(key, int):
            i, key = self._get_indices(key)
            fields = self._sequence[i]
        else:
            try:
                fields = self._get_fields(key)
            except KeyError:
                # An unknown name is a new field, it is added to the last segment.
                fields = self._sequence[-1]

        fields[key] = value

    def __delitem__(self, key: Union[int, str]) -> None:
        if isinstance(key, int):
            i, key = self._get_indices(key)
            fields = self._sequence[i]
        else:
            fields = self._get_fields(key)

        del fields[key]

    def __iter__(self) -> Iterator[str]:
        yield from chain.from_iterable(self._sequence)

    def _get_fields(self, key: str) -> UnionFields:
        """Find the segment which contains the given field name.

        Arguments:
            key: The field name to look for.

        Returns:
            The first fields dict containing the name.

        Raises:
            KeyError: When no segment contains the name.

        """
        for fields in self._sequence:
            if key in fields:
                return fields

        raise KeyError(key)

    def _get_indices(self, index: int) -> Tuple[int, int]:
        """Translate a global index into a segment index and an index inside the segment.

        Arguments:
            index: The index over all the connected fields, negative values count
                from the end.

        Returns:
            A tuple of the segment position and the non-negative position inside
            that segment.

        Raises:
            IndexError: When the index is out of range.

        """
        offset = index + len(self) if index < 0 else index

        # A negative index beyond the total length stays negative here. Without this
        # check it would pass the "offset < length" test below and be used as a
        # negative index into the first segment, addressing the wrong field.
        if offset < 0:
            raise IndexError("Index out of range")

        for i, length in enumerate(map(len, self._sequence)):
            if offset < length:
                return i, offset

            offset -= length

        raise IndexError("Index out of range")

    def insert(self, index: int, name: str, portex_type: PortexType) -> None:
        """Insert the name and portex_type at the index.

        Arguments:
            index: The index to insert the field.
            name: The name of the field to be inserted.
            portex_type: The portex_type of the field to be inserted.

        Raises:
            ValueError: When the name already exists in the fields.
            IndexError: When the index does not refer to an existing position.
            TypeError: When trying to insert a field into FrozenFields.

        """
        if name in self._mapping:
            raise ValueError(f"The '{name}' field already exists")

        i, j = self._get_indices(index)
        try:
            self._sequence[i].insert(j, name, portex_type)
            return
        except TypeError:
            pass

        # The target segment refused the insertion. Inserting at the very start of a
        # segment is equivalent to appending to the previous one, so try that instead.
        if j == 0 and i > 0:
            fields = self._sequence[i - 1]
            try:
                fields[name] = portex_type
                return
            except TypeError:
                pass

        raise TypeError(f"Cannot insert at index {index} due to frozen fields") from None

    def astype(self, name: str, portex_type: PortexType) -> None:
        """Convert the type of the field with the given name to the new PortexType.

        Arguments:
            name: The name of the field to convert.
            portex_type: The new PortexType of the field to convert to.

        Raises:
            KeyError: When no segment contains the name.

        """
        self._get_fields(name).astype(name, portex_type)

    def rename(self, old_name: str, new_name: str) -> None:
        """Rename the name of a field.

        Arguments:
            old_name: The current name of the field to be renamed.
            new_name: The new name of the field to assign.

        Raises:
            KeyError: When no segment contains the old name.
            ValueError: When the new name is already used by another field.

        """
        fields = self._get_fields(old_name)

        # Names have to stay unique across all segments, the segment being renamed in
        # cannot see the names owned by the other ones, so check the whole mapping here.
        if new_name != old_name and new_name in self._mapping:
            raise ValueError(f"The '{new_name}' field already exists")

        fields.rename(old_name, new_name)