????JFIF??x?x????'
| Server IP : 172.67.174.47  /  Your IP : 216.73.216.145 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././opt/cloudlinux/venv/lib/python3.11/site-packages/lvestats/plugins/generic/burster/ | 
| Upload File : | 
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2023 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import typing
import dataclasses
from pathlib import Path
from logging import Logger
from datetime import timedelta
from dataclasses import dataclass
from collections import ChainMap
from typing import Any, Generator, TypedDict, Mapping, Self, NamedTuple, Iterable, NotRequired
import sqlalchemy as sa
from ._logs import logger
# TODO(vlebedev): Extract from package metadata instead of hardcoding?
CONFIG_FILE = Path('/etc/sysconfig/lvestats.config/LveLimitsBurster.cfg')
FEATURE_FLAG_FILE = Path('/opt/cloudlinux/flags/enabled-flags.d/burstable-limits.flag')
class PluginConfig(TypedDict):
    bursting_enabled: NotRequired[str]
    server_id: NotRequired[str]
    bursting_quota_sec: str
    bursting_quota_window_sec: str
    bursting_idle_time_threshold: str
    bursting_cpu_multiplier: str
    bursting_io_multiplier: str
    bursting_database_dump_period_sec: str
    bursting_idle_time_samples_num: str
    bursting_debug_mode: NotRequired[str]
_all_burster_plugin_config_keys = frozenset(PluginConfig.__annotations__.keys())
@dataclass(frozen=True)
class Config:
    server_id: str
    bursting_quota: timedelta
    bursting_quota_window: timedelta
    bursting_cpu_multiplier: float
    bursting_io_multiplier: float
    idle_time_threshold: float
    db_dump_period: timedelta
    idle_time_samples: int
    fail_fast: bool = True
    def __post_init__(self) -> None:
        if self.bursting_quota > self.bursting_quota_window:
            raise ValueError('Bursting quota must be less than or equal to bursting quota window!')
_all_config_keys = frozenset(f.name for f in dataclasses.fields(Config))
def is_bursting_enabled(config_file=CONFIG_FILE) -> bool:
    try:
        raw_config = read_raw_config(config_file)
    except FileNotFoundError:
        return False
    raw_key = 'bursting_enabled'
    assert raw_key in _all_burster_plugin_config_keys
    try:
        raw_value = raw_config[raw_key]
    except KeyError:
        return False
    try:
        return get_boolean(raw_value)
    except ValueError:
        return False
def is_bursting_supported(feature_flag_file: Path = FEATURE_FLAG_FILE) -> bool:
    # NOTE(vlebedev): These imports requires some shared library to be present in order to succeed,
    #                 so deffer it until it's really needed to make unittests writing/running easier.
    from clcommon.utils import get_cl_version, is_ubuntu  # pylint: disable=import-outside-toplevel
    from clcommon.cpapi import Feature, is_panel_feature_supported  # pylint: disable=import-outside-toplevel
    if not is_panel_feature_supported(Feature.LVE):
        return False
    if is_ubuntu():
        return False
    cl_version = get_cl_version()
    if cl_version is None:
        return False
    try:
        if int(cl_version.removeprefix('cl').removesuffix('h')) < 8:
            return False
    except ValueError:
        return False
    return feature_flag_file.exists()
def _identity(raw_value: str) -> str:
    return raw_value
def get_boolean(raw_value: str) -> bool:
    value = raw_value.lower()
    if value not in {'true', 'false'}:
        raise ValueError(f'Unexpected value: {value}')
    return value == 'true'
def _get_timedelta_from_seconds(raw_value: str) -> timedelta:
    seconds = int(raw_value)
    return timedelta(seconds=seconds)
