#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#
"""Definitions of different operations on a DataFrame."""
from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
from tqdm import tqdm
from graviti.exception import ObjectCopyError
from graviti.file import File, FileBase, RemoteFile
from graviti.openapi import (
RECORD_KEY,
add_data,
copy_objects,
delete_data,
update_data,
update_schema,
)
from graviti.operation.common import get_schema
from graviti.portex import record
from graviti.utility import chunked, submit_multithread_tasks
if TYPE_CHECKING:
from graviti.dataframe import DataFrame
from graviti.dataframe.column.series import NumberSeries
from graviti.manager import Dataset, ObjectPermissionManager
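# Limits for a single add/update data request: at most _MAX_BATCH_SIZE rows, and at most
# _MAX_ITEMS cells (rows times columns) in total.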
_MAX_BATCH_SIZE = 2048
_MAX_ITEMS = 60000
class DataFrameOperation:
    """This class defines the basic methods of an operation on a DataFrame."""
    def get_file_count(self) -> int:
        """Get the number of files to be uploaded.
        Returns:
            The number of files to be uploaded.
        """
return 0
    def get_data_count(self) -> int:
        """Get the number of data records to be uploaded.
        Returns:
            The number of data records to be uploaded.
        """
return 0
    def execute(
self,
dataset: "Dataset",
*,
draft_number: int,
sheet: str,
jobs: int,
data_pbar: tqdm,
file_pbar: tqdm,
) -> None:
"""Execute the OpenAPI create sheet.
Arguments:
dataset: The Dataset instance.
draft_number: The draft number.
sheet: The sheet name.
            jobs: The maximum number of workers in multi-thread operations.
            data_pbar: The progress bar for uploading structured data.
            file_pbar: The progress bar for uploading binary files.
Raises:
NotImplementedError: The method of the base class should not be called.
"""
raise NotImplementedError
class DataOperation(DataFrameOperation):  # pylint: disable=abstract-method
    """This class defines the basic methods of a data operation on a DataFrame."""
def __init__(self, data: "DataFrame") -> None:
self._data = data
def _get_max_batch_size(self) -> int:
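        # Shrink the batch size when the rows are wide, so that one request never exceeds
        # the per-request cell budget.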
return min(
_MAX_BATCH_SIZE,
_MAX_ITEMS // self._data.schema._get_column_count(), # pylint: disable=protected-access
)
    def get_file_count(self) -> int:
        """Get the number of files to be uploaded.
        Returns:
            The number of files to be uploaded.
        """
return sum(map(len, self._data._generate_file_series())) # pylint: disable=protected-access
    def get_data_count(self) -> int:
        """Get the number of data records to be uploaded.
        Returns:
            The number of data records to be uploaded.
        """
return len(self._data)
class AddData(DataOperation):
    """This class defines the operation that adds data to a DataFrame.
Arguments:
data: The data to be added.
"""
    def execute(
self,
dataset: "Dataset",
*,
draft_number: int,
sheet: str,
jobs: int,
data_pbar: tqdm,
file_pbar: tqdm,
) -> None:
"""Execute the OpenAPI add data.
Arguments:
dataset: The Dataset instance.
draft_number: The draft number.
sheet: The sheet name.
            jobs: The maximum number of workers in multi-thread operations.
            data_pbar: The progress bar for uploading structured data.
            file_pbar: The progress bar for uploading binary files.
"""
batch_size = self._get_max_batch_size()
df = self._data
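        # Upload in batches: for each slice of rows, first make the binary files it
        # references available (copy remote objects or upload local ones), then post the
        # structured rows themselves.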
for i in range(0, len(df), batch_size):
batch = df.iloc[i : i + batch_size]
# pylint: disable=protected-access
local_files, remote_files = _separate_files(batch._generate_file(), dataset)
if remote_files:
_copy_files(dataset, remote_files, file_pbar)
if local_files:
_upload_files(local_files, dataset.object_permission_manager, file_pbar, jobs)
_workspace = dataset.workspace
add_data(
_workspace.access_key,
_workspace.url,
_workspace.name,
dataset.name,
draft_number=draft_number,
sheet=sheet,
data=batch.to_pylist(_to_backend=True),
)
data_pbar.update(len(batch))
class UpdateSchema(DataFrameOperation):
    """This class defines the operation that updates the schema of a DataFrame.
    Arguments:
        schema: The new portex schema after the update.
        data: The DataFrame whose schema is updated.
"""
def __init__(self, schema: record, data: "DataFrame") -> None:
self._data = data
self.schema = schema
    def execute(
self,
dataset: "Dataset",
*,
draft_number: int,
sheet: str,
jobs: int,
data_pbar: tqdm,
file_pbar: tqdm,
) -> None:
"""Execute the OpenAPI update schema.
Arguments:
dataset: The Dataset instance.
draft_number: The draft number.
sheet: The sheet name.
            jobs: The maximum number of workers in multi-thread operations.
            data_pbar: The progress bar for uploading structured data.
            file_pbar: The progress bar for uploading binary files.
"""
portex_schema, avro_schema, arrow_schema = get_schema(self.schema)
        # Requesting data between the "update schema" and "update data" calls makes the
        # Graviti backend report an error, so the record keys are fetched (all lazy pages
        # materialized) before "update schema" to bypass this issue.
record_keys: "NumberSeries" = self._data[RECORD_KEY] # type: ignore[assignment]
for page in record_keys._data._pages: # pylint: disable=protected-access
page.get_array()
_workspace = dataset.workspace
update_schema(
_workspace.access_key,
_workspace.url,
_workspace.name,
dataset.name,
draft_number=draft_number,
sheet=sheet,
_schema=portex_schema,
_avro_schema=avro_schema,
_arrow_schema=arrow_schema,
)
class UpdateData(DataOperation):
"""This class defines the operation that updates the data of a DataFrame.
Arguments:
data: The data for updating.
"""
    def execute(
self,
dataset: "Dataset",
*,
draft_number: int,
sheet: str,
jobs: int,
data_pbar: tqdm,
file_pbar: tqdm,
) -> None:
"""Execute the OpenAPI add data.
Arguments:
dataset: The Dataset instance.
draft_number: The draft number.
sheet: The sheet name.
            jobs: The maximum number of workers in multi-thread operations.
            data_pbar: The progress bar for uploading structured data.
            file_pbar: The progress bar for uploading binary files.
"""
batch_size = self._get_max_batch_size()
df = self._data
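        # Same batched flow as AddData.execute, but the rows are sent to the update data
        # OpenAPI instead of the add data OpenAPI.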
for i in range(0, len(df), batch_size):
batch = df.iloc[i : i + batch_size]
# pylint: disable=protected-access
local_files, remote_files = _separate_files(batch._generate_file(), dataset)
if remote_files:
_copy_files(dataset, remote_files, file_pbar)
if local_files:
_upload_files(local_files, dataset.object_permission_manager, file_pbar, jobs)
_workspace = dataset.workspace
update_data(
_workspace.access_key,
_workspace.url,
_workspace.name,
dataset.name,
draft_number=draft_number,
sheet=sheet,
data=batch.to_pylist(_to_backend=True),
)
data_pbar.update(len(batch))
class DeleteData(DataFrameOperation):
    """This class defines the operation that deletes the data of a DataFrame.
Arguments:
record_keys: The record keys of the data to be deleted.
"""
def __init__(self, record_keys: List[str]) -> None:
self.record_keys = record_keys
    def execute(
self,
dataset: "Dataset",
*,
draft_number: int,
sheet: str,
jobs: int,
data_pbar: tqdm,
file_pbar: tqdm,
) -> None:
"""Execute the OpenAPI delete data.
Arguments:
dataset: The Dataset instance.
draft_number: The draft number.
sheet: The sheet name.
            jobs: The maximum number of workers in multi-thread operations.
            data_pbar: The progress bar for uploading structured data.
            file_pbar: The progress bar for uploading binary files.
"""
_workspace = dataset.workspace
delete_data(
_workspace.access_key,
_workspace.url,
_workspace.name,
dataset.name,
draft_number=draft_number,
sheet=sheet,
record_keys=self.record_keys,
)
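# A minimal sketch of how these operations are typically driven (hypothetical call site
# and variable names; the real driver lives in the manager layer). Each pending operation
# is executed against a draft sheet with its own progress bars:
#
#     for operation in pending_operations:  # e.g. [UpdateSchema(schema, df), AddData(df)]
#         operation.execute(
#             dataset,
#             draft_number=draft_number,
#             sheet=sheet_name,
#             jobs=8,
#             data_pbar=tqdm(total=operation.get_data_count()),
#             file_pbar=tqdm(total=operation.get_file_count()),
#         )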
def _copy_files(
dataset: "Dataset",
remote_files: Dict[str, List[RemoteFile]],
pbar: tqdm,
) -> None:
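    # Copy the objects server-side from each source dataset in batches of _MAX_BATCH_SIZE,
    # and record the returned keys on the RemoteFile instances via ``_post_key``.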
_workspace = dataset.workspace
for source_dataset, files in remote_files.items():
for batch in chunked(files, _MAX_BATCH_SIZE):
keys = copy_objects(
_workspace.access_key,
_workspace.url,
_workspace.name,
dataset.name,
source_dataset=source_dataset,
keys=[file.key for file in batch],
)["keys"]
for file, key in zip(batch, keys):
file._post_key = key # pylint: disable=protected-access
pbar.update(len(batch))
def _upload_files(
files: Iterable[File],
object_permission_manager: "ObjectPermissionManager",
pbar: tqdm,
jobs: int = 8,
) -> None:
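    # Upload the local files concurrently, one task per file, with at most ``jobs`` threads.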
submit_multithread_tasks(
lambda file: _upload_file(file, object_permission_manager, pbar),
files,
jobs=jobs,
)
def _upload_file(
file: File,
object_permission_manager: "ObjectPermissionManager",
pbar: tqdm,
) -> None:
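    # The object key is the permission prefix followed by the file checksum, so the upload
    # target is content-addressed.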
post_key = f"{object_permission_manager.prefix}{file.get_checksum()}"
object_permission_manager.put_object(post_key, file.path)
file._post_key = post_key # pylint: disable=protected-access
pbar.update()
def _separate_files(
files: Iterable[FileBase], target_dataset: "Dataset"
) -> Tuple[List[File], Dict[str, List[RemoteFile]]]:
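    # Split the input into local files that need uploading and remote files that need
    # copying, the latter grouped by the name of their source dataset. Remote files that
    # already belong to the target dataset keep their existing key and are skipped.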
local_files: List[File] = []
remote_files: Dict[str, List[RemoteFile]] = {}
for file in files:
# pylint: disable=protected-access
if isinstance(file, File):
local_files.append(file)
elif isinstance(file, RemoteFile):
source_dataset = file._object_permission._dataset
if source_dataset._id == target_dataset._id:
file._post_key = file._key
continue
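            # The cross-workspace and cross-storage-config checks only run the first time
            # a source dataset is encountered (the KeyError branch).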
try:
remote_files[source_dataset.name].append(file)
except KeyError:
if source_dataset.workspace._id != target_dataset.workspace._id:
raise ObjectCopyError(
"It is not allowed to copy object between diffenent workspaces.\n"
" Source:\n"
f' workspace: "{source_dataset.workspace.name}"\n'
f' dataset: "{source_dataset.name}"\n'
f' object key: "{file.key}"\n'
" Target:\n"
f' workspace: "{target_dataset.workspace.name}"\n'
f' dataset: "{target_dataset.name}"\n'
) from None
if source_dataset.storage_config.name != target_dataset.storage_config.name:
raise ObjectCopyError(
"It is not allowed to copy object between datasets "
"with different storage configs.\n"
" Source:\n"
f' workspace: "{source_dataset.workspace.name}"\n'
f' dataset: "{source_dataset.name}"\n'
f' storage config: "{source_dataset.storage_config.name}"\n'
f' object key: "{file.key}"\n'
" Target:\n"
f' workspace: "{target_dataset.workspace.name}"\n'
f' dataset: "{target_dataset.name}"\n'
f' storage config: "{target_dataset.storage_config.name}"\n'
) from None
remote_files[source_dataset.name] = [file]
else:
raise TypeError("The file instance is neither 'File' nor 'RemoteFile'")
return local_files, remote_files