Source code for graviti.manager.search

#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#

"""The implementation of the SearchHistory and SearchManager."""

from typing import TYPE_CHECKING, Any, Dict, Generator, Optional

import graviti.portex as pt
from graviti.dataframe import DataFrame
from graviti.dataframe.sql.operator import get_type, infer_type
from graviti.exception import CriteriaError
from graviti.manager.commit import Commit
from graviti.manager.common import LIMIT
from graviti.manager.lazy import LazyPagingList
from graviti.openapi import (
    create_search_history,
    delete_search_history,
    get_commit_sheet,
    get_draft_sheet,
    get_search_history,
    get_search_record_count,
    list_search_histories,
    list_search_records,
)
from graviti.paging import LazyLowerCaseFactory
from graviti.utility import (
    CachedProperty,
    ReprMixin,
    SortParam,
    check_type,
    convert_iso_to_datetime,
)

if TYPE_CHECKING:
    from graviti.manager.dataset import Dataset


class SearchHistory(ReprMixin):  # pylint: disable=too-many-instance-attributes
    """This class defines the structure of the search history of Graviti Data Platform.

    Arguments:
        dataset: Class :class:`~graviti.dataset.dataset.Dataset` instance.
        response: The response of the OpenAPI associated with the search history::

            {
                "id": <str>
                "commit_id": <str>,
                "draft_number": <int>,
                "sheet": <str>,
                "criteria": <dict>,
                "record_count": <int>,
                "creator": <str>,
                "created_at": <str>,
            },

    Attributes:
        search_id: The id of this search history.
        commit: The commit of this search history.
        draft_number: The draft number of this search history.
        sheet: The sheet name of this search history.
        criteria: The criteria of this search history.
        creator: The creator of this search history.
        created_at: The create time of this search history.

    """

    _repr_attrs = (
        "commit",
        "draft_number",
        "sheet",
        "criteria",
        "record_count",
        "creator",
        "created_at",
    )

    def __init__(self, dataset: "Dataset", response: Dict[str, Any]) -> None:
        self._dataset = dataset
        self.search_id: str = response["id"]

        if "commit_id" in response:
            self.commit = Commit(dataset, response["commit_id"])
        else:
            self.draft_number: int = response["draft_number"]

        self.sheet: str = response["sheet"]
        self.criteria: Dict[str, Any] = response["criteria"]

        record_count = response["record_count"]
        if record_count is not None:
            self.record_count = record_count

        self.creator: str = response["creator"]
        self.created_at = convert_iso_to_datetime(response["created_at"])

    def _repr_head(self) -> str:
        return f'{self.__class__.__name__}("{self.search_id}")'

    def _get_sheet_schema(self) -> pt.record:
        _dataset = self._dataset
        _workspace = _dataset.workspace
        if hasattr(self, "commit"):
            return pt.record.from_yaml(
                get_commit_sheet(
                    _workspace.access_key,
                    _workspace.url,
                    _workspace.name,
                    _dataset.name,
                    commit_id=self.commit.commit_id,  # type: ignore[arg-type]
                    sheet=self.sheet,
                )["schema"]
            )

        return pt.record.from_yaml(
            get_draft_sheet(
                _workspace.access_key,
                _workspace.url,
                _workspace.name,
                _dataset.name,
                draft_number=self.draft_number,
                sheet=self.sheet,
            )["schema"]
        )

    def _infer_schema(self, sheet_schema: pt.record) -> pt.record:
        select = self.criteria.get("select")
        if not select:
            return sheet_schema

        schema = pt.record([])
        for item in select:
            if isinstance(item, str) and item.startswith("$."):
                column = item[2:]
                pttype = get_type(sheet_schema, item)
            elif isinstance(item, dict):
                column, expr = item.copy().popitem()
                pttype = infer_type(sheet_schema, expr)
            else:
                raise CriteriaError(f"Invalid column '{item}' in 'select'")

            names = column.split(".")
            target = schema
            for name in names[:-1]:
                target = target.setdefault(name, pt.record([]))  # type: ignore[assignment]
                if not isinstance(target, pt.PortexRecordBase):
                    raise CriteriaError(f"Expression '{item}' conflicts with other columns")

            target[names[-1]] = pttype

        return schema

    @CachedProperty
    def record_count(self) -> int:  # pylint: disable=method-hidden
        """Get the record count of the search.

        Returns:
            The record count of the search.

        """
        _dataset = self._dataset
        _workspace = _dataset.workspace
        return get_search_record_count(
            _workspace.access_key,
            _workspace.url,
            _workspace.name,
            _dataset.name,
            search_id=self.search_id,
        )

    @CachedProperty
    def schema(self) -> pt.record:
        """Get the schema of the search result.

        Returns:
            The schema of the search result.

        """
        sheet_schema = self._get_sheet_schema()
        return self._infer_schema(sheet_schema)
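
    # Illustrative sketch (not part of the module): how ``_infer_schema`` narrows the
    # sheet schema when the criteria contain a "select" clause. The criteria below are
    # hypothetical; only the two item forms handled above are shown, a "$."-prefixed
    # column reference (type copied via ``get_type``) and an ``{alias: expression}``
    # mapping (type inferred via ``infer_type``).
    #
    #     criteria = {
    #         "select": [
    #             "$.filename",                      # keep an existing column as-is
    #             {"category": "$.label.category"},  # project a nested column to a new name
    #         ],
    #     }
    #
    # With this "select", the inferred schema would contain only ``filename`` and
    # ``category``; without a "select", the full sheet schema is returned unchanged.
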
    def run(self) -> DataFrame:
        """Run the search and get the result DataFrame.

        Returns:
            The search result DataFrame.

        """
        _dataset = self._dataset
        _workspace = _dataset.workspace
        schema = self.schema
        factory = LazyLowerCaseFactory(
            self.record_count,
            LIMIT,
            lambda offset, limit: list_search_records(
                _workspace.access_key,
                _workspace.url,
                _workspace.name,
                _dataset.name,
                search_id=self.search_id,
                offset=offset,
                limit=limit,
            )["records"],
            schema.to_pyarrow(_to_backend=True),  # pylint: disable=no-member
        )
        df = DataFrame._from_factory(  # pylint: disable=protected-access
            factory, schema, object_permission_manager=_dataset.object_permission_manager
        )
        df.search_history = self
        return df
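
# Illustrative usage sketch (not part of the module), assuming the dataset exposes the
# search manager as ``dataset.searches`` and a search history already exists; the search
# id is a placeholder:
#
#     search = dataset.searches.get("<SEARCH_ID>")
#     search.schema        # Portex schema inferred from the criteria "select"
#     search.record_count  # fetched lazily and cached by ``CachedProperty``
#     df = search.run()    # DataFrame backed by ``list_search_records``
#
# ``run`` does not pull all records eagerly: ``LazyLowerCaseFactory`` requests pages of
# at most ``LIMIT`` records as the DataFrame is accessed.
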
class SearchManager:
    """This class defines the operations on the searches on Graviti.

    Arguments:
        dataset: Class :class:`~graviti.dataset.dataset.Dataset` instance.

    """

    def __init__(self, dataset: "Dataset") -> None:
        self._dataset = dataset

    def _generate(
        self,
        offset: int,
        limit: int,
        *,
        commit_id: Optional[str],
        draft_number: Optional[int],
        sheet: Optional[str],
        sort: SortParam,
    ) -> Generator[SearchHistory, None, int]:
        _dataset = self._dataset
        _workspace = _dataset.workspace
        response = list_search_histories(
            _workspace.access_key,
            _workspace.url,
            _workspace.name,
            _dataset.name,
            commit_id=commit_id,
            draft_number=draft_number,
            sheet=sheet,
            sort=sort,
            limit=limit,
            offset=offset,
        )

        for item in response["searches"]:
            yield SearchHistory(_dataset, item)

        return response["total_count"]  # type: ignore[no-any-return]

    def _create(
        self,
        commit_id: Optional[str],
        draft_number: Optional[int],
        sheet: str,
        criteria: Dict[str, Any],
    ) -> SearchHistory:
        _dataset = self._dataset
        _workspace = _dataset.workspace
        response = create_search_history(
            _workspace.access_key,
            _workspace.url,
            _workspace.name,
            _dataset.name,
            commit_id=commit_id,
            draft_number=draft_number,
            sheet=sheet,
            criteria=criteria,
        )

        return SearchHistory(_dataset, response)

    def get(self, search_id: str) -> SearchHistory:
        """Get a Graviti search history with given search id.

        Arguments:
            search_id: The id of the search history.

        Returns:
            The requested :class:`~graviti.manager.search.SearchHistory` instance.

        """
        check_type("search_id", search_id, str)

        _dataset = self._dataset
        _workspace = _dataset.workspace
        response = get_search_history(
            _workspace.access_key,
            _workspace.url,
            _workspace.name,
            _dataset.name,
            search_id=search_id,
        )

        return SearchHistory(_dataset, response)

    def list(
        self,
        *,
        commit_id: Optional[str] = None,
        draft_number: Optional[int] = None,
        sheet: Optional[str] = None,
        sort: SortParam = None,
    ) -> LazyPagingList[SearchHistory]:
        """List Graviti search histories.

        Arguments:
            commit_id: The commit id.
            draft_number: The draft number.
            sheet: The name of the sheet.
            sort: The column and the direction the list result sorted by.

        Returns:
            The LazyPagingList of :class:`~graviti.manager.search.SearchHistory` instances.

        """
        return LazyPagingList(
            lambda offset, limit: self._generate(
                offset,
                limit,
                commit_id=commit_id,
                draft_number=draft_number,
                sheet=sheet,
                sort=sort,
            ),
            LIMIT,
        )

    def delete(self, search_id: str) -> None:
        """Delete a Graviti search history with given search id.

        Arguments:
            search_id: The id of the search history.

        """
        check_type("search_id", search_id, str)

        _workspace = self._dataset.workspace
        delete_search_history(
            _workspace.access_key,
            _workspace.url,
            _workspace.name,
            self._dataset.name,
            search_id=search_id,
        )
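
# Illustrative usage sketch (not part of the module), again assuming the manager is
# reachable as ``dataset.searches``; the commit id, sheet name, and search id are
# placeholders:
#
#     for history in dataset.searches.list(commit_id="<COMMIT_ID>", sheet="<SHEET>"):
#         print(history.created_at, history.criteria)
#
#     dataset.searches.delete("<SEARCH_ID>")
#
# ``list`` returns a ``LazyPagingList``, so histories are fetched in pages of ``LIMIT``
# as they are iterated rather than all at once.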