Source code for graviti.portex.avro

#!/usr/bin/env python3

# pylint: disable=missing-class-docstring
# pylint: disable=missing-function-docstring
# flake8: noqa

"""Code converting PyArrow schema to Avro Schema."""

from typing import Any, Dict, Type

from graviti.portex.base import PortexType
from graviti.portex.builtin import (
    array,
    binary,
    boolean,
    date,
    enum,
    float32,
    float64,
    int32,
    int64,
    record,
    string,
    time,
    timedelta,
    timestamp,
)


[docs]class AvroSchema: def __init__(self, name: str, namespace: str, portex_type: PortexType) -> None: raise NotImplementedError
[docs] def to_json(self) -> Dict[str, Any]: raise NotImplementedError
[docs]class AvroField: def __init__( self, type_: AvroSchema, name: str, *, optional: bool = True, has_default: bool = False, default: Any = None, ) -> None: self._type = type_ self._name = name self._optional = optional self._has_default = has_default self._default = default
[docs] def to_json(self) -> Dict[str, Any]: if self._optional: result: Dict[str, Any] = { "name": self._name, "type": [ "null", self._type.to_json(), ], } else: result = { "name": self._name, "type": self._type.to_json(), } if self._has_default: result["default"] = self._default return result
[docs]class AvroPrimitiveSchema(AvroSchema): _PRIMITIVE_TYPES: Dict[Type[PortexType], str] = { int32: "int", int64: "long", float32: "float", float64: "double", boolean: "boolean", string: "string", binary: "bytes", } def __init__(self, name: str, namespace: str, portex_type: PortexType): try: self._type = self._PRIMITIVE_TYPES[type(portex_type)] except KeyError: raise Exception(f"unsupported type {portex_type}") from None
[docs] def to_json(self) -> str: # type: ignore[override] return self._type
[docs]class AvroRecord(AvroSchema): def __init__(self, name: str, namespace: str, portex_type: record): self._name = name self._namespace = namespace self._fields = [ AvroField(_avro_schema_creator(key, f"{namespace}.{name}", value), key) for key, value in portex_type.items() ]
[docs] def to_json(self) -> Dict[str, Any]: return { "type": "record", "name": self._name, "namespace": self._namespace, "fields": [field.to_json() for field in self._fields], }
[docs]class AvroArray(AvroSchema): def __init__(self, name: str, namespace: str, portex_type: array): self._items = _avro_schema_creator("items", f"{namespace}.{name}.items", portex_type.items)
[docs] def to_json(self) -> Dict[str, Any]: return { "type": "array", "items": self._items.to_json(), "default": [], }
[docs]class PortexEnum(AvroSchema): def __init__(self, name: str, namespace: str, portex_type: enum): self._values = portex_type.values
[docs] def to_json(self) -> Dict[str, Any]: min_index, max_index = self._values.index_scope avro_type = "int" if min_index >= -2147483648 and max_index <= 2147483647 else "long" return { "type": avro_type, "logicalType": "portex.enum", "values": self._values.to_pyobj(), }
[docs]class PortexDate(AvroSchema): def __init__(self, name: str, namespace: str, portex_type: PortexType) -> None: pass
[docs] def to_json(self) -> Dict[str, Any]: return { "type": "int", "logicalType": "portex.date", }
[docs]class PortexTime(AvroSchema): _AVRO_TYPES = { "s": "int", "ms": "int", "us": "long", "ns": "long", } def __init__(self, name: str, namespace: str, portex_type: time) -> None: self._unit = portex_type.unit
[docs] def to_json(self) -> Dict[str, Any]: return { "type": self._AVRO_TYPES[self._unit], "logicalType": "portex.time", "unit": self._unit, }
[docs]class PortexTimestamp(AvroSchema): def __init__(self, name: str, namespace: str, portex_type: timestamp): self._unit = portex_type.unit self._tz = portex_type.tz
[docs] def to_json(self) -> Dict[str, Any]: return { "type": "long", "logicalType": "portex.timestamp", "unit": self._unit, "tz": self._tz, }
[docs]class PortexTimedelta(AvroSchema): def __init__(self, name: str, namespace: str, portex_type: timedelta) -> None: self._unit = portex_type.unit
[docs] def to_json(self) -> Dict[str, Any]: return { "type": "long", "logicalType": "portex.timedelta", "unit": self._unit, }
_COMPLEX_TYPE_PROCESSERS: Dict[Type[PortexType], Type[AvroSchema]] = { record: AvroRecord, array: AvroArray, enum: PortexEnum, date: PortexDate, time: PortexTime, timestamp: PortexTimestamp, timedelta: PortexTimedelta, } def _avro_schema_creator(name: str, namespace: str, portex_type: PortexType) -> AvroSchema: builtin_portex_type = portex_type.to_builtin() processer = _COMPLEX_TYPE_PROCESSERS.get(type(builtin_portex_type), AvroPrimitiveSchema) return processer(name, namespace, builtin_portex_type)
[docs]def convert_portex_schema_to_avro(portex_type: record) -> Dict[str, Any]: return AvroRecord("root", "cn.graviti.portex", portex_type).to_json()