#!/usr/bin/env python3
#
# Copyright 2022 Graviti. Licensed under MIT License.
#
"""Common tools."""
from collections import defaultdict
from datetime import datetime, timezone
from functools import wraps
from sys import version_info
from threading import Lock
from typing import Any, Callable, DefaultDict, Generic, Optional, Type, TypeVar, Union, overload
from typing_extensions import Protocol
_T = TypeVar("_T")
_S = TypeVar("_S")
_LA = TypeVar("_LA", bound="LazyAttr[Any]")
_CP = TypeVar("_CP", bound="CachedProperty[Any, Any]")
_CallableWithoutReturnValue = TypeVar("_CallableWithoutReturnValue", bound=Callable[..., None])
_WEEKS = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
_MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
[docs]locks: DefaultDict[int, Lock] = defaultdict(Lock)
[docs]def urlnorm(url: str) -> str:
"""Normalized the input url by removing the trailing slash.
Arguments:
url: the url needs to be normalized.
Returns:
The normalized url.
"""
return url.rstrip("/")
[docs]def locked(func: _CallableWithoutReturnValue) -> _CallableWithoutReturnValue:
"""The decorator to add threading lock for methods.
Arguments:
func: The method needs to add threading lock.
Returns:
The method with theading locked.
"""
@wraps(func)
def wrapper(self: Any, *arg: Any, **kwargs: Any) -> None:
key = id(self)
lock = locks[key]
acquire = lock.acquire(blocking=False)
try:
if acquire:
func(self, *arg, **kwargs)
del locks[key]
else:
lock.acquire()
finally:
lock.release()
return wrapper # type: ignore[return-value]
[docs]def shorten(origin: str) -> str:
"""Return the first 7 characters of the original string.
Arguments:
origin: The string needed to be shortened.
Returns:
A string of length 7.
"""
return origin[:7]
if version_info >= (3, 7):
[docs] def convert_iso_to_datetime(date_string: str) -> datetime:
"""Convert iso 8601 format string to datetime format time with local timezone.
Arguments:
date_string: The iso 8601 format string.
Returns:
The datetime format time with local timezone.
"""
return datetime.fromisoformat(date_string.replace("Z", "+00:00")).astimezone()
else:
_DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
def convert_iso_to_datetime(date_string: str) -> datetime:
"""Convert iso 8601 format string to datetime format time with local timezone.
Arguments:
date_string: The iso 8601 format string.
Returns:
The datetime format time with local timezone.
"""
return (
datetime.strptime(date_string, _DATE_FORMAT).replace(tzinfo=timezone.utc).astimezone()
)
[docs]def convert_datetime_to_gmt(utctime: datetime) -> str:
"""Convert datetime to gmt format string.
Arguments:
utctime: The datetime with utc timezone.
Returns:
The gmt format string.
"""
return (
f"{_WEEKS[utctime.weekday()]}, {utctime.day:02d} {_MONTHS[utctime.month - 1]}"
f" {utctime.year:04d} {utctime.hour:02d}:{utctime.minute:02d}:{utctime.second:02d} GMT"
)
class _LazyLoad(Protocol):
def _load(self) -> None:
...
[docs]class LazyAttr(Generic[_T]):
"""The descriptor for the lazy loaded attr."""
_name: str
def __set_name__(self, _: Type[_LazyLoad], name: str) -> None:
self._name = name
@overload
def __get__(self: _LA, obj: None, _: Type[_LazyLoad]) -> _LA:
...
@overload
def __get__(self, obj: _LazyLoad, _: Type[_LazyLoad]) -> _T:
...
def __get__(self: _LA, obj: Optional[_LazyLoad], _: Type[_LazyLoad]) -> Union[_LA, _T]:
if obj is None:
return self
obj._load()
return getattr(obj, self._name) # type: ignore[no-any-return]
[docs]class CachedProperty(Generic[_S, _T]):
"""The descriptor for the cached property.
Arguments:
func: the property function needs to be cached.
"""
_name: str
def __init__(self, func: Callable[[_S], _T]) -> None:
self._func = func
self.__doc__ = func.__doc__
def __set_name__(self, _: Type[_S], name: str) -> None:
self._name = name
@overload
def __get__(self: _CP, obj: None, _: Type[_S]) -> _CP:
...
@overload
def __get__(self, obj: _S, _: Type[_S]) -> _T:
...
def __get__(self: _CP, obj: Optional[_S], _: Type[_S]) -> Union[_CP, _T]:
if obj is None:
return self
value: _T = self._func(obj)
setattr(obj, self._name, value)
return value
[docs]class ModuleMocker:
"""A fake module to raise ``ModuleNotFoundError`` lazily.
Arguments:
message: The error message for the raised ``ModuleNotFoundError``.
"""
def __init__(self, message: str) -> None:
self._message = message
def __call__(self, *_: Any, **__: Any) -> Any:
"""Raise ``ModuleNotFoundError`` when called.
Arguments:
_: Useless positional arguments.
__: Useless keyword arguments
Raises:
ModuleNotFoundError: When called.
"""
raise ModuleNotFoundError(super().__getattribute__("_message"))
def __getattribute__(self, name: str) -> Any:
raise ModuleNotFoundError(super().__getattribute__("_message"))