????JFIF??x?x????'403WebShell
403Webshell
Server IP : 104.21.30.238  /  Your IP : 216.73.216.145
Web Server : LiteSpeed
System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
User : tempvsty ( 647)
PHP Version : 8.0.30
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /././opt/cloudlinux/venv/lib/python3.11/site-packages/wmt/common/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /././opt/cloudlinux/venv/lib/python3.11/site-packages/wmt/common//config.py
#!/opt/cloudlinux/venv/bin/python3
import logging
import os
import fnmatch
import json
from dataclasses import dataclass, asdict, field
from wmt.common.const import CONFIG_PATH, PING_CONNECTIONS
from wmt.common.exceptions import WmtConfigException
from wmt.common.url_parser import parse
from clcommon.cpapi import get_admin_email
from socket import gethostname
from typing import List


@dataclass
class Cfg:
    """
    Default values, in case config has not been specified yet.

    The field names of this dataclass are also the set of keys accepted in
    the config: ConfigManager builds its allowed_params from them and
    serializes the instance with dataclasses.asdict().
    """
    # NOTE(review): units are not stated in this file - confirm against the code that schedules pings
    ping_interval: int = 30
    # NOTE(review): units are not stated in this file - confirm against the code that performs pings
    ping_timeout: int = 10
    # default comes from wmt.common.const.PING_CONNECTIONS
    ping_connections: int = PING_CONNECTIONS
    # None (the default) means "not set by user"; ConfigManager then falls back
    # to the admin email. Annotated as str although the default is None.
    report_email: str = None
    # NOTE(review): not used in this file - presumably how many top entries go into the report; confirm
    report_top: int = 4
    # patterns consumed by ConfigManager.is_domain_ignored(): an entry containing '*'
    # is matched as a wildcard, any other entry as a substring.
    # default_factory gives every instance its own list.
    ignore_list: List[str] = field(default_factory=list)
    # NOTE(review): notification switches, not read in this file - confirm semantics with the mailer code
    summary_notification_enabled: bool = True
    alert_notifications_enabled: bool = False