_raw_key_to_spec = {
    'bursting_enabled': ('enabled', get_boolean),
    'server_id': ('server_id', _identity),
    'bursting_debug_mode': ('fail_fast', get_boolean),
    'bursting_quota_sec': ('bursting_quota', _get_timedelta_from_seconds),
    'bursting_quota_window_sec': ('bursting_quota_window', _get_timedelta_from_seconds),
    'bursting_cpu_multiplier': ('bursting_cpu_multiplier', float),
    'bursting_io_multiplier': ('bursting_io_multiplier', float),
    'bursting_idle_time_threshold': ('idle_time_threshold', float),
    'bursting_database_dump_period_sec': ('db_dump_period', _get_timedelta_from_seconds),
    'bursting_idle_time_samples_num': ('idle_time_samples', int),
}
_config_to_raw_key = {v[0]: k for k, v in _raw_key_to_spec.items()}
assert _raw_key_to_spec.keys() == _all_burster_plugin_config_keys
assert {k for k, _ in _raw_key_to_spec.values()}.issuperset(_all_config_keys)
def _process_raw_config(raw_config: Mapping[str, str]) -> dict[str, Any]:
    cfg_key_to_parsed_value, errors_by_cfg_key = {}, {}
    for config_key, raw_value in raw_config.items():
        try:
            _, extractor = _raw_key_to_spec[config_key]
        except KeyError:
            # NOTE(vlebedev): Currently config dict contains all the keys from _all_ .cfg files parsed by
            #                 lvestats. So there is no point as report fields not present in `Confg` typing
            #                 as "unknown" or something like that - they might well belong to some other plugin =/
            # errors_by_cfg_key[config_key] = f'Unknown config key'
            continue
        try:
            value = extractor(raw_value)
        except ValueError as e:
            errors_by_cfg_key[config_key] = str(e)
            continue
        cfg_key_to_parsed_value[config_key] = value
    if len(errors_by_cfg_key) > 0:
        logger.warning(
            "Failed to parse some config keys: \n%s",
            "\n".join(f"* {k}: {e}" for k, e in errors_by_cfg_key.items()),
        )
    result = {_raw_key_to_spec[k][0]: v for k, v in cfg_key_to_parsed_value.items()}
    return result
class MissingKeysInRawConfig(ValueError):
    def __init__(self, missing_raw_keys: Iterable[str]) -> None:
        missing_raw_keys = frozenset(missing_raw_keys)
        msg = "Missing config keys: " + ", ".join(missing_raw_keys) + "!"
        super().__init__(msg, missing_raw_keys)
    @property
    def missing_raw_keys(self) -> frozenset[str]:
        return typing.cast(frozenset[str], self.args[1])
class ConfigUpdate(NamedTuple):
    @classmethod
    def from_plugin_config(cls, config: PluginConfig) -> Self:
        assert all(isinstance(v, str) for v in config.values())
        external_params = _process_raw_config(typing.cast(Mapping[str, str], config))
        default_params = {
            'enabled': False,
            'server_id': 'localhost',
            'fail_fast': False,
        }
        if (defaults_used := default_params.keys() - external_params.keys()):
            logger.info('Using default values for: %s', defaults_used)
        params = ChainMap(external_params, default_params)
        missing_config_keys = _all_config_keys - params.keys()
        if missing_config_keys:
            raise MissingKeysInRawConfig(_config_to_raw_key[k] for k in missing_config_keys)
        return cls(
            enabled=params['enabled'],
            config=Config(**{k: params[k] for k in _all_config_keys})
        )
    enabled: bool
    config: Config
class StartupParams(NamedTuple):
    @classmethod
    def wait(cls) -> Generator[None, ConfigUpdate | sa.engine.Engine, Self]:
        required_keys = frozenset(cls._fields)
        result = {}
        enabled = False
        while enabled is False or result.keys() != required_keys:
            match (yield):
                case sa.engine.Engine() as engine:
                    result['engine'] = engine
                case ConfigUpdate(enabled=enabled, config=config):
                    result['config'] = config
        return cls(**result)
    engine: sa.engine.Engine
    config: Config
def read_raw_config(file: Path = CONFIG_FILE, _logger: Logger = logger) -> Mapping[str, str]:
    result = {}
    for line in file.read_text(encoding='utf-8').splitlines():
        try:
            key, value = line.split('=', maxsplit=1)
        except ValueError:
            _logger.warning('Failed to parse config line: %s', line)
            continue
        if key in result:
            _logger.warning('Duplicate key %s - latest value will be used', key)
        result[key] = value
    return result