Source code for graviti.dataframe.column.series

#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#

"""The implementation of the Graviti Series."""


from itertools import islice
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
    overload,
)

import pyarrow as pa

import graviti.portex as pt
from graviti.dataframe.column.indexing import ColumnSeriesILocIndexer, ColumnSeriesLocIndexer
from graviti.dataframe.container import Container
from graviti.file import FileBase
from graviti.openapi import RECORD_KEY
from graviti.operation import UpdateData
from graviti.paging import (
    LazyFactoryBase,
    MappedPagingList,
    PagingList,
    PagingListBase,
    PyArrowPagingList,
)
from graviti.portex.enum import EnumValueType
from graviti.utility import MAX_REPR_ROWS, ModuleMocker

try:
    import pandas as pd
except ModuleNotFoundError:
    pd = ModuleMocker("No module named 'pandas'")

if TYPE_CHECKING:
    import pandas
    from graviti.dataframe.frame import DataFrame
    from graviti.manager.permission import ObjectPermissionManager

_S = TypeVar("_S", bound="Series")
_P = TypeVar("_P", bound="PyarrowSeries")
_A = TypeVar("_A", bound="ArraySeries")
_F = TypeVar("_F", bound="FileSeries")
_E = TypeVar("_E", bound="EnumSeries")

_OPM = Optional["ObjectPermissionManager"]

class Series(Container):
    """One-dimensional array.

    Arguments:
        data: The data that needs to be stored in Series. Could be ndarray or Iterable.
        schema: Data type to force. If None, will be inferred from ``data``.

    Examples:
        Constructing Series from a list.

        >>> d = [1,2,3,4]
        >>> series = Series(data=d)
        >>> series
        0  1
        1  2
        2  3
        3  4

    """

    schema: pt.PortexType
    _data: PagingListBase[Any]

    def __new__(
        cls: Type[_S],
        data: Union[Iterable[Any], _S],
        schema: Optional[pt.PortexType] = None,
    ) -> Any:
        """One-dimensional data with schema.

        Arguments:
            data: The data that needs to be stored in column series.
            schema: The schema of the column series. If None, will be inferred from `data`.

        Raises:
            ValueError: When a schema is given while ``data`` is already a Series.
            ValueError: When a DataFrame is used as ``data`` to construct a Series.

        Returns:
            The created Series object.

        """
        if isinstance(data, cls):
            if schema is not None:
                raise ValueError("Only support the schema when data is not Series")
            return data.copy()

        if data.__class__.__name__ == "DataFrame":
            raise ValueError("Do not support constructing a Series through a DataFrame")

        if schema is None:
            array = pa.array(data)
            schema = pt.PortexType.from_pyarrow(array)
        elif issubclass(schema.container, FileSeries):
            return FileSeries._from_iterable(data, schema)
        else:
            array = cls._pylist_to_pyarrow(data, schema)

        return schema.container._from_pyarrow(array, schema)

    def __repr__(self) -> str:
        indices = list(self._get_repr_indices())
        indice_width = len(str(max(indices, default=1)))

        body = []
        body_item_width = 0
        for i in indices:
            item = self.loc[i]
            name = item._repr_folding() if hasattr(item, "_repr_folding") else str(item)
            body.append(name)
            body_item_width = max(len(name), body_item_width)

        lines = []
        for indice, value in zip(indices, body):
            lines.append(f"{indice:<{indice_width+2}}{value:<{body_item_width+2}}")
        if self.__len__() > MAX_REPR_ROWS:
            lines.append(f"...({self.__len__()})")
        return "\n".join(lines)

    def __len__(self) -> int:
        return self._data.__len__()

    @overload
    def __getitem__(self: _S, key: slice) -> _S:
        ...

    @overload
    def __getitem__(self, key: int) -> Any:
        ...

    # @overload
    # def __getitem__(self, key: Iterable[int]) -> "Series":
    #     ...

    def __getitem__(self: _S, key: Union[int, slice]) -> Union[Any, _S]:
        if isinstance(key, int):
            return self._get_item_by_location(key)

        return self._get_slice_by_location(key, self.schema.copy())

    @overload
    def __setitem__(self, key: slice, value: Union[Iterable[Any], _S]) -> None:
        ...

    @overload
    def __setitem__(self, key: int, value: Any) -> None:
        ...

    def __setitem__(
        self,
        key: Union[int, slice],
        value: Union[Iterable[Any], _S, Any],
    ) -> None:
        if isinstance(key, int):
            series = self._from_iterable([value], self.schema)
            key = slice(key, key + 1)
        elif not isinstance(value, self.__class__):
            series = self._from_pyarrow(self._pylist_to_pyarrow(value, self.schema), self.schema)
        elif self.schema.to_pyarrow().equals(value.schema.to_pyarrow()):
            series = value
        else:
            raise TypeError("The schema of the given Series is mismatched")

        self._set_slice(key, series)

        if self._root is not None and self._root.operations is not None:
            value = series.copy()
            root = self._root
            name = self._name
            df = root._create(pt.record({name[0]: series.schema.copy()}), None, name[1:])
            df._columns = {name[0]: series}
            df[RECORD_KEY] = root._record_key[key]  # type: ignore[index, assignment]
            root.operations.append(UpdateData(df))  # type: ignore[union-attr]

    def __delitem__(self, key: Union[int, slice]) -> None:
        if self._root is not None:
            raise TypeError(
                "'__delitem__' is not supported for the Series which is a member of a DataFrame"
            )

        self._del_item_by_location(key)

    def __iter__(self) -> Iterator[Any]:
        return self._data.__iter__()

    @staticmethod
    def _pylist_to_pyarrow(values: Iterable[Any], schema: pt.PortexType) -> pa.StructArray:
        if isinstance(values, Iterator):
            values = list(values)

        processor, need_process = Series._process_array(values, schema)
        if not need_process:
            return pa.array(values, schema.to_pyarrow(_to_backend=True))

        return pa.array(processor(values), schema.to_pyarrow(_to_backend=True))

    @staticmethod
    def _process_array(
        values: Iterable[Any], schema: pt.PortexType
    ) -> Tuple[Callable[[Any], Any], Optional[bool]]:
        if not values:
            return lambda x: x, None

        for value in values:
            if value is None:
                continue

            processor, need_process = Series._get_process(value, schema)
            if need_process is None:
                continue

            return lambda x: list(map(processor, x)), need_process

        return lambda x: x, False

    @staticmethod
    def _get_process(
        value: Any, schema: pt.PortexType
    ) -> Tuple[Callable[[Any], Any], Optional[bool]]:
        container = schema.container
        if value.__class__.__name__ == "DataFrame":
            return lambda x: x.to_pylist(_to_backend=True), True

        if container == ArraySeries:
            return Series._process_array(
                value, schema.to_builtin().items  # type: ignore[attr-defined]
            )
        if container == FileSeries:
            return Series._process_file(value)
        if container == EnumSeries:
            value_to_index = schema.to_builtin().values.value_to_index  # type: ignore[attr-defined]
            return lambda x: value_to_index[x], True

        return lambda x: x, False

    @staticmethod
    def _process_file(values: Iterable[Any]) -> Tuple[Callable[[Any], Any], bool]:
        if isinstance(values, FileBase):
            return lambda x: x.to_pyobj(), True

        return lambda x: x, False

    @classmethod
    def _from_factory(
        cls: Type[_S],
        factory: LazyFactoryBase,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
        *,
        object_permission_manager: _OPM = None,
    ) -> _S:
        obj = cls._create(schema, root, name)
        obj._refresh_data_from_factory(factory, object_permission_manager)
        return obj

    @classmethod
    def _from_iterable(
        cls: Type[_S],
        array: Iterable[Any],
        schema: pt.PortexType,
    ) -> _S:
        return cls(array, schema)

    def _repr_folding(self) -> str:
        return f"{self.__class__.__name__}({len(self)})"

    def _get_repr_indices(self) -> Iterable[int]:
        return islice(range(len(self)), MAX_REPR_ROWS)

    def _get_item_by_location(self, key: int) -> Any:
        return self._data.get_item(key)

    def _get_slice_by_location(
        self: _S,
        key: slice,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
    ) -> _S:
        obj = self._create(schema, root, name)
        obj._data = self._data.get_slice(key)  # pylint: disable=protected-access
        return obj

    def _set_slice(self: _S, key: slice, value: _S) -> None:
        self._data.set_slice(key, value._data)  # pylint: disable=protected-access

    def _del_item_by_location(self, key: Union[int, slice]) -> None:
        del self._data[key]

    def _refresh_data_from_factory(self, factory: LazyFactoryBase, _: _OPM) -> None:
        self._data = factory.create_pyarrow_list()

    def _extend(self: _S, values: _S) -> None:
        """Extend the Series with another Series row by row.

        Arguments:
            values: The Series used to extend this one.

        Examples:
            >>> s1 = Series.from_pyarrow(pa.array([1,2,3]))
            >>> s1
            0  1
            1  2
            2  3
            >>> s2 = Series.from_pyarrow(pa.array([4,5,6]))
            >>> s2
            0  4
            1  5
            2  6
            >>> s1.extend(s2)
            >>> s1
            0  1
            1  2
            2  3
            3  4
            4  5
            5  6

        """
        self._data.extend(values._data)  # pylint: disable=protected-access

    def _copy(
        self: _S,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
    ) -> _S:
        obj = self._create(schema, root, name)
        obj._data = self._data.copy()  # pylint: disable=protected-access
        return obj

    def _to_pandas_series(self) -> "pandas.Series":
        return self.to_pandas()

    @classmethod
    def from_pyarrow(cls, array: pa.Array) -> "Series":
        """Instantiate a Series backed by a pyarrow array.

        Arguments:
            array: The input pyarrow array.

        Raises:
            TypeError: When the input pyarrow type is not supported.

        Returns:
            The loaded :class:`~graviti.dataframe.column.Series` instance.

        """
        pyarrow_type = array.type

        # pylint: disable=protected-access
        if pa.types.is_dictionary(pyarrow_type):
            schema: pt.PortexType = pt.enum(array.dictionary.to_pylist())
            return EnumSeries._from_pyarrow(array.indices, schema)

        schema = pt.PortexType.from_pyarrow(array)
        container = schema.container
        if not issubclass(container, Series):
            raise TypeError(
                f"pyarrow type '{pyarrow_type}' is not supported converting to a graviti Series"
            )

        return container._from_pyarrow(array, schema)

    @classmethod
    def from_pandas(cls, series: "pandas.Series") -> "Series":
        """Convert a pandas Series to a graviti Series.

        Arguments:
            series: The input pandas Series.

        Returns:
            The converted graviti Series.

        """
        array = pa.Array.from_pandas(series)
        return cls.from_pyarrow(array)

    @property
    def iloc(self) -> ColumnSeriesILocIndexer:
        """Purely integer-location based indexing for selection by position.

        Allowed inputs are:

        - An integer, e.g. ``5``.
        - A list or array of integers, e.g. ``[4, 3, 0]``.
        - A slice object with ints, e.g. ``1:7``.
        - A boolean array of the same length as the axis being sliced.

        Returns:
            The instance of the ILocIndexer.

        Examples:
            >>> series = Series([1, 2, 3])
            >>> series.iloc[0]
            1
            >>> series.iloc[[0]]
            0    1
            dtype: int64

        """
        return ColumnSeriesILocIndexer(self)

    @property
    def loc(self) -> ColumnSeriesLocIndexer:
        """Access a group of rows and columns by indexes or a boolean array.

        Allowed inputs are:

        - A single index, e.g. ``5``.
        - A list or array of indexes, e.g. ``[4, 3, 0]``.
        - A slice object with indexes, e.g. ``1:7``.
        - A boolean array of the same length as the axis being sliced.

        Returns:
            The instance of the LocIndexer.

        Examples:
            >>> series = Series([1, 2, 3])
            >>> series.loc[0]
            1
            >>> series.loc[[0]]
            0    1
            dtype: int64

        """
        return ColumnSeriesLocIndexer(self)

    def to_pylist(self, *, _to_backend: bool = False) -> List[Any]:
        """Convert the container to a python list.

        Raises:
            NotImplementedError: The method of the base class should not be called.

        """
        raise NotImplementedError

    def to_pandas(self) -> "pandas.Series":
        """Convert the graviti Container to a pandas Series or DataFrame.

        Raises:
            NotImplementedError: The method of the base class should not be called.

        """
        raise NotImplementedError

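# A minimal usage sketch (illustrative only, not part of the module; assumes pyarrow is
# installed). ``Series.__new__`` dispatches to the concrete subclass registered for the
# inferred or given Portex schema:
#
#     >>> s = Series([1, 2, 3, 4])                 # schema inferred as int64
#     >>> type(s).__name__
#     'NumberSeries'
#     >>> s[1:3] = [20, 30]                        # slice assignment via __setitem__
#     >>> Series.from_pyarrow(pa.array(["a", "b"])).to_pylist()
#     ['a', 'b']
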
class PyarrowSeries(Series):
    """Pyarrow based one-dimensional array."""

    _data: PyArrowPagingList[Any]

    @classmethod
    def _from_pyarrow(
        cls: Type[_P],
        array: pa.Array,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
        *,
        object_permission_manager: _OPM = None,
    ) -> _P:
        obj = cls._create(schema, root, name)
        obj._data = PyArrowPagingList.from_pyarrow(array)
        return obj

    def __iter__(self) -> Iterator[Any]:
        return (i.as_py() for i in self._data)

    def _get_item_by_location(self, key: int) -> Any:
        return self._data.get_item(key).as_py()

    def to_pylist(self, *, _to_backend: bool = False) -> List[Any]:
        """Convert the Series to a python list.

        Returns:
            The python list representing the Series.

        """
        return self._data.to_pyarrow().to_pylist()  # type: ignore[no-any-return]

    def to_pandas(self) -> "pandas.Series":
        """Convert the graviti Series to a pandas Series.

        Returns:
            The converted pandas Series.

        """
        return self._data.to_pyarrow().to_pandas()

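# A minimal round-trip sketch (illustrative only; assumes pyarrow and pandas are
# installed). Pyarrow-backed subclasses keep their data in a ``PyArrowPagingList`` and
# delegate both conversions to pyarrow:
#
#     >>> s = Series.from_pyarrow(pa.array([1.5, 2.5]))
#     >>> s.to_pylist()
#     [1.5, 2.5]
#     >>> s.to_pandas()
#     0    1.5
#     1    2.5
#     dtype: float64
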
@pt.ContainerRegister(
    pt.boolean,
    pt.float32,
    pt.float64,
    pt.int32,
    pt.int64,
)
class NumberSeries(PyarrowSeries):
    """One-dimensional array for portex builtin number type."""

@pt.ContainerRegister(pt.string)
class StringSeries(PyarrowSeries):
    """One-dimensional array for portex builtin string type."""

@pt.ContainerRegister(pt.binary)
class BinarySeries(PyarrowSeries):
    """One-dimensional array for portex builtin binary type."""

@pt.ContainerRegister(pt.array)
class ArraySeries(Series):
    """One-dimensional array for portex builtin type array."""

    _data: MappedPagingList[Any]
    _item_schema: pt.PortexType
    _object_permission_manager: _OPM = None

    @classmethod
    def _from_pyarrow(
        cls: Type[_A],
        array: pa.ListArray,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
        *,
        object_permission_manager: _OPM = None,
    ) -> _A:
        builtin_schema: pt.array = schema.to_builtin()  # type: ignore[assignment]
        _item_schema = builtin_schema.items
        _item_creator = _item_schema.container._from_pyarrow  # pylint: disable=protected-access

        obj = cls._create(schema, root, name)
        obj._data = MappedPagingList.from_array(
            array,
            lambda scalar: _item_creator(
                scalar.values, _item_schema, object_permission_manager=object_permission_manager
            ),
        )
        obj._item_schema = _item_schema
        obj._object_permission_manager = object_permission_manager
        return obj

    @classmethod
    def _from_iterable(
        cls: Type[_A],
        array: Iterable[Any],
        schema: pt.PortexType,
    ) -> _A:
        obj: _A = object.__new__(cls)
        items_schema = schema.to_builtin().items  # type: ignore[attr-defined]
        items_container = items_schema.container
        array = (
            items_container._from_iterable(value, items_schema)  # pylint: disable=protected-access
            for value in array
        )
        obj._data = MappedPagingList(array)
        obj.schema = schema
        obj._item_schema = items_schema
        return obj

    def _get_slice_by_location(
        self: _A,
        key: slice,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
    ) -> _A:
        obj = super()._get_slice_by_location(key, schema, root, name)
        # pylint: disable=protected-access
        obj._object_permission_manager = self._object_permission_manager
        return obj._copy(schema, root, name)

    def _refresh_data_from_factory(
        self, factory: LazyFactoryBase, object_permission_manager: _OPM
    ) -> None:
        builtin_schema: pt.array = self.schema.to_builtin()  # type: ignore[assignment]
        _item_schema = builtin_schema.items
        _item_creator = _item_schema.container._from_pyarrow  # pylint: disable=protected-access

        self._data = factory.create_mapped_list(
            lambda scalar: _item_creator(
                scalar.values, _item_schema, object_permission_manager=object_permission_manager
            )
        )
        self._item_schema = _item_schema
        self._object_permission_manager = object_permission_manager

    def _extract_paging_list(self: _A, values: _A) -> MappedPagingList[Any]:
        # pylint: disable=protected-access
        _item_schema = self._item_schema
        if values._item_schema is _item_schema and values is not self:
            return values._data

        _item_creator = _item_schema.container._from_pyarrow
        return values._data.copy(
            lambda df: df._copy(_item_schema),
            lambda scalar: _item_creator(
                scalar.values,
                _item_schema,
                object_permission_manager=values._object_permission_manager,
            ),
        )

    def _set_item_by_slice(self: _A, key: slice, value: _A) -> None:
        self._data.set_slice(key, self._extract_paging_list(value))

    def _extend(self: _A, values: _A) -> None:
        self._data.extend(self._extract_paging_list(values))

    def _copy(
        self: _A,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
    ) -> _A:
        obj = self._create(schema, root, name)

        builtin_schema: pt.array = schema.to_builtin()  # type: ignore[assignment]
        _item_schema = builtin_schema.items
        _item_creator = _item_schema.container._from_pyarrow  # pylint: disable=protected-access
        _object_permission_manager = self._object_permission_manager

        # pylint: disable=protected-access
        obj._item_schema = _item_schema
        obj._data = self._data.copy(
            lambda df: df._copy(_item_schema),
            lambda scalar: _item_creator(
                scalar.values, _item_schema, object_permission_manager=_object_permission_manager
            ),
        )
        obj._object_permission_manager = _object_permission_manager

        return obj

    def to_pylist(self, *, _to_backend: bool = False) -> List[Any]:
        """Convert the Series to a python list.

        Returns:
            The python list representing the Series.

        """
        return [item.to_pylist(_to_backend=_to_backend) for item in self._data]

    def to_pandas(self) -> "pandas.Series":
        """Convert the graviti Series to a pandas Series.

        Returns:
            The converted pandas Series.

        """
        return pd.Series(item.to_pylist() for item in self._data)

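# A minimal usage sketch (illustrative only; ``pt.array(pt.int32())`` is assumed to be a
# valid Portex array type here). Each element of an ArraySeries is itself a Series of the
# item schema, so ``to_pylist`` returns nested lists:
#
#     >>> s = Series([[1, 2], [3]], schema=pt.array(pt.int32()))
#     >>> s.to_pylist()
#     [[1, 2], [3]]
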
@pt.ExternalContainerRegister(
    pt.STANDARD_URL,
    "main",
    "file.File",
    "file.Audio",
    "file.Image",
    "file.PointCloud",
    "file.PointCloudBin",
    "label.Mask",
)
class FileSeries(Series):
    """One-dimensional array for file."""

    _data: PagingList[Any]

    @classmethod
    def _from_iterable(
        cls: Type[_F],
        array: Iterable[FileBase],
        schema: pt.PortexType,
    ) -> _F:
        obj: _F = object.__new__(cls)
        obj._data = PagingList(array)
        obj.schema = schema
        return obj

    @classmethod
    def _from_pyarrow(
        cls: Type[_F],
        array: pa.StructArray,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
        *,
        object_permission_manager: _OPM = None,
    ) -> _F:
        if object_permission_manager is None:
            raise ValueError(
                "The object permission manager is needed to create FileSeries from pyarrow"
            )

        obj = cls._create(schema, root, name)
        file_type: FileBase = schema.element  # type: ignore[assignment]
        # pylint: disable=protected-access
        obj._data = PagingList(
            file_type._from_pyarrow(item, object_permission_manager) for item in array
        )
        return obj

    def _refresh_data_from_factory(
        self, factory: LazyFactoryBase, object_permission_manager: _OPM
    ) -> None:
        file_type = self.schema.element
        self._data = factory.create_list(
            lambda scalar: file_type(
                **scalar.as_py(), object_permission_manager=object_permission_manager
            )
        )

    def to_pylist(self, *, _to_backend: bool = False) -> List[Any]:
        """Convert the FileSeries to a python list.

        Returns:
            The python list.

        """
        if _to_backend:
            return [file._to_post_data() for file in self._data]  # pylint: disable=protected-access

        return list(self._data)

    def to_pandas(self) -> "pandas.Series":
        """Convert the graviti Series to a pandas Series.

        Returns:
            The converted pandas Series.

        """
        return pd.Series(self._data)

@pt.ContainerRegister(pt.enum)
class EnumSeries(PyarrowSeries):
    """One-dimensional array for portex builtin type enum."""

    _index_to_value: Dict[Optional[int], EnumValueType]

    def _get_item_by_location(self, key: int) -> Any:
        return self._index_to_value[self._data[key].as_py()]

    def _get_slice_by_location(
        self: _E,
        key: slice,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
    ) -> _E:
        obj = super()._get_slice_by_location(key, schema, root, name)
        enum_values = self.schema.to_builtin().values  # type: ignore[attr-defined]
        obj._index_to_value = enum_values.index_to_value  # pylint: disable=protected-access
        return obj

    def _copy(
        self,
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
    ) -> "EnumSeries":
        obj = super()._copy(schema, root, name)
        enum_values = self.schema.to_builtin().values  # type: ignore[attr-defined]
        obj._index_to_value = enum_values.index_to_value  # pylint: disable=protected-access
        return obj

    @classmethod
    def _from_pyarrow(
        cls: Type[_E],
        array: Union[pa.IntegerArray, pa.DictionaryArray],
        schema: pt.PortexType,
        root: Optional["DataFrame"] = None,
        name: Tuple[str, ...] = (),
        *,
        object_permission_manager: _OPM = None,
    ) -> _E:
        obj = cls._create(schema, root, name)
        if isinstance(array, pa.DictionaryArray):
            array = array.indices

        obj._data = PyArrowPagingList.from_pyarrow(array)
        enum_values = schema.to_builtin().values  # type: ignore[attr-defined]
        obj._index_to_value = enum_values.index_to_value
        return obj

    def _refresh_data_from_factory(self, factory: LazyFactoryBase, _: _OPM) -> None:
        self._data = factory.create_pyarrow_list()
        enum_values = self.schema.to_builtin().values  # type: ignore[attr-defined]
        self._index_to_value = enum_values.index_to_value

    def to_pylist(self, *, _to_backend: bool = False) -> List[Any]:
        """Convert the Series to a python list.

        Returns:
            The python list representing the Series.

        """
        if _to_backend:
            return super().to_pylist()

        _index_to_value = self._index_to_value
        return [_index_to_value[i.as_py()] for i in self._data]

    def to_pandas(self) -> "pandas.Series":
        """Convert the graviti EnumSeries to a pandas Categorical Series.

        Returns:
            The converted pandas Categorical Series.

        """
        enum_values = self.schema.to_builtin().values  # type: ignore[attr-defined]
        dictionary = enum_values.to_pyarrow()
        array = self._data.to_pyarrow().combine_chunks()
        dictionary_array = pa.DictionaryArray.from_arrays(array, dictionary)
        return dictionary_array.to_pandas()

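# A minimal usage sketch (illustrative only; assumes pyarrow and pandas are installed).
# A pyarrow dictionary array is loaded as an EnumSeries that stores the indices, while
# ``to_pylist`` and ``to_pandas`` map them back to the enum values:
#
#     >>> s = Series.from_pyarrow(pa.array(["cat", "dog", "cat"]).dictionary_encode())
#     >>> type(s).__name__
#     'EnumSeries'
#     >>> s.to_pylist()
#     ['cat', 'dog', 'cat']
#     >>> s.to_pandas()
#     0    cat
#     1    dog
#     2    cat
#     dtype: category
#     Categories (2, object): ['cat', 'dog']
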
@pt.ContainerRegister(pt.date, pt.time, pt.timestamp, pt.timedelta)
class TimeSeries(PyarrowSeries):
    """One-dimensional array for portex builtin temporal type."""

    def to_pylist(self, *, _to_backend: bool = False) -> List[Any]:
        """Convert the Series to a python list.

        Returns:
            The python list representing the Series.

        """
        if _to_backend:
            array = self._data.to_pyarrow()
            bit_width = array.type.bit_width
            target_type = pa.int32() if bit_width == 32 else pa.int64()
            return array.cast(target_type).to_pylist()  # type: ignore[no-any-return]

        return super().to_pylist()

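# A minimal usage sketch (illustrative only; the exact Portex temporal type inferred from
# a pyarrow date32 array is an assumption here). A temporal column behaves like any other
# pyarrow-backed Series, while ``to_pylist(_to_backend=True)`` casts the values to their
# underlying integer representation:
#
#     >>> from datetime import date
#     >>> s = Series.from_pyarrow(pa.array([date(2022, 1, 1)]))
#     >>> s.to_pylist()
#     [datetime.date(2022, 1, 1)]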