class ConfigManager:
    """
    Loads, caches and updates the wmt configuration stored in CONFIG_PATH.

    Attributes:
        allowed_params: names of the Cfg fields, the only keys accepted in the config
        from_email: address built as web-monitoring-tool@<hostname>
        default_report_email: value returned by get_admin_email() at construction time
        cfg: currently loaded Cfg
        target_email: address for the TO: field, see _get_target_email()
    """

    def __init__(self):
        self.allowed_params = Cfg.__dataclass_fields__.keys()
        self.from_email = f'web-monitoring-tool@{gethostname()}'
        self.default_report_email = get_admin_email()

        self.cfg = self._init_cfg()
        self._refresh_derived_state()

    def _refresh_derived_state(self):
        """
        Recomputes everything that is derived from self.cfg.

        Has to be called each time self.cfg is replaced, otherwise
        is_domain_ignored() and target_email keep reflecting the previous config.
        """
        # self._ignored_domains is used as cache for checking domains if it is ignored or not
        self._ignored_domains = self.generate_ignored_domains()
        self.target_email = self._get_target_email()

    def _get_target_email(self):
        """
        This function checks to see which email address to use for TO: field of smtp.
        If report_email has been defined by user then report_email will be used.
        By default (in case not defined by user) default_report_email will be used
        """
        return self.cfg.report_email if self.cfg.report_email else self.default_report_email

    def to_dict(self):
        """
        Returns current config as a plain dict (field name -> value)
        """
        return asdict(self.cfg)

    @staticmethod
    def _normalize_ignore_list(value) -> List[str]:
        """
        Converts user supplied ignore_list to a list of non-empty patterns.

        Accepts None (-> empty list), a comma-separated string or a list/tuple
        of strings. Surrounding whitespace is stripped and empty entries are
        dropped: an empty pattern is a substring of every domain and would
        silently ignore everything.

        Raises:
            WmtConfigException: value (or one of its items) has unsupported type
        """
        if value is None:
            return []
        if isinstance(value, str):
            value = value.split(',')
        if not isinstance(value, (list, tuple)):
            raise WmtConfigException(
                '"ignore_list" must be a list of patterns or a comma-separated string')

        patterns = []
        for item in value:
            if not isinstance(item, str):
                raise WmtConfigException(
                    '"ignore_list" must contain only strings')
            item = item.strip()
            if item:
                patterns.append(item)
        return patterns

    def _init_cfg(self) -> 'Cfg':
        """
        Builds Cfg from the config file, or from defaults if the file is absent.
        Unsupported keys are logged and skipped.

        Raises:
            WmtConfigException: file is not valid JSON, is not a JSON object
                or contains malformed ignore_list
        """
        # if not present - use defaults
        if not self.is_present():
            return Cfg()

        data = self.read()
        if not isinstance(data, dict):
            raise WmtConfigException(
                f'config must be a JSON object, got {type(data).__name__}')

        cfg = Cfg()
        for key, value in data.items():
            if key not in self.allowed_params:
                logging.warning(
                    'unsupported parameter "%s", please ensure config contains'
                    ' only allowed parameters: %s', key, list(self.allowed_params)
                )
                continue
            if key == 'ignore_list':
                # same normalization as in modify(), so hand-edited file behaves the same
                value = self._normalize_ignore_list(value)
            setattr(cfg, key, value)

        return cfg

    @staticmethod
    def is_present():
        """
        Checks that config file exists
        """
        return os.path.isfile(CONFIG_PATH)

    def modify(self, new_json: str):
        """
        Changes configuration of wmt

        Passed values are merged over the current config, the result is
        written to the config file and becomes the active configuration.

        Returns:
            self.to_dict()

        Raises:
            WmtConfigException

        Example:
            wmt-api-solo --config-change {'key': 'val'}
        """
        try:
            new_config = json.loads(new_json)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e)) from e

        # valid JSON is not necessarily an object (e.g. "[1]"), .keys() would fail on it
        if not isinstance(new_config, dict):
            raise WmtConfigException(
                f'config must be a JSON object, got {type(new_config).__name__}')

        if not set(new_config.keys()).issubset(self.allowed_params):
            raise WmtConfigException(f'some of passed params are unsupported, '
                                     f'only allowed parameters: {list(self.allowed_params)}')

        config = {
            **self.to_dict(),
            **new_config
        }

        # validate before touching the file, so invalid value never gets persisted
        config['ignore_list'] = self._normalize_ignore_list(config.get('ignore_list'))

        # Write config to /etc/wmt/config.json file
        with open(CONFIG_PATH, 'w') as f:
            json.dump(config, f, indent=4)
        self.cfg = Cfg(**config)
        self._refresh_derived_state()
        return self.to_dict()

    @staticmethod
    def read():
        """
        Returns parsed content of the config file

        Raises:
            WmtConfigException: file content is not valid JSON
        """
        try:
            with open(CONFIG_PATH) as f:
                return json.load(f)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e)) from e

    def reload(self):
        """
        Re-reads config from disk and refreshes ignored domains cache and target email
        """
        self.cfg = self._init_cfg()
        self._refresh_derived_state()

    def is_domain_ignored(self, domain) -> bool:
        """
        Check if domain is in ignored list.
        If record contains "*" then it will be processed as wildcard. Else as substring
        """
        for pattern in self._ignored_domains:
            if '*' in pattern:
                # set * on the first position if it not set to allow filter without scheme (e.g. domain.com)
                if not pattern.startswith('*'):
                    pattern = f"*{pattern}"
                if fnmatch.fnmatch(domain, pattern):
                    return True
            elif pattern in domain:
                return True
        return False

    def generate_ignored_domains(self) -> set:
        """
        Generates ignored domains patterns from self.cfg.ignore_list and
        returns it for using as cache in self._ignored_domains set().
        Empty patterns are skipped, they would match any domain.
        """
        return {pattern for pattern in self.cfg.ignore_list if pattern}


Youez - 2016 - github.com/yon3zu
LinuXploit