#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#
"""The implementation of the Action and ActionManager."""
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional
from graviti.exception import ResourceNameError
from graviti.manager.common import LIMIT
from graviti.manager.lazy import LazyPagingList
from graviti.openapi import (
cancel_action_run,
create_action,
create_action_run,
delete_action,
get_action,
get_action_run,
list_action_runs,
list_actions,
update_action,
)
from graviti.utility import (
CachedProperty,
ReprMixin,
SortParam,
check_type,
convert_iso_to_datetime,
)
if TYPE_CHECKING:
from graviti.manager.dataset import Dataset
[docs]class Action(ReprMixin):
"""This class defines the structure of an action.
Arguments:
dataset: Class :class:`~graviti.dataset.dataset.Dataset` instance.
response: The response of the OpenAPI associated with the action::
{
"id": <str>,
"name": <str>,
"edition": <int>,
"state": <str>,
"payload": <str>,
},
Attributes:
name: The name of this action.
edition: The edition of this action.
state: The state of this action.
payload: The payload of this action.
"""
_repr_attrs = ("edition", "state")
def __init__(self, dataset: "Dataset", response: Dict[str, Any]) -> None:
self._dataset = dataset
self._id: str = response["id"]
self.name: str = response["name"]
self.edition: int = response["edition"]
self.state: str = response["state"]
self.payload: str = response["payload"]
def _repr_head(self) -> str:
return f'{self.__class__.__name__}("{self.name}")'
def _edit(
self,
*,
name: Optional[str] = None,
state: Optional[str] = None,
payload: Optional[str] = None,
) -> None:
_workspace = self._dataset.workspace
response = update_action(
_workspace.access_key,
_workspace.url,
_workspace.name,
self._dataset.name,
action=self.name,
name=name,
state=state,
payload=payload,
)
self.name = response["name"]
self.edition = response["edition"]
self.state = response["state"]
self.payload = response["payload"]
@property
[docs] def runs(self) -> "RunManager":
"""Get class :class:`~graviti.manager.action.RunManager` instance.
Returns:
Required :class:`~graviti.manager.action.RunManager` instance.
"""
return RunManager(self)
[docs] def edit(self, *, name: Optional[str] = None, payload: Optional[str] = None) -> None:
"""Update the action.
Arguments:
name: The new name of the action.
payload: The new paylaod of the action.
"""
self._edit(name=name, payload=payload)
[docs] def enable(self) -> None:
"""Enable the action."""
self._edit(state="ENABLED")
[docs] def disable(self) -> None:
"""Disable the action."""
self._edit(state="DISABLED")
[docs]class ActionManager:
"""This class defines the operations on the action in Graviti.
Arguments:
dataset: :class:`~graviti.manager.dataset.Dataset` instance.
"""
def __init__(self, dataset: "Dataset") -> None:
self._dataset = dataset
def _generate(
self, offset: int, limit: int, *, sort: SortParam
) -> Generator[Action, None, int]:
_dataset = self._dataset
_workspace = _dataset.workspace
response = list_actions(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
sort=sort,
offset=offset,
limit=limit,
)
for item in response["actions"]:
yield Action(_dataset, item)
return response["total_count"] # type: ignore[no-any-return]
[docs] def create(self, name: str, payload: str) -> Action:
"""Create an action.
Arguments:
name: The name of the action.
payload: The payload of the action.
Returns:
The :class:`.Action` instance with the given name.
"""
_dataset = self._dataset
_workspace = _dataset.workspace
response = create_action(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
name=name,
payload=payload,
)
return Action(_dataset, response)
[docs] def get(self, name: str) -> Action:
"""Get the action with the given name.
Arguments:
name: The required action name.
Raises:
ResourceNameError: When the given name is an empty string.
Returns:
The :class:`.Action` instance with the given name.
"""
check_type("name", name, str)
if not name:
raise ResourceNameError("action", name)
_dataset = self._dataset
_workspace = _dataset.workspace
response = get_action(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
action=name,
)
return Action(_dataset, response)
[docs] def list(self, *, sort: SortParam = None) -> LazyPagingList[Action]:
"""List the information of actions.
Arguments:
sort: The column and the direction the list result sorted by.
Returns:
The LazyPagingList of :class:`actions<.Action>` instances.
"""
return LazyPagingList(
lambda offset, limit: self._generate(offset, limit, sort=sort),
LIMIT,
)
[docs] def delete(self, name: str) -> None:
"""Delete an action.
Arguments:
name: The name of the action to be deleted.
Raises:
ResourceNameError: When the given name is an empty string.
"""
check_type("name", name, str)
if not name:
raise ResourceNameError("action", name)
_workspace = self._dataset.workspace
delete_action(
_workspace.access_key,
_workspace.url,
_workspace.name,
self._dataset.name,
action=name,
)
[docs]class Run(ReprMixin): # pylint: disable=too-many-instance-attributes
"""This class defines the structure of an action run.
Arguments:
action: Class :class:`~graviti.manager.action.Action` instance.
response: The response of the OpenAPI associated with the action run::
{
"id": <str>,
"number": <int>,
"name": <str>,
"workflow_id": <str>,
"status": <int>,
"arguments": <dict>,
"started_at": <str>,
"ended_at": <str>,
},
Attributes:
number: The number of this action run.
name: The name of this action run.
status: The status of this action run.
arguments: The arguments of this action run.
started_at: The start time of this action run.
ended_at: The end time of this action run.
duration: The duration of this action run.
"""
_repr_attrs = ("status", "arguments", "started_at", "ended_at", "duration")
def __init__(self, action: Action, response: Dict[str, Any]) -> None:
self._action = action
self._workflow_id = response["workflow_id"]
self.number = response["number"]
self.name = response["name"]
self.status = response["status"]
self.arguments: Dict[str, Any] = response["arguments"]
self.started_at = convert_iso_to_datetime(response["started_at"])
ended_at = response["ended_at"]
if ended_at is not None:
self.ended_at: Optional[datetime] = convert_iso_to_datetime(ended_at)
self.duration: Optional[timedelta] = self.ended_at - self.started_at
else:
self.ended_at = None
self.duration = None
if "nodes" in response:
nodes: Any = [Node(self, item) for item in response["nodes"]]
self.nodes = nodes
def _repr_head(self) -> str:
return f'{self.__class__.__name__}("{self.name}")'
@CachedProperty
[docs] def nodes(self) -> List["Node"]: # pylint: disable=method-hidden
"""Get the nodes of the action run.
Returns:
The nodes of the action run.
"""
_action = self._action
_dataset = _action._dataset # pylint: disable=protected-access
_workspace = _dataset.workspace
response = get_action_run(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
action=_action.name,
run_number=self.number,
)
return [Node(self, item) for item in response["nodes"]]
@property
[docs] def url(self) -> str:
"""Get the url of the action run.
Returns:
The url of the action run.
Raises:
ValueError: When the access key format is wrong.
"""
_action = self._action
_dataset = _action._dataset # pylint: disable=protected-access
_workspace = _dataset.workspace
if _workspace.access_key.startswith("Accesskey-"):
url = "https://gas.graviti.cn"
elif _workspace.access_key.startswith("ACCESSKEY-"):
url = "https://gas.graviti.com"
else:
raise ValueError("Wrong accesskey format!")
return (
f"{url}/dataset/{_workspace.name}/{_dataset.name}/actions/{_action.name}/"
f"{self._workflow_id[len(_action.name)+1 :]}?fullId={self._workflow_id}"
)
[docs] def cancel(self) -> None:
"""Cancel the action run."""
_action = self._action
_dataset = _action._dataset # pylint: disable=protected-access
_workspace = _dataset.workspace
response = cancel_action_run(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
action=_action.name,
run_number=self.number,
)
self.status = response["status"]
self.ended_at = response["ended_at"]
nodes: Any = [Node(self, item) for item in response["nodes"]]
self.nodes = nodes
[docs]class RunManager:
"""This class defines the operations on the action run in Graviti.
Arguments:
action: :class:`~graviti.manager.action.Action` instance.
"""
def __init__(self, action: Action) -> None:
self._action = action
def _generate(self, offset: int, limit: int, *, sort: SortParam) -> Generator[Run, None, int]:
_action = self._action
_dataset = _action._dataset # pylint: disable=protected-access
_workspace = _dataset.workspace
response = list_action_runs(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
action=_action.name,
sort=sort,
offset=offset,
limit=limit,
)
for item in response["runs"]:
yield Run(_action, item)
return response["total_count"] # type: ignore[no-any-return]
[docs] def create(self, arguments: Optional[Dict[str, Any]] = None) -> Run:
"""Run an action manually.
Arguments:
arguments: The arguments of the action run.
Returns:
The :class:`.Run` instance with the given arguments.
"""
_action = self._action
_dataset = _action._dataset # pylint: disable=protected-access
_workspace = _dataset.workspace
response = create_action_run(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
action=_action.name,
arguments=arguments,
)
return Run(_action, response)
[docs] def get(self, run_number: int) -> Run:
"""Get the action with the given name.
Arguments:
run_number: The number of the action run.
Returns:
The :class:`.Action` instance with the given name.
"""
check_type("run_number", run_number, int)
_action = self._action
_dataset = _action._dataset # pylint: disable=protected-access
_workspace = _dataset.workspace
response = get_action_run(
_workspace.access_key,
_workspace.url,
_workspace.name,
_dataset.name,
action=_action.name,
run_number=run_number,
)
return Run(_action, response)
[docs] def list(self, *, sort: SortParam = None) -> LazyPagingList[Run]:
"""List the information of action runs.
Arguments:
sort: The column and the direction the list result sorted by.
Returns:
The LazyPagingList of :class:`runs<.Run>` instances.
"""
return LazyPagingList(
lambda offset, limit: self._generate(offset, limit, sort=sort),
LIMIT,
)
[docs]class Node(ReprMixin): # pylint: disable=too-many-instance-attributes
"""This class defines the structure of a node of action run.
Arguments:
run: Class :class:`~graviti.manager.action.Run` instance.
response: The response of the OpenAPI associated with the action run node::
{
"id": <str>,
"name": <str>,
"display_name": <str>,
"phase": <str>,
"started_at": <str>,
"ended_at": <str>,
"children": <list[str]>,
},
Attributes:
node_id: The id of this action run node.
name: The name of this action run node.
display_name: The display name of this action run node.
phase: The phase of this action run node.
started_at: The start time of this action run node.
ended_at: The end time of this action run node.
duration: The duration of this action run node.
children: The children of this action run node.
"""
_repr_attrs = ("node_id", "phase", "started_at", "ended_at", "duration")
def __init__(self, run: Run, response: Dict[str, Any]) -> None:
self._run = run
self.node_id: str = response["id"]
self.name: str = response["name"]
self.display_name: str = response["display_name"]
self.phase: str = response["phase"]
self.started_at = convert_iso_to_datetime(response["started_at"])
ended_at = response["ended_at"]
if ended_at is not None:
self.ended_at: Optional[datetime] = convert_iso_to_datetime(ended_at)
self.duration: Optional[timedelta] = self.ended_at - self.started_at
else:
self.ended_at = None
self.duration = None
self.children: List[str] = response["children"]
def _repr_head(self) -> str:
return f'{self.__class__.__name__}("{self.display_name}")'