????JFIF??x?x????'
Server IP : 172.67.174.47 / Your IP : 216.73.216.87 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././opt/cloudlinux/venv/lib/python3.11/site-packages/clsummary/ |
Upload File : |
# coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT import datetime import hashlib import itertools import json import logging import math import os import platform import random import re import string import subprocess import sys import time import typing from copy import deepcopy from collections import Counter, defaultdict from functools import lru_cache, partial from multiprocessing import cpu_count from pathlib import Path from socket import getfqdn from typing import Any, AnyStr, Callable, Dict, List, Optional, Tuple, Union # NOQA import cldetectlib as detect import lvectllib import psutil import requests from cl_proc_hidepid import get_hidepid_typing_from_mounts from clcommon import cpapi from clcommon.clwpos_lib import ( find_wp_paths, get_wp_cache_plugin, get_wp_paths_with_enabled_module, ) from clcommon.const import Feature from clcommon.cpapi import is_panel_feature_supported, is_wp2_environment from clcommon.lib import MySQLGovernor from clcommon.lib.cledition import ( CLEditions, is_cl_shared_edition, is_cl_solo_edition, ) from clcommon.lib.cmt_utils import client_activation_data from clcommon.lib.consts import ACCELERATE_WP_INSTALLED_FROM_CM from clcommon.sysctl import SysCtlConf from clcommon.utils import ( ExternalProgramFailed, get_cl_version, get_rhn_systemid_value, get_virt_type, grep, is_litespeed_running, is_nginx_running, is_secureboot_enabled, is_testing_enabled_repo, is_ubuntu, run_command, ) from clconfig import cagefs_statistics_config, clconfig_utils, db_governor_lib from cldiaglib import is_email_notification_enabled from cli_utils import print_dictionary, replace_params from cllimitslib_v2 import DEFAULTS, LimitsDataStorage from clveconfig import EMPTY_LIMITS from clwizard.modules import ALL_MODULES from lve_utils import PKG_VERSION as LVE_UTILS_PKG_VERSION from lveapi import LvpMap from vendors_api.config import CONFIG_PATH, _read_config_file from cl_website_collector.website_collector import WebsiteCollector from cl_website_collector.feature_manager import FeatureManager from clsummary.utils import ( SummaryStatus, dummy_none_function, get_cl_plus_sender_status, get_client_data_from_jwt_token, get_packages_with_lve_extensions, get_statistics_send_status_from_file, is_active_cloudlinux_license, is_nodejs_selector_installed, is_php_selector_installed, is_python_selector_installed, is_ruby_selector_installed, is_sending_process_running, is_statistic_enabled, is_statistic_rpm_enabled, set_statistic_collection_enabled, set_statistic_rpm_collection_enabled, write_statistics_send_status_to_file, ) from clsummary.hardware_statistics import ( NotSupported, get_cpu_metrics, get_memory_metrics, ) from clsummary.rpm_packages_statistics import get_rpm_packages_info from .arg_parse import parse_cloudlinux_summary_opts from .storage import StatisticsDict try: # Package from lvemanager - can be absent from clselect.clselectctl import get_default_version from clselect.clselectstatistics import ( get_default_php_version, get_mode_of_php_selector, get_native_version_safe, get_php_selector_usage, get_versions_statistics, iter_server_applications, ) from clselector.selectorlib import CloudlinuxSelectorLib from lvemanager import PKG_RELEASE as LVEMANAGER_PKG_RELEASE from lvemanager import PKG_VERSION as LVEMANAGER_PKG_VERSION except ImportError: iter_server_applications = dummy_none_function get_mode_of_php_selector = dummy_none_function get_default_php_version = dummy_none_function get_default_version = dummy_none_function get_versions_statistics = dummy_none_function get_native_version_safe = dummy_none_function get_php_selector_usage = dummy_none_function CloudlinuxSelectorLib = None LVEMANAGER_PKG_VERSION = None LVEMANAGER_PKG_RELEASE = None try: from clflags import list_flags_info except ImportError: def list_flags_info(): return [] LOG_FILE = "/var/log/cloudlinux-summary.log" app_logger = logging.getLogger("cloudlinux-summary") UNKNOWN_RHN_ID = "unknown" INSTALLED = "installed" NOT_INSTALLED = "not_installed" NOT_INITIALIZED = "not_initialized" NOT_SELECTED = "not_selected" ENABLED = "enabled" DISABLED = "disabled" ERROR = "-42" def site_has_enabled_modules(site_dict: dict) -> bool: """ Checks if wordpress site has at least one enabled module """ return any(map(lambda module: module["enabled"], site_dict["modules"].values())) class CloudlinuxSummary: DASHBOARD_CERTIFICATE = "/var/lve/dashboard_certificate" CL_PLUS_CM_DISABLED_PATH = "/etc/cl_plus/.disabled" SELECTORS = itertools.compress( ["python", "ruby", "nodejs"], [ cpapi.is_panel_feature_supported(Feature.PYTHON_SELECTOR), cpapi.is_panel_feature_supported(Feature.RUBY_SELECTOR), cpapi.is_panel_feature_supported(Feature.NODEJS_SELECTOR), ], ) # Utility will send statistics to this server SUMMARY_URL = "https://stat-api.cloudlinux.com/api/clos-stat" WEBSITE_COLLECTOR_URL = "https://stat-api.cloudlinux.com/api/clos-upload" RPM_PACKAGES_URL = "https://stat-api.cloudlinux.com/api/rpm-stats" SETTINGS_URL = "https://repo.cloudlinux.com/static/cl-settings-v1.json" def __init__(self): self._opts = {} self._security_token = None self.statistics: StatisticsDict = StatisticsDict() self._lvpmap = None self._system_id = None self.is_process_not_limited = self._is_process_not_limited() self.packages_by_len = None self.sysctl = SysCtlConf() self.remote_settings = None @staticmethod def _is_process_not_limited(): """ Return true if process is running outside LVE or it's not running by utility `nice` """ is_running_by_nice = bool(os.environ.get("RUNNING_BY_NICE")) is_running_in_lve = bool(os.environ.get("RUNNING_IN_LVE")) return not is_running_by_nice and not is_running_in_lve @property def lvpmap(self): """ Load lvpmap only when needed """ if self._lvpmap is None: self._lvpmap = _get_lvpmap() return self._lvpmap @property def system_id(self) -> str: if self._system_id is None: self._system_id = get_rhn_systemid_value("system_id") return self._system_id @staticmethod def _get_platform(): return "ubuntu" if is_ubuntu() else "rhel_like" @staticmethod def _detect_secureboot(): return ENABLED if is_secureboot_enabled() else DISABLED @staticmethod def _generate_security_token(): range_for_random_choice = string.ascii_letters + string.digits security_token = "".join(random.choice(range_for_random_choice) for _ in range(64)) return security_token def _get_remote_data(self) -> dict: stat_data = {} if self.security_token is None: message = "Security token is empty" app_logger.error(message) self._error_and_exit({"result": message}) message = f"Getting statistics from server {self.SUMMARY_URL}" app_logger.info(message) params = { "system_id": self.system_id, "security_token": self.security_token, } response = None try: response = requests.get(self.SUMMARY_URL, params=params, timeout=60) except requests.RequestException as e: message = str(e) app_logger.error(message) self._error_and_exit({"result": message}) if not response.ok: message = f"Server answer is: HTTP code {response.status_code}; Reason: {response.reason}" app_logger.info(message) self._error_and_exit({"result": message}) app_logger.info("Received response from the server") try: stat_data = response.json()["result"] # extend remote statistics with Smart Advice statistics, # resolved LOCALLY (for details, please see XRAY-427) if isinstance(stat_data["result"], dict): stat_data["result"].update(self._get_smart_advice_statistics()) except (TypeError, ValueError): message = "Can't parse api response to json" app_logger.error(message) self._error_and_exit({"result": message}) except KeyError as e: app_logger.error('Invalid json response from server, field %s not found in "%s"', str(e), response.text) self._error_and_exit({"result": "Invalid response from server. " f"See {LOG_FILE} for details."}) else: app_logger.info("SUCCESS: received statistics from the server") return stat_data @property def security_token(self): if self._security_token is not None: return self._security_token if os.path.isfile(self.DASHBOARD_CERTIFICATE): self._security_token = self._read_token_from_file() else: # generate token if we do not have certificate file token = self._generate_security_token() self._security_token = token if self._write_token_to_file(token) else None return self._security_token def _write_token_to_file(self, token): """ Write security token to file and return success/fail status :param token: generated security token :return: T/F status """ try: with open(self.DASHBOARD_CERTIFICATE, "w", encoding="utf-8") as f: f.write(token) os.chmod(self.DASHBOARD_CERTIFICATE, 0o600) return True except (IOError, OSError) as e: app_logger.error("Error while writing secure token to file: %s", str(e)) return False def _read_token_from_file(self): try: with open(self.DASHBOARD_CERTIFICATE, "r", encoding="utf-8") as f: return f.read().strip() or None except (IOError, OSError) as e: app_logger.error("Error while reading file with secure token: %s", str(e)) return None @staticmethod def _detect_old_lve_integration() -> bool: """ Detect old LVE limits integration presence according to https://docs.cloudlinux.com/index.html?lve_limits_with_packages.html :return: True/False - present/absent """ # Try to get script name from config return detect.get_boolean_param( file_name=detect.CL_CONFIG_FILE, param_name="CUSTOM_GETPACKAGE_SCRIPT", separator="=", default_val=False, ) @staticmethod def _is_lsapi_present(): """ Detects presence/absence of lsapi :return: True/False """ return os.path.exists("/usr/bin/switch_mod_lsapi") @staticmethod def _get_status_of_selector(interpreter: str) -> str: """ Get selector status for nodejs, python, ruby and php selectors """ # Ruby cannot be disabled, so check on installation is enough if interpreter == "python": if not is_python_selector_installed(): return NOT_INSTALLED elif interpreter == "ruby": return ENABLED if is_ruby_selector_installed() else NOT_INSTALLED elif interpreter == "nodejs": if not is_nodejs_selector_installed(): return NOT_INSTALLED elif interpreter == "php" and not is_php_selector_installed(): return NOT_INSTALLED lib = CloudlinuxSelectorLib(interpreter) if lib is None: return NOT_INSTALLED if interpreter in ["nodejs", "python"]: try: return ENABLED if lib.get_selector_status()["selector_enabled"] else DISABLED except KeyError: return NOT_INSTALLED elif interpreter == "php": return DISABLED if lib.php_selector_is_disabled() else ENABLED raise ValueError(f"Unknown interpreter: {interpreter}") def _get_remote_settings(self, settings_url): if self.remote_settings: return self.remote_settings try: self.remote_settings = requests.get(settings_url, timeout=10).json() return self.remote_settings except requests.RequestException as e: app_logger.error("Request exception while getting remote settings: %s", str(e)) self._error_and_exit({"result": str(e)}) except (ValueError, TypeError) as e: app_logger.error("Error while parsing remote settings: %s", str(e)) return None def _is_statistics_enabled(self): """ Return cl-statistics status """ if self._opts.get("--force-collect"): return True # In Cloudlinux tests environment statistics always enabled is_test_environment = bool(os.environ.get("CL_TEST_SYSTEM")) if is_test_environment: return True settings = self._get_remote_settings(self.SETTINGS_URL) if settings is None: return False try: rollout_group = settings["cl-statistics"]["rollout-group"] return settings["cl-statistics"]["enabled"] and self._match_server(rollout_group) except (KeyError, TypeError) as e: app_logger.error("Error occurred while trying to get rollout group: %s", str(e)) self._error_and_exit({"result": str(e)}) def _website_collector_status(self, client_data): """ Return cl-website-collector status """ def _prepare_metadata(client_data): metadata = { "client_id": str(client_data.get("client_id", "")), "system_id": str(client_data.get("system_id", "")), "control_panel": "wp2" if is_wp2_environment() else detect.getCPName(), "domains_amount": self._get_total_domains_amount(), } return metadata # In Cloudlinux tests environment statistics always enabled is_test_environment = bool(os.environ.get("CL_TEST_SYSTEM")) if is_test_environment: return True metadata = _prepare_metadata(client_data) try: htaccess_collection = FeatureManager(app_logger=app_logger).get_decision(server_metadata=metadata, reason="htaccess") except Exception as e: app_logger.error("Error occurred while trying to get decision for htaccess collection=%s", str(e)) return {} return htaccess_collection @staticmethod def _to_number(hash_server): return int(hash_server, 16) def _match_server(self, url_num, salt=None): if self.system_id is None: # system_id is None if server is not registered, but we still need to collect statistics return True # Use salt to modify hash input if provided hash_input = self.system_id if salt: hash_input = f"{self.system_id}:{salt}" hash_server = hashlib.sha256(hash_input.encode()).hexdigest()[:20] return (self._to_number(hash_server) % 2 ** url_num) == 0 @staticmethod def _wait_for_background_process() -> None: """ Wait for running background process of cl-summary """ retries = 50 while retries and not is_sending_process_running(): retries -= 1 time.sleep(0.1) def _actions_before_run_process_as_limited(self): if self._opts["enable"] or self._opts["disable"]: # Enable/Disable collect statistics set_statistic_collection_enabled(self._opts["enable"]) # Print result data = {"timestamp": time.time(), "result": "success"} print_dictionary(data, True) return if self._opts["enable-rpm"] or self._opts["disable-rpm"]: # Enable/Disable collect statistics set_statistic_rpm_collection_enabled(self._opts["enable-rpm"]) # Print result data = {"timestamp": time.time(), "result": "success"} print_dictionary(data, True) return if self._opts["status"]: # show collecting status here and exit status = "collecting" if is_sending_process_running() else "ready" data = {"timestamp": time.time(), "status": status, "result": "success"} # Add last send statistics status data.update({"sending_status": get_statistics_send_status_from_file()}) print_dictionary(data, True) sys.exit(0) if self._opts.get("get-remote"): result = self._get_remote_data() # Append statistics collection status self._print_result_and_exit(data=result, is_statistic_enabled=is_statistic_enabled()) if self._opts.get("rpm-packages"): if not is_statistic_rpm_enabled() and not self._opts.get("--force-collect"): self._error_and_exit( { "result": "Sending RPM statistics is disabled by admin. " "Use --force-collect to ignore admin`s settings." }, error_code=0, ) self._get_rpm_packages_summary() if self._opts.get("--send"): self._send_statistics_and_save_status( summary=self.statistics, url=self.RPM_PACKAGES_URL, save_status=False, ) app_logger.info("RPM statistics sent") else: print_dictionary(self.statistics, True) return if not self._is_statistics_enabled(): status_dict = { "result": SummaryStatus.FAILED, "reason": "Statistics collection is disabled globally. " "Please, try again later or contact support if it happens again.", "timestamp": time.time(), } write_statistics_send_status_to_file(status_dict) self._error_and_exit( { "result": "Collecting statistics is disabled globally. " "Use --force-collect to ignore global settings" }, error_code=0, ) # check admin`s statistics settings before sending if self._opts.get("--send"): if not self._opts.get("--force-collect") and not is_statistic_enabled(): status_dict = { "result": SummaryStatus.FAILED, "reason": "Statistics collection is disabled by admin. " "Run `cloudlinux-summary enable` and then try again.", "timestamp": time.time(), } write_statistics_send_status_to_file(status_dict) self._error_and_exit( { "result": "Sending statistics is disabled by admin. " "Use --force-collect to ignore admin`s settings." }, error_code=0, ) if self.security_token is None: message = "Statistics was not sent, because security token is empty" app_logger.error(message) status_dict = { "result": SummaryStatus.FAILED, "reason": "We are not able to collect statistics because " f"we are not able to make a security token. Check {LOG_FILE} " "for details or contact support.", "timestamp": time.time(), } write_statistics_send_status_to_file(status_dict) self._error_and_exit({"result": message}) if self._opts.get("--async"): # Async start of collecting statistics if is_sending_process_running(): # Statistics already collecting # status field below may be absent due to race, and # we cannot fix this race because lock cannot be # acquired here due to async running of child process. # Lock will be released in parent process when it dies and # we cannot transfer the lock to child correctly data = {"timestamp": time.time(), "status": "collecting", "result": "success"} else: # temporary marker in order to know # when something crashed in collection process # this write needed to handle case when exec call # fails before reaching write in case of '--send' below write_statistics_send_status_to_file( {"result": SummaryStatus.IN_PROGRESS, "timestamp": time.time(), "reason": None}, ) # No background process found, start new collecting subprocess.run( "/usr/sbin/cloudlinux-summary --send --json &> /dev/null &", shell=True, executable="/bin/bash", check=False, ) self._wait_for_background_process() data = {"timestamp": time.time(), "result": "success"} print_dictionary(data, True) sys.exit(0) # Input logic description: # If --json and --send options are present and collection process found - print error # else - work as usual. # No need to check --json option because it is mandatory and arg parser will fail without it if self._opts["--send"]: if self.is_process_not_limited and is_sending_process_running(acquire_lock=True): # Checking/acquiring of lock is performed as unlimited process only. # Otherwise child process will not do the job due to busy lock. # Lock should be acquired in parent process in order to # avoid race and produce correct status below. # if collection process found (lock is busy) - print 'collecting' status data = {"timestamp": time.time(), "status": "collecting", "result": "success"} print_dictionary(data, True) sys.exit(0) else: # this write needed to handle case when we run --send without --async write_statistics_send_status_to_file( {"result": SummaryStatus.IN_PROGRESS, "timestamp": time.time(), "reason": None}, ) # TODO: we need this bicycle because method pylve.lve_enter_pid does not work properly (surprise!) # when we call lve_enter_pid, lve limits process only by cpu usage, other parameters are unlimited @staticmethod def _run_self_in_lve(args): """ Run same command in lve and set environ RUNNING_IN_LVE=1 in order to check it in child process. :return: """ settings = lvectllib.make_liblve_settings( lvectllib.create_liblve_settings( ls_cpu=15, # 15 percents of CPU (NOT core) ls_cpus=0, ls_memory_phy=1024 * 1024**2, # 1gb ) ) with lvectllib.temporary_lve(settings) as lve_id: args.extend(["--lve-id", str(lve_id)]) return subprocess.call( ["/bin/lve_suwrapper", "-n", str(lve_id), "/usr/sbin/cloudlinux-summary"] + args, env=dict( os.environ, RUNNING_IN_LVE="1", # we use /proc/cpuinfo to get cpu information, but unfortunately # it returns CURRENT cpu speed, which is different in lve environment # and we cannot get right speed value there CPU_DATA=json.dumps(lvectllib.CPUINFO_DATA), ), ) @staticmethod def _run_self_by_nice(args): """ Run same command using utility `nice` and set environ RUNNING_BY_NICE=1 in order to check it in child process. :return: """ return subprocess.call( ["/usr/bin/nice", "-n", "19", "/usr/sbin/cloudlinux-summary"] + args, env=dict( os.environ, RUNNING_BY_NICE="1", ), ) @staticmethod def _should_run_as_unlimited_process(opts: Dict[Any, Any]) -> bool: """ Check that passed command should run as unlimited process """ if any( opts[option] for option in ( "rpm-packages", "status", "get-remote", "enable", "disable", "enable-rpm", "disable-rpm", ) ): return True if any( opts[option] for option in ( "--send", "--async", "--json", "--force-collect", ) ): return False return False def run(self, argv): # get arguments self._opts = self._parse_args(argv) if self.is_process_not_limited: # The call does actions which don't require run in LVE: # - reading/writing status of statistics collection # - getting remote data # - enabling/disabling statistics collection # - processing async run of statistics collection # - running rpm statistics collection self._actions_before_run_process_as_limited() # We should run only main statistics collection in LVE # in other cases we skip run in LVE if self._should_run_as_unlimited_process(self._opts): sys.exit(0) elif not is_panel_feature_supported(Feature.LVE): rc = self._run_self_by_nice(argv) sys.exit(rc) else: try: rc = self._run_self_in_lve(argv) sys.exit(rc) except lvectllib.PyLveError as e: error_msg = f"failed to run task in lve, error: {e}" print(error_msg) log = logging.getLogger(__name__) log.exception(error_msg, exc_info=True) sys.exit(-1) else: if self._should_run_as_unlimited_process(self._opts): err_msg = ( "You shouldn't use env var " '"RUNNING_IN_LVE" or "RUNNING_BY_NICE" for run ' "of any command except collection " "of main statistics." ) data = { "timestamp": time.time(), "result": err_msg, } app_logger.error(err_msg, extra=self._opts) print_dictionary(data, True) sys.exit(1) self.run_get_summary() def run_get_summary(self): start_time = time.time() self._get_summary() running_time = time.time() - start_time self.statistics["cl_summary_execution_time"] = running_time if self._opts["--lve-id"]: self.statistics.add_metric( partial(self._get_max_memory, running_time), "str", "cl_summary_max_mem_used", "Can't get memory usage by cloudlinux-summary", ) if self._opts.get("--send"): # Send main statistics self._send_statistics_and_save_status( summary=self.statistics, url=self.SUMMARY_URL, save_status=True, ) app_logger.info("Main statistics sent") try: # Collect and send website collector data self.website_collector() except Exception as e: app_logger.error("Website collector failed: %s", e) else: # In non-send mode show main statistics only print_dictionary(self.statistics, True) @staticmethod def _save_status(timestamp: int, summary_result: AnyStr) -> None: """ Save status of sending statistics to json file """ # Also write summary result to file status_dict = {"result": SummaryStatus.SUCCESS, "timestamp": timestamp} if summary_result != "success": # Send error was happened, rewrite status according to LU-1013 status_dict["reason"] = summary_result status_dict["result"] = SummaryStatus.FAILED write_statistics_send_status_to_file(status_dict) def _send_statistics_and_save_status(self, summary: Dict[AnyStr, int], url: AnyStr, save_status: bool) -> None: """ Send statistics data to server and save status to file """ timestamp = int(time.time()) summary["timestamp"] = timestamp s_result = self._send_statistics(summary, url=url) result = {"result": s_result, "timestamp": timestamp} print_dictionary(result, True) if save_status: self._save_status( timestamp, s_result, ) def _get_max_memory(self, running_time): time_minutes = running_time / 60 if time_minutes < 1: return None cmd = [ "/usr/sbin/lveinfo", "--json", "--id", str(self._opts["--lve-id"]), "--show-columns", "mPMem", "--period", f"{int(math.ceil(time_minutes))}m", ] try: rc, json_str, _ = run_command(cmd, return_full_output=True) except ExternalProgramFailed as e: app_logger.warning("Unable to run lveinfo, error: %s", e) return None if rc == 0: parsed_data = json.loads(json_str) try: return max(x["mPMem"] for x in parsed_data["data"]) except (ValueError, KeyError): return None app_logger.error("lveinfo failed with exit code: %i, output: %s", rc, json_str) return None @staticmethod def _send_statistics(data, url): """ Sends statistics to server :param data: Statistics data dict :return: string - message for JSON 'result' key """ out_message = "success" try: message = f"Sending statictics to server {url}" app_logger.info(message) expected_err = requests.RequestException("Unknown exception while sending statistics") for i in range(5): try: response = requests.post(url, json=data, timeout=60) except requests.ConnectionError as err: expected_err = err time.sleep(4**i) else: break else: raise expected_err if response.status_code == 200: app_logger.info("Sending statictics OK") else: out_message = f"Server answer is: HTTP code {response.status_code}; Reason: {response.reason}" app_logger.info(out_message) except requests.RequestException as err: out_message = str(err) app_logger.error(out_message) return out_message def website_collector(self): """ Collect data via WebsiteCollector. """ # Check if system_id is set if not self.system_id: app_logger.debug("Website collector is disabled because system_id is not set") return None # Check if panel is cPanel if detect.getCPName() != cpapi.CPANEL_NAME: app_logger.debug("Website collector is disabled for non-cPanel") return None client_data = get_client_data_from_jwt_token(check_expiration=False) # Check if website collector is enabled via remote settings website_collector_status = self._website_collector_status(client_data) app_logger.info("Website collector status: %s", str(website_collector_status)) if not website_collector_status.get("decision") == "on": app_logger.debug("Website collector is disabled by remote settings") return None # Check if website collector is disabled via opt-out file if WebsiteCollector.is_collection_disabled(): app_logger.debug("Website collector is disabled via opt-out file") return None try: max_sites = website_collector_status.get("domains_limit") collector = WebsiteCollector(logger=app_logger, max_sites=max_sites) collector.send_data( system_id=self.system_id or UNKNOWN_RHN_ID, client_id=None if client_data is None else client_data.get("client_id", None), platform=self._get_platform(), panel="wp2" if is_wp2_environment() else detect.getCPName(), server=self._get_control_panel_apache(), api_url=self.WEBSITE_COLLECTOR_URL, remote_config_version=website_collector_status.get("config_version", "unknown") ) except Exception as e: app_logger.error("Website collector failed: %s", e) return None def _get_summary(self): result = {"version": 1, "timestamp": time.time()} self._prepare_cl_normal_statistics() self.statistics.update(result) return result def _get_rpm_packages_summary(self): result = {"version": 1, "timestamp": time.time()} self._fill_dict_with_rpm_packages_statistics() self.statistics.update(result) return result @staticmethod def _get_panel_version() -> str: """ Get version of control panel """ detect.getCP() return detect.CP_VERSION def _fill_mysql_governor_statistics(self) -> None: """ Fill dict with statistics by statistics about MySQL governor """ success, mysql_gov_mode = self.statistics.evaluate_safe( db_governor_lib.get_gov_mode_operation, "Can't get MySQL governor mode", ) if not success: self.statistics["mysql_governor_status"] = ERROR elif mysql_gov_mode is not None: self.statistics["mysql_governor_mode"] = mysql_gov_mode self.statistics.add_metric( lambda: MySQLGovernor().get_governor_version(), "str", "mysql_governor_version", "Can't get MySQL governor version", ) self.statistics.add_metric( lambda: MySQLGovernor().get_governor_status()[0], "str", "mysql_governor_status", "Can't get MySQL governor status", ) else: self.statistics["mysql_governor_status"] = NOT_INSTALLED def _fill_control_panel_statistics(self) -> None: """ Fill dict with statistics by statistics about control panel """ self.statistics.add_metric( detect.getCPName, "str", "control_panel_name", "Can't get control panel name", ) self.statistics.add_metric( lambda: [name for name, is_supported in cpapi.get_supported_cl_features().items() if is_supported], "str_list", "supported_cl_features", "Can't get list of supported cl features by control panel", ) self.statistics.add_metric( self._get_panel_version, "str", "control_panel_version", "Can't get control panel version", ) # control_panel_apache metric depends on control_panel_name metric self.statistics.add_metric( self._get_control_panel_apache, "str", "control_panel_apache", "Can't get control panel apache", ) self.statistics.add_metric(is_nginx_running, "int", "nginx_running", "Can't get control panel nginx") self.statistics.add_metric(lambda: len(cpapi.cpusers()), "int", "users_total", "Can't get amount of users") if detect.getCPName() == cpapi.PLESK_NAME: self.statistics.add_metric( self._is_installed_via_plesk_ext, "int", "installed_via_plesk_ext", "Can't determine if CL was installed from Plesk Extension", ) self.statistics.add_metric( self._is_login_via_whmcs_in_use, "int", "login_via_whmcs_in_use", "Can't determine if Login via WHMCS in use", ) self.statistics.add_metric( is_wp2_environment, "int", "is_wp2_environment", "Can't if that is WP2 environment" ) @staticmethod def _is_login_via_whmcs_in_use(): """ Determine whether the login method is via WHMCS """ status = -1 # Not used if detect.getCPName() != cpapi.CPANEL_NAME: return status command = ["/usr/local/cpanel/bin/whmapi1", "get_available_authentication_providers", "--output=jsonpretty"] result = subprocess.run(command, capture_output=True, text=True, check=True) data = json.loads(result.stdout) if "data" in data and "providers" in data["data"]: for provider in data["data"]["providers"]: if provider["provider_name"] == "whmcs": status = 0 # Disabled # If at least one checkbox is enabled in the settings of the login module via WHMCS, # then WHMCS is used if any( [ provider.get("whostmgrd_enabled", False), provider.get("cpaneld_enabled", False), provider.get("webmaild_enabled", False), ] ): status = 1 # Enabled break return status @staticmethod def _is_installed_via_plesk_ext(): """ cldeploy-plesk-ext.log is always created by cldeploy-precheck when it downloads the cldeploy script, if it exists it means there was an attempt to launch cldeploy from the Plesk Extension. Since this function is in package lve-utils, which depends on lve, it means that the current OS is CloudLinux, therefore the conversion was successful. Possible false positives: - if there was failed attempt to convert os from Plesk Ext, and then it was successfully converted after manual run of cldeploy, it will be considered as CLOS installed via Plesk extension - if user has CL already installed and goes to CLOS Plesk Ext and tries to run conversion from there Possible false negatives: - if /var/log/cldeploy-plesk-ext.log was deleted or renamed/archived (e.g. via logrotate) """ result = False ext_log = "/var/log/cldeploy-plesk-ext.log" if os.path.exists(ext_log): result = True return result def _get_control_panel_apache(self): """ Wrapper to retrieve control panel Apache version: EA3 or EA4 for cPanel, native otherwise :return: EA3|EA4|native """ if is_litespeed_running(): if detect.detect_enterprise_litespeed(): result = "litespeed" elif detect.detect_open_litespeed(): result = "openlitespeed" else: # There is no LS config found result = "unknown_litespeed" elif self.statistics["control_panel_name"] == "cPanel": result = "EA4" if detect.is_ea4() else "EA3" else: result = "native" return result @staticmethod def _cagefs_status_wrapper(): """ Wrapper to convert internal values from cagefs_statistics_config.get_cagefs_status function to values for statistics :return: """ cagefs_status = cagefs_statistics_config.get_cagefs_status() if cagefs_status is None: return cagefs_status cagefs_status_map = { cagefs_statistics_config.CAGEFS_STATUS_NOT_INSTALLED: NOT_INSTALLED, cagefs_statistics_config.CAGEFS_STATUS_NOT_INITIALIZED: NOT_INITIALIZED, "Enabled": ENABLED, "Disabled": DISABLED, } return cagefs_status_map.get(cagefs_status, "Unknown") def _fill_cagefs_statistics(self) -> None: """ Fill dict with statistics by statistics about CageFS """ self.statistics.add_metric( self._cagefs_status_wrapper, "str", "cagefs_status", "Can't get CageFS status", ) if self.statistics["cagefs_status"] in [NOT_INSTALLED, NOT_INITIALIZED]: self.statistics["cagefs_user_mode"] = None else: self.statistics.add_metric( cagefs_statistics_config.get_cagefs_user_mode, "str", "cagefs_user_mode", "Can't get CageFS user mode", ) self.statistics.add_metric( partial(cagefs_statistics_config.get_quantity, True), "str", "cagefs_enabled_quantity", "Can't get quantity of users with enabled CageFS", ) self.statistics.add_metric( partial(cagefs_statistics_config.get_quantity, False), "str", "cagefs_disabled_quantity", "Can't get quantity of users with disabled CageFS", ) def _get_amount_of_endusers_under_resellers(self) -> Optional[int]: """ Get amount of end-users which belong to active resellers """ try: lvp_count = Counter(lvp for _, lvp in self.lvpmap.lve_lvp_pairs() if lvp > 0) except cpapi.NotSupported: return None enabled_lvp_id = set(self.lvpmap.name_map.id_list()) return sum(lvp_id in enabled_lvp_id for lvp_id in lvp_count.elements()) def _get_total_amount_of_endusers(self) -> Optional[int]: """ Get total amount of end-users """ try: lvp_count = Counter(lvp for _, lvp in self.lvpmap.lve_lvp_pairs() if lvp > 0) except cpapi.NotSupported: return None return sum(lvp_count.values()) @staticmethod def _get_amount_of_resellers() -> Optional[int]: """ Get amount of resellers """ try: return len(cpapi.resellers()) except cpapi.NotSupported: pass def _fill_resellers_statistics(self) -> None: """ Fill dict with statistics by varied statistics about resellers """ self.statistics.add_metric( lvectllib.lve.is_lve10, "int", "reseller_limits_supported_kernel", "Can't detect status of support reseller limits by kernel", ) self.statistics.add_metric( lvectllib.lve.is_panel_supported, "int", "reseller_limits_supported_control_panel", "Can't detect status of support reseller limits by control panel", ) self.statistics.add_metric( lvectllib.lve.reseller_limit_supported, "int", # name of metric means that reseller limits is supported by all sides: # kmod-lve, liblve, /proc/lve, control panel "reseller_limits_enabled", "Can't detect status of support of reseller limits", ) self.statistics.add_metric( self._get_amount_of_resellers, "int", "resellers_total", "Can't get total amount of resellers", ) self.statistics.add_metric( self._get_amount_of_endusers_under_resellers, "int", "resellers_endusers_under_reseller_limits", "Can't get amount of end-users which belong to active resellers", ) self.statistics.add_metric( self._get_total_amount_of_endusers, "int", "resellers_endusers_total", "Can't get total amount of end-users", ) self.statistics.add_metric( lambda: self._get_resellers_with_faults(), "int", "resellers_with_faults", "Can't get amount of resellers with faults for the past 24h", ) if self.statistics["reseller_limits_enabled"]: self.statistics.add_metric( lambda: len(list(lvectllib.lvp_list())), "int", "resellers_active", "Can't get amount of active resellers", ) else: self.statistics["resellers_active"] = None self.statistics["resellers_endusers_under_reseller_limits"] = None self.statistics["resellers_with_faults"] = None def _fill_default_limits_statistics(self, xml_cfg_provider: LimitsDataStorage) -> None: """ Fill dict with statistics by statistics about default limits """ self.statistics.add_metric( partial( self._cpu_limit_to_percents, xml_cfg_provider.defaults[DEFAULTS].cpu, xml_cfg_provider.defaults[DEFAULTS].ncpu, ), "int", "default_limit_speed", "Can't get default speed limit", ) self.statistics.add_metric( partial(self._get_cpu_limit_units, xml_cfg_provider.defaults[DEFAULTS].cpu), "str", "default_limit_cpu_origin_units", "Can't get cpu origin units of default limit", ) self.statistics.add_metric( lambda: xml_cfg_provider.defaults[DEFAULTS].ncpu, "int", "default_limit_ncpu", "Can't get default ncpu limit", ) self.statistics.add_metric( lambda: xml_cfg_provider.defaults[DEFAULTS].io, "int", "default_limit_io", "Can't get default io limit", ) self.statistics.add_metric( lambda: xml_cfg_provider.defaults[DEFAULTS].nproc, "int", "default_limit_nproc", "Can't get default nproc limit", ) self.statistics.add_metric( lambda: xml_cfg_provider.defaults[DEFAULTS].ep, "int", "default_limit_ep", "Can't get default ep limit", ) self.statistics.add_metric( lambda: xml_cfg_provider.defaults[DEFAULTS].iops, "int", "default_limit_iops", "Can't get default iops limit", ) self.statistics.add_metric( partial(self._mempages_to_mb, xml_cfg_provider.defaults[DEFAULTS].vmem), "int", "default_limit_vmem_mb", "Can't get default vmem limit", ) self.statistics.add_metric( partial(self._mempages_to_mb, xml_cfg_provider.defaults[DEFAULTS].pmem), "int", "default_limit_pmem_mb", "Can't get default pmem limit", ) def _fill_other_limits_statistics(self, xml_cfg_provider: LimitsDataStorage) -> None: """ Fill dict with statistics by other statistics about limits: packages_total, users_total, amount users/packages with custom limits """ self.statistics.add_metric( lambda: len(xml_cfg_provider.packages), "int", "packages_total", "Can't get total amount of packages", ) self.statistics.add_metric( lambda: len(xml_cfg_provider.get_packages_with_custom_limits()), "int", "packages_with_custom_limits", "Can't get amount of packages with custom limits", ) self.statistics.add_metric( lambda: len(xml_cfg_provider.get_users_with_custom_limits()), "int", "users_with_custom_limits", "Can't get amount of users with custom limits", ) self.statistics.add_metric( self._get_users_with_faults, "int", "users_with_faults", "Can't get amount of users with faults for the past 24h", ) def _fill_top_packages_statistics(self, xml_cfg_provider: LimitsDataStorage) -> None: """ Fill dict with statistics by statistics about top packages on server """ # pylint: disable=cell-var-from-loop for i in range(1, 4): success, top_result = self.statistics.evaluate_safe( partial(self._get_top_package_by_number_of_users, i, xml_cfg_provider), f"Can't get top {i} package by users", ) # Break cycle if result is None, # beacuse package with number more than i doesn't exists if not success or top_result is None: break # getting of that metric (and a few metrics below) is wrapped by `format_metric`, # because value of `top_result` can be -42 in case exception while calling # method `_get_top_package_by_number_of_users` self.statistics.add_metric( lambda: top_result[1].name, "str", f"top_{i}_package_name", log_message=None ) self.statistics.add_metric( lambda: int(top_result[0]), "int", f"top_{i}_package_users_num", log_message=None ) self.statistics.add_metric( lambda: self._cpu_limit_to_percents( (top_result[1].limits or EMPTY_LIMITS).cpu, (top_result[1].limits or EMPTY_LIMITS).ncpu, ), "int", f"top_{i}_package_limit_speed", log_message=None ) self.statistics.add_metric( lambda: self._get_cpu_limit_units( (top_result[1].limits or EMPTY_LIMITS).cpu, ), "str", f"top_{i}_package_limit_cpu_origin_units", log_message=None ) self.statistics.add_metric( lambda: (top_result[1].limits or EMPTY_LIMITS).ncpu, "int", f"top_{i}_package_limit_ncpu", log_message=None ) self.statistics.add_metric( lambda: (top_result[1].limits or EMPTY_LIMITS).io, "int", f"top_{i}_package_limit_io", log_message=None ) self.statistics.add_metric( lambda: (top_result[1].limits or EMPTY_LIMITS).nproc, "int", f"top_{i}_package_limit_nproc", log_message=None ) self.statistics.add_metric( lambda: (top_result[1].limits or EMPTY_LIMITS).ep, "int", f"top_{i}_package_limit_ep", log_message=None ) self.statistics.add_metric( lambda: (top_result[1].limits or EMPTY_LIMITS).iops, "int", f"top_{i}_package_limit_iops", log_message=None ) self.statistics.add_metric( lambda: self._mempages_to_mb((top_result[1].limits or EMPTY_LIMITS).vmem), "int", f"top_{i}_package_limit_vmem_mb", log_message=None ) self.statistics.add_metric( lambda: self._mempages_to_mb((top_result[1].limits or EMPTY_LIMITS).pmem), "int", f"top_{i}_package_limit_pmem_mb", log_message=None ) # pylint: enable=cell-var-from-loop def _fill_limits_statistics(self) -> None: """ Fill dict with statistiscs by varied statistics about limits """ xml_cfg_provider = LimitsDataStorage() self._fill_default_limits_statistics(xml_cfg_provider) self._fill_other_limits_statistics(xml_cfg_provider) self._fill_top_packages_statistics(xml_cfg_provider) def _fill_lsapi_statistics(self) -> None: """ Fill dict with statistics by statistics about mod_lsapi """ # ignore success flag here because we mute error messages below # using log_message=None success, raw_lsapi_info = self.statistics.evaluate_safe( self.get_raw_lsapi_info, "Can't get raw mod_lsapi info", ) if not success: self.statistics["lsapi_mod_status"] = ERROR elif raw_lsapi_info is not None: self.statistics.add_metric( lambda: raw_lsapi_info["criu"]["status"], "str", "lsapi_criu_service_status", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["criu"]["version"], "str", "lsapi_criu_service_version", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["lsapiConf"]["lsapi_criu"], "str", "lsapi_option_criu", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["lsapiConf"]["lsapi_with_connection_pool"], "str", "lsapi_option_connection_pool", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["libVersion"], "str", "lsapi_lib_version", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["modStatus"], "str", "lsapi_mod_status", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["modVersion"], "str", "lsapi_mod_version", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["totalDomain"], "int", "lsapi_total_domain_count", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["domainStat"], "int_dict", "lsapi_domain_stat", log_message=None ) self.statistics.add_metric( lambda: raw_lsapi_info["controlPanel"], "str", "lsapi_apache_environment", log_message=None ) else: self.statistics["lsapi_mod_status"] = NOT_INSTALLED def _fill_wmt_settings_statistics(self): self.statistics.add_metric( self._get_wmt_api_config, "str_dict", "wmt_config", "Cant get status of wmt_api_config" ) def _fill_wp_statistics(self): try: all_wp_paths, paths_with_wpos_object_cache, wp_php_versions = self._prepare_wp_statistics() except Exception as e: app_logger.error("Error while getting WordPress statistics %s", str(e)) all_wp_paths, paths_with_wpos_object_cache, wp_php_versions = [], [], {} without_wpos_object_cache = list(set(all_wp_paths) - set(paths_with_wpos_object_cache)) try: object_cache_plugins, advanced_cache_plugins = self._prepare_wp_plugins_statistics( without_wpos_object_cache ) except Exception as e: app_logger.error("Error while getting WordPress plugins statistics %s", str(e)) object_cache_plugins, advanced_cache_plugins = {}, {} if paths_with_wpos_object_cache: object_cache_plugins.update({"cloudlinux_wpos_object_cache": len(paths_with_wpos_object_cache)}) self.statistics.add_metric( lambda: len(all_wp_paths), "int", "wordpress_installed_total", "Can't get total count of installed wordpress", ) self.statistics.add_metric( lambda: object_cache_plugins, "int_dict", "wordpress_object_cache_plugins", "Can't get WP object cache plugins info", ) self.statistics.add_metric( lambda: advanced_cache_plugins, "int_dict", "wordpress_advanced_cache_plugins", "Can't get WP advanced cache plugins info", ) self.statistics.add_metric( lambda: sum(object_cache_plugins.values()), "int", "wordpress_object_cache_plugins_total", "Can't get WP object cache total plugins info", ) self.statistics.add_metric( lambda: sum(advanced_cache_plugins.values()), "int", "wordpress_advanced_cache_plugins_total", "Can't get WP advanced cache total plugins info", ) self.statistics.add_metric( lambda: wp_php_versions, "int_dict", "wordpress_php_versions", "Can't get WP php version info" ) def _fill_per_user_wp_statistics(self): try: users_with_wp, users_with_wp_and_litespeed = self._prepare_per_user_wp_statistics() except Exception as e: app_logger.error("Error while getting per-user WordPress statistics %s", str(e)) users_with_wp = [] users_with_wp_and_litespeed = [] self.statistics.add_metric( lambda: len(users_with_wp), "int", "users_count_with_wordpress_installed", "Can't get total count of users with wordpress installed", ) self.statistics.add_metric( lambda: len(users_with_wp_and_litespeed), "int", "users_count_with_wordpress_and_lscache_installed", "Can't get total count of users with wordpress and LiteSpeed installed", ) def _prepare_wp_plugins_statistics(self, wp_paths): """ Prepares dict with plugin-counter pairs e.g: {'w3-total-cache': 4, 'redis': 1, 'ls': 2} """ object_cache_plugins = self._get_plugin_info("object-cache", wp_paths) advanced_cache_plugins = self._get_plugin_info("advanced-cache", wp_paths) return object_cache_plugins, advanced_cache_plugins @staticmethod def _get_plugin_info(plugin_type: str, wp_paths: List): object_cache_plugins = list(filter(None, [get_wp_cache_plugin(path, plugin_type) for path in wp_paths])) object_plugin_counter_pairs = defaultdict(int) for plugin in object_cache_plugins: object_plugin_counter_pairs[plugin] += 1 return object_plugin_counter_pairs def _prepare_wp_statistics(self) -> Tuple[List[Path], List[Path], Dict[str, int]]: """ Return Wordpress absolute paths, paths with install WPOS object cache module and counter of php versions that are used by these Wordpresses. """ all_wp_paths, paths_with_enabled_wpos_object_cache = [], [] php_versions = defaultdict(int) panel_users = cpapi.cpusers() for user in panel_users: try: domains_info = cpapi.userdomains(user) except Exception: continue docroot_domains_map = defaultdict(list) for domain, docroot in domains_info: if os.path.exists(docroot): docroot_domains_map[docroot].append(domain) for docroot in docroot_domains_map: docroot_info = self._get_docroot_wp_info(user, docroot, docroot_domains_map) docroot_wp_paths = docroot_info["wp_paths"] docroot_php_version = docroot_info["php_version"] all_wp_paths.extend(docroot_wp_paths) paths_with_enabled_wpos_object_cache.extend(docroot_info["wp_paths_with_enabled_wpos_object_cache"]) if docroot_wp_paths and docroot_php_version != "undefined": php_versions[docroot_php_version] += len(docroot_wp_paths) return all_wp_paths, paths_with_enabled_wpos_object_cache, php_versions def _prepare_per_user_wp_statistics(self) -> Tuple[set[str], set[str]]: """ Return Wordpress absolute paths, paths with install LiteSpeed cache plugin. """ users_with_wp, users_with_wp_and_litespeed = set(), set() panel_users = cpapi.cpusers() for user in panel_users: try: domains_info = cpapi.userdomains(user) except Exception as e: app_logger.error("Can't get user's domains info, error: %s", str(e)) continue docroot_domains_map = defaultdict(list) for domain, docroot in domains_info: if os.path.exists(docroot): docroot_domains_map[docroot].append(domain) for docroot in docroot_domains_map: docroot_info = self._get_docroot_wp_info(user, docroot, docroot_domains_map) if len(docroot_info["wp_paths"]) > 0: users_with_wp.add(user) if len(docroot_info["wp_paths_with_enabled_litespeed_cache"]) > 0: users_with_wp_and_litespeed.add(user) return users_with_wp, users_with_wp_and_litespeed def _get_docroot_wp_info( self, user: str, docroot: str, docroot_domains_map: Dict[str, str] ) -> Dict[str, Union[List, str]]: """ Return data about docroot's WP paths and used php versions. """ docroot_info = { "wp_paths": [], "wp_paths_with_enabled_wpos_object_cache": [], "wp_paths_with_enabled_litespeed_cache": set(), "php_version": "undefined", } # filter found WP paths that are the same as other docroots (not current checking) # because all docroots will be checked later anyway wp_paths = [ wp_path for wp_path in find_wp_paths(docroot) if (os.path.join(docroot, wp_path) == docroot or os.path.join(docroot, wp_path) not in docroot_domains_map) ] # if WP paths were not found we are not detecting php version # and path with cloudlinux object_cache module if not wp_paths: return docroot_info docroot_info["wp_paths"] = [Path(docroot).joinpath(path) for path in wp_paths] try: docroot_info["wp_paths_with_enabled_wpos_object_cache"] = [ Path(docroot).joinpath(path) for path in get_wp_paths_with_enabled_module(user, wp_paths) if path is not None ] except Exception: app_logger.exception("Can't get WordPress sites with enabled Object Cache module") for path in docroot_info["wp_paths"]: try: object_cache_plugin = get_wp_cache_plugin(path, "object-cache") except Exception: app_logger.exception("Can't get Object Cache plugin for Wordpress site") object_cache_plugin = None if object_cache_plugin is not None and "litespeed" in object_cache_plugin: docroot_info["wp_paths_with_enabled_litespeed_cache"].add(path) try: advanced_cache_plugin = get_wp_cache_plugin(path, "advanced-cache") except Exception: app_logger.exception("Can't get Advanced Cache plugin for Wordpress site") advanced_cache_plugin = None if advanced_cache_plugin is not None and "litespeed" in advanced_cache_plugin: docroot_info["wp_paths_with_enabled_litespeed_cache"].add(path) for domain in docroot_domains_map[docroot]: php_version = self._get_php_version_for_domain(domain) if php_version != "undefined": docroot_info["php_version"] = php_version break return docroot_info def _get_php_version_for_domain(self, domain: str) -> str: """ Return php version that is used for specified domain. Determine whether of MultiPHP Manager or PHP Selector is used. """ if detect.getCPName() not in (cpapi.CPANEL_NAME, cpapi.DIRECTADMIN_NAME, cpapi.PLESK_NAME): return "undefined" system_default_php_version, domains_php_info, selector_php_info = self._get_system_php_info() # get version that is shown in MultiPHP Manager php_info = domains_php_info.get(domain, {}) multi_php_version = php_info.get("php_version_id", "undefined") # in cPanel when PHP Selector is used for domain, domain's php version # in MultiPHP Manager becomes equal to system's default php version # if not equal => PHP Selector is not used if detect.getCPName() == cpapi.CPANEL_NAME and system_default_php_version != multi_php_version: return multi_php_version # trying to get version from PHP Selector info # if not succeeded => user uses MultiPHP for version, domains in selector_php_info.items(): if domain in domains and version != "native": alt_php_version = f'alt-php{version.replace(".", "")}' return alt_php_version if detect.getCPName() == cpapi.DIRECTADMIN_NAME: da_php_version = f'da-php{multi_php_version.replace(".", "")}' return da_php_version if detect.getCPName() == cpapi.PLESK_NAME: handler = php_info.get("handler_type", "") php_version = multi_php_version[: -len(handler)].strip("-") return php_version or "undefined" return multi_php_version @staticmethod @lru_cache(maxsize=None) def _get_system_php_info(): """ Return info about: - system_php_version (implemented only on cPanel) - domains_php_info (php versions and handlers turned on in MultiPHP) - selector_php_info (php version used by panel users via PHP Selector) """ try: system_php_version = cpapi.get_system_php_info()["default_version_id"] except Exception: system_php_version = None domains_php_info = cpapi.get_domains_php_info() php_selector_usage = get_php_selector_usage() if php_selector_usage is not None: selector_php_info = php_selector_usage["domains_by_php_version"] else: selector_php_info = {} return system_php_version, domains_php_info, selector_php_info def _fill_ssa_statistics(self) -> None: """ Fill dict with SSA statistics """ success, ssa_stats = self.statistics.evaluate_safe( self.get_ssa_stats, "Can't get SSA statistics", ) if not success: self.statistics["ssa_status"] = ERROR return if ssa_stats is not None: self.statistics.add_metric(lambda: ssa_stats["config"], "str_dict", "ssa_config", "Can't get SSA config") self.statistics.add_metric( lambda: ssa_stats["version"], "str", "ssa_version", "Can't get version of SSA" ) self.statistics.add_metric(lambda: ssa_stats["status"], "str", "ssa_status", "Can't get state of SSA") self.statistics.add_metric( lambda: ssa_stats["agent_status"], "str", "ssa_agent_status", "Can't get state of SSA agent" ) self.statistics.add_metric( lambda: ssa_stats["autotracing"]["status"], "str", "autotracing_status", "Can't get state of SSA Autotracing", ) self.statistics.add_metric( lambda: ssa_stats["autotracing"]["disabled_users_quantity"], "int", "autotracing_disabled_users_quantity", "Can't get state of SSA Autotracing disabled users", ) self.statistics.add_metric( lambda: ssa_stats["autotracing"]["rules_version"], "str", "autotracing_rules_version", "Can't get SSA Autotracing rules version", ) self.statistics.add_metric( lambda: ssa_stats["autotracing"]["urls_processed"], "int", "autotracing_urls_processed", "Can't get SSA Autotracing processed URLs count", ) self.statistics.add_metric( lambda: ssa_stats["autotracing"]["urls_selected"], "int", "autotracing_urls_selected", "Can't get SSA Autotracing selected URLs count", ) self.statistics.add_metric( lambda: ssa_stats["autotracing"]["rejects"], "int_dict", "autotracing_rejects", "Can't get SSA Autotracing rejects", ) else: # no SSA external utility to get statistics -- SSA is not installed self.statistics["ssa_status"] = NOT_INSTALLED def _fill_php_selector_statistics(self) -> None: """ Fill dict with statistics by varied statistics about PHP selector """ success, php_interpreters = self.statistics.evaluate_safe( lambda: get_versions_statistics("php"), "Can't get statistics about PHP interpreters" ) if not success: self.statistics["selector_php_status"] = ERROR elif php_interpreters is not None: self.statistics.add_metric( partial(self._get_status_of_selector, "php"), "str", "selector_php_status", "Can't get status of PHP selector", ) self.statistics.add_metric( partial(self._get_list_versions_of_interperters, php_interpreters, INSTALLED), "str_list", "selector_php_versions_installed", log_message=None ) self.statistics.add_metric( partial(self._get_list_versions_of_interperters, php_interpreters, ENABLED), "str_list", "selector_php_versions_enabled", log_message=None ) self.statistics.add_metric( get_default_php_version, "str", "selector_php_version_default", "Can't get default version of PHP interpreter", ) self.statistics.add_metric( get_native_version_safe, "str", "selector_php_version_native", "Can't get native version of PHP interpreter", ) self.statistics.add_metric( CloudlinuxSelectorLib("php").php_selector_is_enabled, "int", "selector_php_enabled_ui", "Can't get state of UI of PHP selector", ) self.statistics.add_metric( get_mode_of_php_selector, "str", "selector_php_mode", "Can't get mode of PHP selector", ) success, php_usage_summary = self.statistics.evaluate_safe( get_php_selector_usage, "Can't get summary usage of PHP selector", ) if not success or php_usage_summary is None: self.statistics["selector_php_num_domains_by_interpreter"] = None self.statistics["selector_php_num_users_by_interpreter"] = None else: self.statistics.add_metric( lambda: {v: len(domains) for v, domains in php_usage_summary["domains_by_php_version"].items()}, "int_dict", "selector_php_num_domains_by_interpreter", log_message=None ) self.statistics.add_metric( lambda: {v: len(domains) for v, domains in php_usage_summary["users_by_php_version"].items()}, "int_dict", "selector_php_num_users_by_interpreter", log_message=None ) else: self.statistics["selector_php_status"] = NOT_INSTALLED @staticmethod def _get_average_apps_per_domain(total_apps: int, amount_of_apps_per_domain: int) -> Optional[int]: """ Get average amount of applications per domain :param total_apps: total amount of applications :param amount_of_apps_per_domain: amount of applications per domain """ if total_apps < 1 or amount_of_apps_per_domain < 1: return None return total_apps // amount_of_apps_per_domain @staticmethod def _get_average_apps_per_user(total_apps: int, amount_of_apps_per_user: int) -> Optional[int]: """ Get average amount of applications per user :param total_apps: total amount of applications :param amount_of_apps_per_user: amount of applications per user """ if total_apps < 1 or amount_of_apps_per_user < 1: return None return total_apps // amount_of_apps_per_user @staticmethod def _get_amount_of_runned_apps(apps: list) -> int: """ Get amount of running applications on server :param apps: list of applications for something selector """ return len([app for app in apps if app.app_status and app.app_status == "started"]) @staticmethod def _get_max_apps_per_domain(apps: list) -> int: """ Get maximum amount of applications per domain :param apps: list of applications for something selector """ apps_per_domain = Counter() for app in apps: apps_per_domain[app.doc_root] += 1 # [(doc_root, amount_of_apps_per_domain,),] # We should return 0 if counter is empty most_commons = apps_per_domain.most_common(1) or [(0, 0)] return most_commons[0][1] @staticmethod def _get_max_apps_per_user(apps: list) -> int: """ Get maximum amount of applications per user :param apps: list of applications for something selector """ apps_per_user = Counter() for app in apps: apps_per_user[app.user] += 1 # [(user, amount_of_apps_per_user,),] # We should return 0 if counter is empty most_commons = apps_per_user.most_common(1) or [(0, 0)] return most_commons[0][1] @staticmethod def _get_counter_apps_per_version(apps: list) -> Counter: """ Get Counter object which contains amount applications per version of interpreter :param apps: list of applications for something selector """ apps_per_version = Counter() for app in apps: apps_per_version[app.version] += 1 return apps_per_version def _get_max_apps_per_version(self, apps: list) -> int: """ Get maximum amount of applications per version of interpreter :param apps: list of applications for something selector """ apps_per_version = self._get_counter_apps_per_version(apps) # [(version, amount_of_apps_per_version,),] # We should return 0 if counter is empty most_commons = apps_per_version.most_common(1) or [(0, 0)] return most_commons[0][1] @staticmethod def _get_amount_of_domains_with_apps(apps: list) -> int: """ Get amount of domains with applications :param apps: list of applications for something selector """ domains = set() for app in apps: domains.add(app.doc_root) return len(domains) @staticmethod def _get_amount_of_users_with_apps(apps: list) -> int: """ Get amount of users with applications :param apps: list of applications for something selector """ users = set() for app in apps: users.add(app.user) return len(users) def _get_amount_of_apps_per_each_version_of_interpreters(self, apps: list) -> dict: """ Get amount of applications per each version of interpeters :param apps: list of applications for something selector """ apps_per_version = self._get_counter_apps_per_version(apps) return dict(apps_per_version) @staticmethod def _get_list_versions_of_interperters(interpreters_stats: dict, state: str) -> list[str]: """ Get list of versions of interpreters on server :param interpreters_stats: dict with varied statistics about each version of interpeters :param state: state of interpeters (installed, enabled) """ return [interpreter_stats for interpreter_stats, stat in interpreters_stats.items() if stat[state]] @staticmethod def _get_list_of_applications(interpreter: str) -> list: """ Get list of apllications on server for defined selector """ iter_apps = iter_server_applications(interpreter) if iter_apps is not None: return list(iter_apps) return [] def _fill_selectors_statistics(self) -> None: """ Fill dict with statistics by varied statistics about ruby/nodejs/python selectors """ for selector in self.SELECTORS: success, interpreters_stats = self.statistics.evaluate_safe( partial(get_versions_statistics, selector), f"Can't get statistics about {selector} interpreters", ) if interpreters_stats is None: self.statistics["selector_" + selector + "_status"] = NOT_INSTALLED continue if not success: self.statistics["selector_" + selector + "_status"] = ERROR continue self.statistics.add_metric( partial(self._get_status_of_selector, selector), "str", "selector_" + selector + "_status", f"Can't get status of {selector} selector" if success else None, ) self.statistics.add_metric( partial(self._get_list_versions_of_interperters, interpreters_stats, INSTALLED), "str_list", "selector_" + selector + "_versions_installed", log_message=None ) self.statistics.add_metric( partial(self._get_list_versions_of_interperters, interpreters_stats, ENABLED), "str_list", "selector_" + selector + "_versions_enabled", log_message=None ) success, interpreter_apps = self.statistics.evaluate_safe( partial(self._get_list_of_applications, selector), f"Can't get list of {selector} applications", ) # Following statistics evaluation does not make sense because # data that we are processing is wrong. Report to sentry and wait for fix. if not success: continue self.statistics.add_metric( lambda: len(interpreter_apps), # pylint: disable=cell-var-from-loop "int", "selector_" + selector + "_applications_amount", log_message=None ) self.statistics["selector_" + selector + "_used"] = ( self.statistics["selector_" + selector + "_applications_amount"] > 0 ) self.statistics.add_metric( partial(self._get_amount_of_runned_apps, interpreter_apps), "int", "selector_" + selector + "_applications_running", log_message=None ) success, default_version_of_selector = self.statistics.evaluate_safe( partial(get_default_version, selector), f"Can't get default version of {selector} selector", ) if success and default_version_of_selector is not None: self.statistics["selector_" + selector + "_default_version"] = default_version_of_selector self.statistics.add_metric( partial(self._get_max_apps_per_domain, interpreter_apps), "int", "selector_" + selector + "_max_applications_per_domain", f"Can't get max applications per domain for {selector} interpreter", ) self.statistics.add_metric( partial(self._get_max_apps_per_user, interpreter_apps), "int", "selector_" + selector + "_max_applications_per_user", f"Can't get max applications per user for {selector} interpreter", ) self.statistics.add_metric( partial(self._get_amount_of_users_with_apps, interpreter_apps), "int", "selector_" + selector + "_num_users_with_apps", f"Can't get amount of users with applications for {selector} interpeter", ) self.statistics.add_metric( partial(self._get_amount_of_domains_with_apps, interpreter_apps), "int", "selector_" + selector + "_num_domains_with_apps", f"Can't get amount of domains with applications for {selector} interpeter", ) success, average_apps_per_domain = self.statistics.evaluate_safe( partial( self._get_average_apps_per_domain, self.statistics["selector_" + selector + "_applications_amount"], self.statistics["selector_" + selector + "_num_domains_with_apps"], ), f"Can't get average amount of applications per domain for {selector} interpreter", ) if success and average_apps_per_domain is not None: self.statistics["selector_" + selector + "_average_applications_per_domain"] = average_apps_per_domain success, average_apps_per_user = self.statistics.evaluate_safe( partial( self._get_average_apps_per_user, self.statistics["selector_" + selector + "_applications_amount"], self.statistics["selector_" + selector + "_num_users_with_apps"], ), f"Can't get average amount of applications per user for {selector} interpreter", ) if success and average_apps_per_user is not None: self.statistics["selector_" + selector + "_average_applications_per_user"] = average_apps_per_user self.statistics.add_metric( partial(self._get_amount_of_apps_per_each_version_of_interpreters, interpreter_apps), "int_dict", "selector_" + selector + "_num_applications_by_interpreter", f"Can't get amount of applications per each version of {selector} interpreters", ) @staticmethod def _get_wizard_statistics() -> dict: """ Get wizard status and list of installed modules """ cmd = ["/usr/sbin/cloudlinux-wizard", "status"] ret_code, std_out, std_err = run_command(cmd, return_full_output=True) if ret_code != 0: raise ExternalProgramFailed(std_err) parsed_json = json.loads(std_out) wizard_statistics = {} wizard_statistics["wizard_status"] = parsed_json["wizard_status"] parsed_modules = {module["name"]: module["status"] for module in parsed_json["modules"]} for module in ALL_MODULES: wizard_statistics["wizard_module_" + module] = parsed_modules.get(module, NOT_SELECTED) return wizard_statistics def _fill_wizard_statistics(self) -> None: """ Fill dict with statistics by varied statistics about cloudlinux-wizard """ # ignore non-success here because error messages below are suppressed _, wizard_statistics = self.statistics.evaluate_safe( self._get_wizard_statistics, "Can't get statistics about cloudlinux-wizard", ) self.statistics.add_metric( lambda: wizard_statistics["wizard_status"], "str", "wizard_status", log_message=None ) for module in ALL_MODULES: self.statistics.add_metric( lambda: wizard_statistics["wizard_module_" + module], # pylint: disable=cell-var-from-loop "str", "wizard_module_" + module, log_message=None ) @staticmethod def _get_implemented_integration_scripts(): """ Returns list of implemented scripts in integration.ini """ config = _read_config_file() scripts = [] for section in config: scripts += list(config[section].keys()) return scripts def _get_integration_info(self): """ Checks integration script exists and if exists get list of implemented scripts """ result = {"integration_scripts_used": False, "integration_scripts_specified": []} if not os.path.isfile(CONFIG_PATH): return result result["integration_scripts_used"] = True result["integration_scripts_specified"] = self._get_implemented_integration_scripts() return result @staticmethod def _get_memory_used(): """ Gets memory usage: total and used memory in megabytes """ bytes_in_mb = 1024**2 mem = psutil.virtual_memory() mem_total = float(mem.total) / bytes_in_mb mem_used = float(mem.used) / bytes_in_mb return mem_total, mem_used @staticmethod def _get_kernel_info(): """ Gets kernel info release and module version (starting from 7h) :return: """ kernel_release = platform.release() # modinfo has same version output as rpm -q kmod-lve kmodlve_version_file = "/sys/module/kmodlve/version" kmodlve_version = None if is_panel_feature_supported(Feature.LVE) and os.path.exists(kmodlve_version_file): with open(kmodlve_version_file, "r", encoding="utf-8") as f: kmodlve_version = f.read().strip() return kernel_release, kmodlve_version @staticmethod def _get_lve_extensions_packages_amount(): """ Gets info about lve extensions usage Calculates amount of packages with lve extensions """ return len(get_packages_with_lve_extensions()) @staticmethod def _is_kernel_datacycle_enabled_in_file() -> bool: """ Reads /proc/sys/fs/datacycle/enable in order to check datacycle enabled parameter """ datacycle_file = "/proc/sys/fs/datacycle/enable" if not os.path.exists(datacycle_file): return False with open(datacycle_file, "r", encoding="utf-8") as f: data = f.read().strip() return bool(int(data)) @staticmethod def _is_datacycle_param_was_passed() -> bool: """ Checks if datacycle parameter was given for current boot """ cmdline_file, param_name = "/proc/cmdline", "datacycle" # just in case if not os.path.exists(cmdline_file): return False with open(cmdline_file, "r", encoding="utf-8") as f: data = f.read().strip().split(" ") return param_name in data @staticmethod def _get_total_domains_amount() -> int: """ Returns general amount of domains on server """ cpusers_list = cpapi.cpusers() domains_count = 0 for user in cpusers_list: domains_count += len(cpapi.userdomains(user)) return domains_count @staticmethod def _is_link_traversal_protection_enabled() -> str: """ Returns is links traversal protection enabled on server (symlinks or hardlinks) """ sysctl = SysCtlConf() symlink_protection_enabled = bool(int(sysctl.get("fs.protected_symlinks_create"))) hardlink_protection_enabled = bool(int(sysctl.get("fs.protected_hardlinks_create"))) if symlink_protection_enabled and hardlink_protection_enabled: return "all" if symlink_protection_enabled: return "symlinks_only" if hardlink_protection_enabled: return "hardlinks_only" return "no" @staticmethod def _get_cl_installation_source(): iso = "iso" cldeploy = "cldeploy" qcow2 = "qcow2" if os.path.exists("/etc/cl-convert-saved") or os.path.exists("/var/log/cldeploy"): return cldeploy if os.path.exists("/etc/cloudlinux-qcow2-install"): return qcow2 return iso def _fill_system_statistics(self): kernel_info = self._get_kernel_info() if is_panel_feature_supported(Feature.LVE): self.statistics.add_metric( self._get_lve_extensions_packages_amount, "int", "lve_extension_packages_amount", log_message=None ) self.statistics.add_metric( self._is_link_traversal_protection_enabled, "str", "link_traversal_protection_enabled", log_message=None ) self.statistics.add_metric( lambda: kernel_info[1], "str", "installed_kmod_lve_version", log_message=None ) if not is_cl_solo_edition(skip_jwt_check=True): self.statistics.add_metric( is_email_notification_enabled, "int", "cldiag_cron_check_enabled", log_message=None ) self.statistics.add_metric(cpu_count, "str", "cpu_amount", log_message=None) self.statistics.add_metric( is_testing_enabled_repo, "int", "testing_repository_enabled", log_message=None ) self.statistics.add_metric(self._get_platform, "str", "base_distro", "Can't detect platform for CLOS") self.statistics.add_metric( self._detect_secureboot, "str", "secureboot_status", "Can't detect whether secure boot is enabled" ) self.statistics.add_metric( lambda: kernel_info[0], "str", "kernel_release", log_message=None ) vendor_integration_info = self._get_integration_info() self.statistics.add_metric( lambda: vendor_integration_info["integration_scripts_used"], "int", "integration_scripts_used", log_message=None ) self.statistics.add_metric( lambda: vendor_integration_info["integration_scripts_specified"], "str_list", "integration_scripts_specified", log_message=None ) memory_usage = self._get_memory_used() self.statistics.add_metric( lambda: memory_usage[0], "float", "memory_total_mb", log_message=None ) self.statistics.add_metric( lambda: memory_usage[1], "float", "memory_used_mb", log_message=None ) self.statistics.add_metric( self._get_total_domains_amount, "int", "domains_total", log_message=None ) if is_panel_feature_supported(Feature.LVE): self.statistics.add_metric( lambda: (self._is_kernel_datacycle_enabled_in_file() or self._is_datacycle_param_was_passed()), "int", "kernel_datacycle_usage_enabled", "Can't get kernel datacycle enabled parameter" ) self.statistics.add_metric( get_virt_type, "str", "virt_type", "Can't get the virtualization type" ) self.statistics.add_metric(getfqdn, "str", "hostname", "Can't get the hostname") self.statistics.add_metric( self._get_cl_installation_source, "str", "cloudlinux_installation_source", "Can't get installation source" ) def _fill_dict_with_rpm_packages_statistics(self): self.statistics.add_metric( lambda: self.system_id or UNKNOWN_RHN_ID, "str", "system_id", "Can't get system ID", ) self.statistics.add_metric(get_cl_version, "str", "os_version", "Can't get version of OS") self.statistics.add_metric( get_rpm_packages_info, "rpm_stat_list", "packages", "Can't get info about client's rpm packages" ) def _get_proc_param(self, param: AnyStr) -> Optional[int]: """ Retrieve data from proc/mounts for param :return: param_value - Optional[int], if there is no value - None """ return clconfig_utils.str_to_int(self.sysctl.get(param)) def _fill_proc_params_statistics(self): """ Filling stats about mounting - mount params from parameters list - separate hidepid getting, since it is more complicated than other mounting params """ parameters = [ "kernel.memcg_oom_disable", # Only for CL7+ ] if is_panel_feature_supported(Feature.LVE): # Get all mounting parameters including hidepid # two mounting params: fs.protected_symlinks_create and # fs.protected_hardlinks_create are gathered in # _is_link_traversal_protection_enabled method parameters.extend( [ "fs.enforce_symlinksifowner", "fs.symlinkown_gid", "fs.protected_symlinks_allow_gid", "fs.protected_hardlinks_allow_gid", "fs.global_root_enable", "fs.proc_can_see_other_uid", "fs.proc_super_gid", "fs.xfs.cap_res_quota_disable", # Only for ext4 fs "ubc.ubc_oom_disable", # Only for CL6 "fs.process_symlinks_by_task", # Only for CPanel on CL7+ ] ) for p in parameters: self.statistics.add_metric( partial(self._get_proc_param, p), "int", # It is forbidden to use '.' in the field name p.replace(".", "_"), f"Can't get {p}" ) self.statistics.add_metric( get_hidepid_typing_from_mounts, "int", "hidepid", "Can't get hidepid value" ) def _fill_kmodlve_params_statistics(self): """ Fill dict with statistics about kmodlve parameters usage """ parameters = ("lve_setuid_enter",) for p in parameters: self.statistics.add_metric( partial(self._get_kmodlve_param_value, p), "int", f"kmodlve_{p}", f'Can\'t get "{p}" kmodlve parameter value' ) def _fill_cln_jwt_information(self): """ Fill statistics fields from jwt token. - jwt token metrics (cl_plus existence and client_id) """ # Get metrics from jwt token and process them data = get_client_data_from_jwt_token(check_expiration=False) self.statistics.add_metric( lambda: None if data is None else data.get("cl_plus", None), "int", "cl_plus", "Can't get cl_plus information", ) self.statistics.add_metric( lambda: None if data is None else data.get("client_id", None), "int", "client_id", "Can't get client_id value", ) self.statistics.add_metric( lambda: None if data is None else data.get("is_awp_premium_allowed", None), "int", "jwt_is_awp_premium_allowed", "Can't get is_awp_premium_allowed value", ) self.statistics.add_metric( lambda: None if data is None else data.get("is_awp_cdn_allowed", None), "int", "jwt_is_awp_cdn_allowed", "Can't get is_awp_cdn_allowed value", ) self.statistics.add_metric( lambda: None if data is None else data.get("is_trial", None), "int", "jwt_is_trial_license", "Can't get jwt_is_trial_license value", ) self.statistics.add_metric( partial(is_active_cloudlinux_license, data), "int", "license_active", "Can't get license status value" ) def _fill_centralized_management_statistics(self): """ Filling stats centralized management - centralized management existence """ # Get information about forcible disabling of CM self.statistics.add_metric( # If such file exists, CM disabled and vice versa lambda: os.path.isfile(self.CL_PLUS_CM_DISABLED_PATH), "int", "centralized_management_disabled", "Can't check CM disabling status", ) # Get status of cl_plus_sender service self.statistics.add_metric( get_cl_plus_sender_status, "str", "cl_plus_sender_service_status", "Can't check cl plus sender service status", ) @staticmethod def make_flat_cpu_metrics() -> Dict: """ Prepare list of dicts with CPU metrics Method get_cpu_metrics returns data in following format: [ { "id": 0, "model": "QEMU Virtual CPU version 2.5+" }, { "id": 0, "model": "QEMU Virtual CPU version 2.5+" } ] This helper produces a dict, where each key - metric_name, value - list of values for all CPUs "cpu_model": [ "QEMU Virtual CPU version 2.5+", "QEMU Virtual CPU version 2.5+" ], "cpu_id": [ 0, 0 ] """ result = {} try: cpu_cores = get_cpu_metrics() for cpu_core in cpu_cores: for metric, value in cpu_core.items(): result.setdefault(f"cpu_{metric}", []).append(value) except (OSError, NotSupported) as ex: app_logger.error("CPU metrics getting error: %s", ex) return result def _fill_hardware_statistics(self): """ Filling stats about hardware metrics, specifically: CPU: - cache - frequency - model - id RAM: - ram - swap """ # Get CPUs metrics exp_metrics = ["cpu_id", "cpu_cache_mb", "cpu_model", "cpu_frequency_mhz"] cpu_metrics = self.make_flat_cpu_metrics() for metric in exp_metrics: # All metrics accept model are numeric metric_type = "int_list" if metric != "cpu_model" else "str_list" self.statistics.add_metric( partial(cpu_metrics.get, metric, None), metric_type, metric, f"Can't parse {metric} metric for all cores", ) # Get memory metrics self.statistics.add_metric( get_memory_metrics, "int_dict", "memory", "Can't parse memory metrics", ) def _fill_act_cagefs_disabled_statistics(self): """ Collect CageFS enter errors number: 1. "Act like CageFS is disabled (unable to create LVE).. %d" 2. "Act like CageFS is disabled (unable to enter into NameSpace).. %d" 3. "Act like CageFS is disabled (unable to acquire lock for user %s uid %d)" 4. File-marker /etc/cagefs/fail.on.error presense :return None """ ( self.statistics["act_cagefs_disabled_unable_to_create_lve"], self.statistics["act_cagefs_disabled_unable_to_enter_ns"], self.statistics["act_cagefs_disabled_unable_to_acqure_lock"], ) = self._scan_log_for_act_cagefs_disabled_messages() self.statistics["act_cagefs_disabled_marker_present"] = os.path.exists("/etc/cagefs/fail.on.error") def _scan_log_for_act_cagefs_disabled_messages(self) -> Tuple[int, int, int]: """ Scan /var/log/messages for all needed "Act like CageFS is disabled ..." messages for yesterday :return tuple of ints: Number of "Act like CageFS is disabled (unable to create LVE).. " messages, Number of "Act like CageFS is disabled (unable to enter into NameSpace).. " messages, Number of "Act like CageFS is disabled (unable to acquire lock for user %s uid %d)" messages """ try: returncode, stdout = self._get_data_from_log() if returncode == 0: # Something was found lines_list = stdout.split("\n") # grep for separate messages found_lines_list = list( grep( "Act like CageFS is disabled (unable to create LVE)", fixed_string=True, multiple_search=True, data_from_file=lines_list, ) ) num_unable_to_create_lve = len(found_lines_list) found_lines_list = list( grep( "Act like CageFS is disabled (unable to enter into NameSpace)", fixed_string=True, multiple_search=True, data_from_file=lines_list, ) ) num_unable_to_enter_ns = len(found_lines_list) found_lines_list = list( grep( "Act like CageFS is disabled (unable to acquire lock for user", fixed_string=True, multiple_search=True, data_from_file=lines_list, ) ) num_unable_to_acqure_lock = len(found_lines_list) else: # nothing found num_unable_to_create_lve = 0 num_unable_to_enter_ns = 0 num_unable_to_acqure_lock = 0 return num_unable_to_create_lve, num_unable_to_enter_ns, num_unable_to_acqure_lock except (OSError, IOError): return -42, -42, -42 @staticmethod def _get_data_from_log() -> Tuple[int, str]: """ Scan /var/log/messages for all needed "Act like CageFS is disabled ..." messages for yesterday :return: Tuple (ret code, std_out string) """ os_type = get_cl_version() if os_type in ["cl6", "cl6h"]: # CL6 systems don't have an utility like journalctl, # so we use the logs-at script which can work with different date formats yesterday_date = datetime.date.today() - datetime.timedelta(days=1) date_to_scan = yesterday_date.strftime("%Y-%m-%d") # /usr/share/cloudlinux/logs-at 2021-04-07 /var/log/messages s_cmd = ( f"/usr/share/cloudlinux/logs-at {date_to_scan} " "/var/log/messages | /bin/grep 'Act like CageFS is disabled'" ) else: # CL7 and up or Ubuntu - use the journalctl utility # The minimal supported Ubuntu version is 20.04, and it has journalctl s_cmd = "/usr/bin/journalctl --since yesterday --until today | /usr/bin/grep 'Act like CageFS is disabled'" p = subprocess.run( s_cmd, text=True, shell=True, executable="/bin/bash", stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, check=False, ) return p.returncode, p.stdout def _prepare_cl_normal_statistics(self): if is_panel_feature_supported(Feature.LVE): self.statistics.add_metric( self._detect_old_lve_integration, "int", "old_way_of_integration_used", "Can't detect old LVE integration mechanism", ) self.statistics["lve_utils_version"] = LVE_UTILS_PKG_VERSION self.statistics.add_metric( partial(CLEditions.get_cl_edition, verify_exp=False), "str", "cl_edition", "Can't get CloudLinux edition" ) self.statistics.add_metric( self._is_solo_marker_exists, "int", "is_solo_meta_file_present", "Can't detect solo meta file presence" ) self.statistics.add_metric( self._is_admin_marker_exists, "int", "is_admin_meta_file_present", "Can't detect admin meta file presence" ) self.statistics.add_metric( self._is_container_marker_exists, "int", "is_container_meta_file_present", "Can't detect container meta file presence", ) self.statistics.add_metric( # security_token is property lambda: self.security_token, "str", "security_token", "Can't get or generate security token", ) self.statistics.add_metric( lambda: self.system_id or UNKNOWN_RHN_ID, "str", "system_id", "Can't get system ID", ) if LVEMANAGER_PKG_VERSION is not None and LVEMANAGER_PKG_RELEASE is not None: self.statistics["lvemanager_version"] = f"{LVEMANAGER_PKG_VERSION}-{LVEMANAGER_PKG_RELEASE}" else: self.statistics["lvemanager_version"] = None self._fill_dict_with_statistics() def _fill_lvemanager_statistics(self): """ Filling stats lvemanager - number of visits on X-Ray tab - number of X-Ray installation - number of LVEManager opens """ LOG_FOLDER = "/var/log/cloudlinux/" COLLECTING_PERIOD = 3600 * 24 LVE_NUMBER_OF_OPENS = "lve_number_of_opens" XRAY_NUMBER_OF_INSTALLS = "xray_number_of_installation" XRAY_NUMBER_OF_VISITS = "xray_number_of_visits" XRAY_ADVANCED_METRICS_STATUS = "xray_advanced_metrics_status" if not os.path.isdir(LOG_FOLDER): return # Custom Exception for breaking out outer loop class BreakOuterLoop(Exception): pass # Nested function to fill statistics def fill_statistics(stats, file): current_date_timestamp = datetime.datetime.now().timestamp() file_path = LOG_FOLDER + file with open(file_path, "r", encoding="utf-8") as f: break_outer_loop = False for line in f: try: log_time = " ".join(line.split(" ")[:2]) log_date_timestamp = datetime.datetime.strptime(log_time, "%Y-%m-%d %H:%M:%S,%f").timestamp() if current_date_timestamp - COLLECTING_PERIOD < log_date_timestamp < current_date_timestamp: message = " ".join(line.strip().split(" ")[2:]) if message == "lvemanager-opened": stats[LVE_NUMBER_OF_OPENS] += 1 elif message == "xray-install-started": stats[XRAY_NUMBER_OF_INSTALLS] += 1 elif message == "xray-tab-clicked": stats[XRAY_NUMBER_OF_VISITS] += 1 else: break_outer_loop = True except (ValueError, AttributeError): continue if break_outer_loop: raise BreakOuterLoop statistics = {LVE_NUMBER_OF_OPENS: 0, XRAY_NUMBER_OF_VISITS: 0, XRAY_NUMBER_OF_INSTALLS: 0} # include files which are contain lvemanager.log in name # except which are ends with .gz # include: lvemanager.log, lvemanager.log.1 lvemanager.log.2 # exclude: lvemanager.log*.gz, lvemanager.log*.bz2 log_files = [name for name in os.listdir(LOG_FOLDER) if re.match(r"^lvemanager\.log(\.\d{1,2})?$", name)] log_files.sort() for log_file in log_files: try: fill_statistics(statistics, log_file) except BreakOuterLoop: pass except OSError: for stat in statistics: statistics[stat] = -1 self.statistics[XRAY_NUMBER_OF_VISITS] = statistics[XRAY_NUMBER_OF_VISITS] self.statistics[XRAY_NUMBER_OF_INSTALLS] = statistics[XRAY_NUMBER_OF_INSTALLS] self.statistics[LVE_NUMBER_OF_OPENS] = statistics[LVE_NUMBER_OF_OPENS] if os.path.exists("/opt/alt/php-xray/php/advanced_metrics.enabled"): self.statistics[XRAY_ADVANCED_METRICS_STATUS] = "enabled" elif os.path.exists("/opt/alt/php-xray/php/advanced_metrics.disabled"): self.statistics[XRAY_ADVANCED_METRICS_STATUS] = "disabled" else: self.statistics[XRAY_ADVANCED_METRICS_STATUS] = "none" def _fill_leapp_statistics(self) -> None: """ Fill dict with Leapp upgrade statistics """ # Upgrade process status: 0 - not started, 1 - error, 2 - success LEAPP_STATUS = "leapp_upgrade_status" # See CLOS-2417 - we want to track users that converted from CentOS 7 to CloudLinux 8 # and offer them a discount on CLOS Shared DISCOUNT_ELIGIBILITY = "centos7_cl7_cl8_transformation" success, elevate_status = self.statistics.evaluate_safe( self.get_leapp_stats, "Can't get Leapp statistics", ) if not success: self.statistics[LEAPP_STATUS] = ERROR return self.statistics[LEAPP_STATUS] = elevate_status["leapp_status"] self.statistics[DISCOUNT_ELIGIBILITY] = elevate_status["discount_eligible"] def _fill_feature_flags_statistics(self) -> None: """ Fill dict with feature flags information """ self.statistics.add_metric( lambda: [f.name for f in list_flags_info()], "str_list", "server_flags_available", "Can't get information about available feature flags", ) self.statistics.add_metric( lambda: [f.name for f in list_flags_info() if f.enabled], "str_list", "server_flags_enabled", "Can't get information about enabled feature flags", ) def _fill_accelerate_wp_promotion_statistics(self) -> None: """ Fill out AccelerateWP promotion status """ # Metric field FIELD = "accelerate_wp_promotion_status" # Status ENABLED_FOR_CURRENT_SERVER = "enabled_for_current_server" ENABLED_FOR_ALL_SERVERS = "enabled_for_all_servers" SKIPPED = "skipped" # flags enable_awp_all_servers_flag = "/var/lve/clflags/enable_awp_all_servers.flag" enable_awp_this_server_flag = "/var/lve/clflags/enable_awp_this_server.flag" skip_awp_setup = "/var/lve/clflags/skip_awp_setup.flag" # Default status status = None if os.path.isfile(enable_awp_all_servers_flag): status = ENABLED_FOR_ALL_SERVERS if os.path.isfile(enable_awp_this_server_flag): status = ENABLED_FOR_CURRENT_SERVER if os.path.isfile(skip_awp_setup): status = SKIPPED self.statistics[FIELD] = status def _prepare_statistics(self): self.statistics.add_metric( # security_token is property lambda: self.security_token, "str", "security_token", "Can't get or generate security token", ) self.statistics.add_metric( lambda: self.system_id or UNKNOWN_RHN_ID, "str", "system_id", "Can't get system ID", ) self.statistics["lve_utils_version"] = LVE_UTILS_PKG_VERSION if LVEMANAGER_PKG_VERSION is not None and LVEMANAGER_PKG_RELEASE is not None: self.statistics["lvemanager_version"] = f"{LVEMANAGER_PKG_VERSION}-{LVEMANAGER_PKG_RELEASE}" else: self.statistics["lvemanager_version"] = None self.statistics.add_metric( self._detect_old_lve_integration, "int", "old_way_of_integration_used", "Can't detect old LVE integration mechanism", ) self._fill_dict_with_statistics() def _fill_dict_with_statistics(self): if is_panel_feature_supported(Feature.GOVERNOR): self._fill_mysql_governor_statistics() if is_panel_feature_supported(Feature.CAGEFS): self._fill_cagefs_statistics() self._fill_act_cagefs_disabled_statistics() if is_panel_feature_supported(Feature.RESELLER_LIMITS): self._fill_resellers_statistics() if is_panel_feature_supported(Feature.LVE): self._fill_limits_statistics() self._fill_kmodlve_params_statistics() if is_panel_feature_supported(Feature.PHP_SELECTOR): self._fill_php_selector_statistics() if ( is_panel_feature_supported(Feature.RUBY_SELECTOR) or is_panel_feature_supported(Feature.PYTHON_SELECTOR) or is_panel_feature_supported(Feature.NODEJS_SELECTOR) ): self._fill_selectors_statistics() if not is_cl_solo_edition(skip_jwt_check=True): self._fill_centralized_management_statistics() if is_panel_feature_supported(Feature.LSAPI): self._fill_lsapi_statistics() if is_panel_feature_supported(Feature.WIZARD): self._fill_wizard_statistics() self._fill_wpos_activation_statistics() self._fill_wpos_statistics() self._fill_cln_jwt_information() self._fill_control_panel_statistics() self._fill_system_statistics() self._fill_proc_params_statistics() self._fill_hardware_statistics() self._fill_wmt_settings_statistics() self._fill_ssa_statistics() self._fill_lvemanager_statistics() self._fill_wp_statistics() self._fill_per_user_wp_statistics() self._fill_leapp_statistics() self._fill_feature_flags_statistics() self._fill_accelerate_wp_promotion_statistics() self._fill_apache2nginx_statistics() def _fill_apache2nginx_statistics(self): self.statistics.add_metric( self.get_apache2nginx_stats, "int_dict", "apache2nginx", "Can't parse Apache2Nginx metrics", ) def _fill_wpos_activation_statistics(self): # AccelerateWP installation was in fact successful on end-server self.statistics.add_metric( lambda: os.path.exists(ACCELERATE_WP_INSTALLED_FROM_CM), "int", "cmt_is_accelerate_wp_free_activation_completed", "Can't get statistic of AccelerateWP CMT activation", ) success, client_activation = self.statistics.evaluate_safe( client_activation_data, "Can't get AccelerateWP activation statistics", ) # client clicked button in centralized monitoring UI self.statistics.add_metric( lambda: client_activation.get("accelerate_wp_free_activate"), "int", "cmt_accelerate_wp_free_activated_in_ui", "Can't get statistic of AccelerateWP CMT activation selected" if success else None, ) def _fill_wpos_statistics(self): success, wpos_stats = self.statistics.evaluate_safe( self.get_wpos_stats, "Can't get AccelerateWP statistics", ) if not success: return if isinstance(wpos_stats, dict) and wpos_stats.get("result") == "success": if wpos_stats.get("features_visible_by_default") is not None: self.statistics.add_metric( lambda: wpos_stats["features_visible_by_default"], "str_list", "wpos_features_visible_by_default", "Can't get statistic of using AccelerateWP.", ) if wpos_stats.get("features_allowed_by_default") is not None: self.statistics.add_metric( lambda: wpos_stats["features_allowed_by_default"], "str_list", "wpos_features_allowed_by_default", "Can't get statistic of using AccelerateWP.", ) self.statistics.add_metric( lambda: wpos_stats["enabled_sites"]["total"], "int", "sites_count_with_enabled_wpos", "Can't get statistic of using AccelerateWP.", ) self.statistics.add_metric( lambda: wpos_stats["allowed_users"]["total"], "int", "users_count_with_allowed_wpos", "Can't get statistic of using AccelerateWP.", ) if wpos_stats.get("visible_users"): self.statistics.add_metric( lambda: wpos_stats["visible_users"]["total"], "int", "users_count_with_visible_wpos", "Can't get statistic of using AccelerateWP.", ) # pylint: disable=cell-var-from-loop for module in wpos_stats["enabled_sites"]: if module == "total": continue self.statistics.add_metric( lambda: wpos_stats["enabled_sites"][module], "int", "sites_count_with_enabled_" + module, "Can't get statistic of using AccelerateWP.", ) for module in wpos_stats.get("enabled_users", []): self.statistics.add_metric( lambda: wpos_stats["enabled_users"][module], "int", "users_count_with_enabled_" + module, "Can't get statistic of using AccelerateWP.", ) for module in wpos_stats["allowed_users"]: if module == "total": continue self.statistics.add_metric( lambda: wpos_stats["allowed_users"][module], "int", "users_count_with_allowed_" + module, "Can't get statistic of using AccelerateWP.", ) for suite in wpos_stats.get("allowed_suites", []): self.statistics.add_metric( lambda: wpos_stats["allowed_suites"][suite], "int", "users_count_with_allowed_suite_" + suite, "Can't get statistic of using AccelerateWP.", ) for suite in wpos_stats.get("enabled_suites", []): self.statistics.add_metric( lambda: wpos_stats["enabled_suites"][suite], "int", "users_count_with_enabled_suite_" + suite, "Can't get statistic of using AccelerateWP.", ) billable_suite_metric = f'{suite}_billable' non_billable_suite_metric = f'{suite}_non_billable' if enabled_billable := wpos_stats.get(billable_suite_metric): self.statistics.format_metric( lambda: enabled_billable, "int", "users_count_with_enabled_suite_" + billable_suite_metric, "Can't get statistic of using AccelerateWP.", ) if enabled_non_billable := wpos_stats.get(non_billable_suite_metric): self.statistics.format_metric( lambda: enabled_non_billable, "int", "users_count_with_enabled_suite_" + non_billable_suite_metric, "Can't get statistic of using AccelerateWP.", ) for module in wpos_stats.get("visible_users", []): if module == "total": continue self.statistics.add_metric( lambda: wpos_stats["visible_users"][module], "int", "users_count_with_visible_" + module, "Can't get statistic of using AccelerateWP.", ) # pylint: enable=cell-var-from-loop for module, upgrade_url in wpos_stats.get("upgrade_urls", {}).items(): self.statistics["upgrade_url_" + module] = upgrade_url self.statistics.add_metric( lambda: wpos_stats["accelerate_wp_suite_enabled_premium_suite_disallowed"], "int", "accelerate_wp_suite_enabled_premium_suite_disallowed", "Can't get statistic of using AccelerateWP.", ) self.statistics.add_metric( lambda: wpos_stats["accelerate_wp_suite_enabled_premium_suite_visible"], "int", "accelerate_wp_suite_enabled_premium_suite_visible", "Can't get statistic of using AccelerateWP.", ) self.statistics.add_metric( lambda: wpos_stats.get("is_accelerate_wp_flag_enabled"), "int", "is_accelerate_wp_flag_enabled", "Can't get statistic of AccelerateWP feature flag.", ) self.statistics.add_metric( lambda: wpos_stats.get("is_accelerate_wp_icon_enabled"), "int", "is_accelerate_wp_icon_enabled", "Can't get statistic of AccelerateWP icon status.", ) self.statistics.add_metric( lambda: wpos_stats.get("is_smart_advice_notifications_enabled"), "str", "is_smart_advice_notifications_enabled", "Can't get statistic of AccelerateWP is_smart_advice_notifications_enabled.", ) self.statistics.add_metric( lambda: wpos_stats.get("is_smart_advice_reminders_enabled"), "str", "is_smart_advice_reminders_enabled", "Can't get statistic of AccelerateWP is_smart_advice_reminders_enabled.", ) self.statistics.add_metric( lambda: wpos_stats.get("is_smart_advice_plugin_installation_enabled"), "str", "is_smart_advice_plugin_installation_enabled", "Can't get statistic of AccelerateWP is_smart_advice_plugin_installation_enabled.", ) self.statistics.add_metric( lambda: wpos_stats.get("is_object_cache_banners_enabled"), "str", "is_object_cache_banners_enabled", "Can't get statistic of AccelerateWP is_object_cache_banners_enabled.", ) if whmcs_stats := wpos_stats.get("whmcs"): for key, value in whmcs_stats.items(): self.statistics[f"awp_whmcs_{key}"] = value @staticmethod def _run_cloudlinux_statistics(args): """ Run cloudlinux-statistics using subprocess and handle errors. :type args: list[str] :rtype: str or None """ cmd = ["/usr/sbin/cloudlinux-statistics"] + args try: rc, json_str, _ = run_command(cmd, return_full_output=True) except ExternalProgramFailed as e: app_logger.warning("Unable to run cloudlinux-statistics, error: %s", e) return None if rc == 0: return json_str app_logger.error("cloudlinux-statistics failed with exit code: %i, output: %s", rc, json_str) return None def get_users_and_resellers_with_faults(self): """ Get number of users and resellers with faults for the past 24h :rtype: tuple[int, int] """ json_str = self._run_cloudlinux_statistics(["--by-fault", "any", "--json", "--period=1d"]) # lve-stats is not installed or util is broken if json_str is None: return None, None try: json_data = json.loads(json_str) resellers = json_data["resellers"] users = json_data["users"] except (KeyError, ValueError, TypeError) as e: app_logger.warning("Something really bad happened to cloudlinux-statistics, The reason is: %s", str(e)) return None, None return len(users), len(resellers) def _get_users_with_faults(self) -> Optional[int]: """ a wrapper method for the get_users_and_resellers_with_faults method. return the number of users with faults in the past 24 hours. """ return self.get_users_and_resellers_with_faults()[0] def _get_resellers_with_faults(self) -> Optional[int]: """ a wrapper method for the get_users_and_resellers_with_faults method. return the number of resellers with faults in the past 24 hours. """ return self.get_users_and_resellers_with_faults()[1] @classmethod def _get_cpu_limit_units(cls, cpu): """Get config cpu limit format""" if cpu is None: return None unit = cpu.lower() if unit.endswith("%"): return "speed" if unit.endswith("mhz"): return "mhz" if unit.endswith("ghz"): return "ghz" if unit.isdigit(): return "old_cpu_format" return f"unknown: {cpu}" @staticmethod def _mempages_to_mb(value): """Convert memory limit from mempages to megabytes""" if value is None: return None return 4 * value // 1024 @staticmethod def _cpu_limit_to_percents(cpu, ncpu): """Convert cpu and ncpu to percents of one core""" if cpu is None: return None speed = lvectllib.convert_to_kernel_format(cpu, lncpu=ncpu or 0) if speed is None: return None return round(speed / 100.0, 1) def get_users_amount_per_plan(self, xml_cfg_provider: LimitsDataStorage) -> list[tuple[int, str], ...]: """ Return list of tuples [users_in_package, package] """ if self.packages_by_len is None: # Create a deepcopy of users to avoid being affected by modifications of the original data users_package_copy = deepcopy(list(xml_cfg_provider.users.values())) # Filter users with valid package.name valid_users = [ user for user in users_package_copy if user.package is not None and isinstance(user.package.name, str) ] # Count users by package name package_counts = Counter(user.package for user in valid_users) # Convert to list and sort by user amount per package (descending) self.packages_by_len = sorted([item[::-1] for item in package_counts.items()], reverse=True) # Debugging: Check original list for non-string package names (see CLOS-2370 for context) invalid_users = [ user for user in xml_cfg_provider.users.values() if user.package is None or not isinstance(user.package.name, str) ] if invalid_users: app_logger.error( "Found invalid package names in original data: %s", [(user.id, user.package) for user in invalid_users] ) return self.packages_by_len def _get_top_package_by_number_of_users( self, number_of_top: int, xml_cfg_provider: LimitsDataStorage) -> Optional[Tuple[int, str]]: try: return self.get_users_amount_per_plan(xml_cfg_provider)[number_of_top - 1] except IndexError: return None def _parse_args(self, argv): """ Parse CLI arguments """ status, data = parse_cloudlinux_summary_opts(argv) if not status: # exit with error if can`t parse CLI arguments self._error_and_exit(replace_params(data)) return data @staticmethod def _print_result_and_exit( result: str = "success", data: object | None = None, exit_code: int = 0, is_statistic_enabled: bool | None = None) -> typing.NoReturn: """ Print data in default format for web and exit """ message = {"result": result, "timestamp": time.time(), "data": data} if is_statistic_enabled is not None: message["statistic_enabled"] = is_statistic_enabled print_dictionary(message, True) sys.exit(exit_code) @staticmethod def _error_and_exit(message: dict, error_code: int = 1) -> Optional[None]: """ Print error and exit :param dict message: Dictionary with keys "result" as string and optional "context" as dict """ message.update({"timestamp": time.time()}) print_dictionary(message, True) sys.exit(error_code) @staticmethod def get_raw_lsapi_info() -> Optional[Dict]: """ Return mod_lsapi info from switch_mod_lsapi script """ if os.path.isfile("/usr/bin/switch_mod_lsapi"): p = subprocess.run( ["/usr/bin/switch_mod_lsapi", "--stat"], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) return json.loads(p.stdout) @staticmethod def _get_wmt_api_config(): """ Return wmt-api config if is_solo_edition True, run wpt-api-solo is command """ command = "/usr/share/web-monitoring-tool/wmtbin/wmt-api" if is_cl_solo_edition(): command += "-solo" if os.path.isfile(command): p = subprocess.run( [command, "--config-get"], text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False ) wmt_api_config_summary = json.loads(p.stdout).get("config") wmt_api_config_summary = {k: str(v) for k, v in wmt_api_config_summary.items()} return wmt_api_config_summary @staticmethod def _is_solo_marker_exists(): return os.path.isfile("/etc/cloudlinux-edition-solo") @staticmethod def _is_admin_marker_exists(): return os.path.isfile("/etc/cloudlinux-edition-admin") @staticmethod def _is_container_marker_exists(): return os.path.isfile("/etc/cloudlinux-container") @staticmethod def get_ssa_stats() -> Optional[Dict]: """ Return statistics from cloudlinux-ssa-manager get-stat API """ if os.path.isfile("/usr/sbin/cloudlinux-ssa-manager"): p = subprocess.run( ["/usr/sbin/cloudlinux-ssa-manager", "get-stat"], text=True, capture_output=True, check=False ) return json.loads(p.stdout) @staticmethod def get_leapp_stats() -> dict: """ Return statistics from Leapp upgrade logs """ discount_eligible = 0 leapp_logfile = "/var/log/leapp/leapp-upgrade.log" elevate_cpanel_logfile = "/var/log/elevate-cpanel.log" cldeploy_logfile = "/var/log/cldeploy.log" if os.path.isfile(leapp_logfile) and os.path.isfile(cldeploy_logfile): # The user is eligible for discount when cldeploy and leapp were ran # within 1 month of each other, and the upgrade was made from CL7 to CL8. current_os_version = get_cl_version() if current_os_version == "cl8": leapp_mtime = os.path.getmtime(leapp_logfile) cldeploy_mtime = os.path.getmtime(cldeploy_logfile) # The actual number of seconds in a month differs depending on how many days # are there in it, but for the sake of this calculation, we can just assume 30 days. month_in_seconds = 30 * 24 * 60 * 60 if abs(leapp_mtime - cldeploy_mtime) < month_in_seconds: discount_eligible = 1 # This means upgrade wasn't attempted. upgrade_status = 0 if detect.getCPName() != cpapi.CPANEL_NAME: # Most likely nopanel, read leapp log file # If the log file exists, `leapp upgrade` was run on the machine. if os.path.isfile(leapp_logfile): # This shows that the upgrade process was not successful. fail_grep_str = r"An upgrade inhibitor detected\|Workflow interrupted due to the FailPhase" # This shows that the upgrade process got to the very last stage - i.e. success. first_boot_grep_str = r"Starting stage After of phase FirstBoot" fail_grep_res = next(grep(fail_grep_str, leapp_logfile, fixed_string=True), None) first_boot_grep_res = next(grep(first_boot_grep_str, leapp_logfile, fixed_string=True), None) # No halting messages found, uprgade is (provisionally) successful. if first_boot_grep_res and not fail_grep_res: upgrade_status = 2 else: upgrade_status = 1 else: # cPanel, read elevate-cpanel log file if os.path.isfile(elevate_cpanel_logfile): success_grep_str = r"Great SUCCESS" grep_res = next(grep(success_grep_str, elevate_cpanel_logfile, fixed_string=True), None) # The process got to the end and reported success, upgrade is successful. if grep_res: upgrade_status = 2 else: upgrade_status = 1 # Discount applies only to users who have successfully upgraded. if upgrade_status != 2: discount_eligible = 0 stat_result = {"leapp_status": upgrade_status, "discount_eligible": discount_eligible} return stat_result @staticmethod def _get_kmodlve_param_value(param_name: str) -> int: """ Get kmodlve parameter value: -1 - not supported (parameter file doesn't exist); 0 - disabled; 1 - enabled """ params_dir = "/sys/module/kmodlve/parameters" param_file = Path(params_dir, param_name) # absense of the file means parameter is not supported if not param_file.exists(): return -1 param_value = param_file.read_text(encoding="utf-8").strip() if param_value == "Y": return 1 if param_value == "N": return 0 raise RuntimeError(f'Unable to interpret "{param_name}" kmodlve ' f'parameter value: "{param_value}"') @staticmethod def get_smart_advice_stats() -> Optional[Dict]: """ Return statistics from cl-smart-advice counters API """ util = "/usr/sbin/cl-smart-advice" if os.path.isfile(util): p = subprocess.run([util, "counters"], text=True, capture_output=True, check=False) return json.loads(p.stdout).get("data") def _get_smart_advice_statistics(self) -> Dict: """ Construct dict with Smart Advice statistics """ result = {"smart_advice_total": None, "smart_advice_applied": None} # checking edition by JWT token only, skipping marker check, # since cl-smart-advice requires JWT # and due to bug with detection on Shared PRO by marker PTCLLIB-238 if not is_cl_shared_edition(skip_marker_check=True): success, sa_stats = self.statistics.evaluate_safe( self.get_smart_advice_stats, "Can't get Smart Advice statistics", ) if sa_stats is not None: try: return {"smart_advice_total": sa_stats["total"], "smart_advice_applied": sa_stats["applied"]} except KeyError as e: app_logger.exception( 'Unexpected json response from server, field %s is missing in "%s"', str(e), sa_stats ) return result return result def get_apache2nginx_stats(self) -> Optional[Dict]: """ Return statistics by parsing /var/lib/apache2nginx/server.json """ stats = {} if os.path.isfile("/var/lib/apache2nginx/server.json"): with open("/var/lib/apache2nginx/server.json", "r", encoding="utf-8") as f: data = json.load(f) # apache2nginx_total_websites is total number of entries # in server.json stats["total_websites"] = len(data) websites = list(data.values()) # apache2nginx_total_proxy_pass is all website with proxy_reason field proxied = [x for x in websites if x.get("proxy_reason")] stats["total_proxy_pass"] = len(proxied) # proxy pass due to failed directive pasing stats["proxy_pass_due_directive"] = len([x for x in proxied if x["proxy_reason"] == "directive"]) stats["failing_directives"] = [] stats["failing_handlers"] = [] for x in proxied: if x["proxy_reason"] == "directive": failing_directive = x.get("proxy_details") if failing_directive: stats["failing_directives"].append(x["proxy_details"]) elif x["proxy_reason"] == "handler": failing_handler = x.get("proxy_details") if failing_handler: stats["failing_handlers"].append(x["proxy_details"]) # Make entries unique stats["failing_directives"] = list(set(stats["failing_directives"])) stats["failing_handlers"] = list(set(stats["failing_handlers"])) stats["proxy_pass_due_test"] = len([x for x in proxied if x["proxy_reason"] == "test"]) stats["proxy_pass_due_forced"] = len([x for x in proxied if x["proxy_reason"] == "forced"]) stats["proxy_pass_due_handler"] = len([x for x in proxied if x["proxy_reason"] == "handler"]) stats["proxy_pass_due_includes"] = len([x for x in proxied if x["proxy_reason"] == "includes"]) stats["proxy_pass_due_http"] = len([x for x in proxied if x["proxy_reason"] == "http"]) # state is based on string in the /var/lib/apache2nginx/state file # if file is missing, state is unknown if os.path.isfile("/var/lib/apache2nginx/state"): with open("/var/lib/apache2nginx/state", "r", encoding="utf-8") as f: stats["state"] = f.read().strip() success, max_webserver_stat = self.statistics.evaluate_safe( self.get_max_webserver_stats, "Can't get Max Webserver statistics", ) if not success: return stats for metric_name, metric_value in max_webserver_stat.items(): stats[metric_name] = metric_value return stats @staticmethod def get_max_webserver_stats() -> Optional[Dict]: """ Return statistics from cmax-webserver-stats utility """ util = "/usr/share/max_webserver/max-webserver-stats" if not os.path.exists(util): return {} p = subprocess.run( [util], text=True, capture_output=True, check=False, ) return json.loads(p.stdout) @staticmethod def get_wpos_stats() -> Optional[Dict]: """ Return statistics from cl-smart-advice counters API """ util = "/usr/bin/clwpos-admin" if os.path.isfile(util): p = subprocess.run( [util, "get-stat"], text=True, capture_output=True, check=False, ) return json.loads(p.stdout) def _get_lvpmap(): lvpmap = LvpMap() lvpmap.name_map.link_xml_node() return lvpmap