????JFIF??x?x????'
Server IP : 104.21.96.1 / Your IP : 216.73.216.1 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/./opt/cloudlinux/venv/lib64/python3.11/site-packages/ssa/modules/ |
Upload File : |
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT """ This module contains RequestProcessor class """ import logging import sys import time import signal import traceback from datetime import datetime, timedelta, timezone from threading import Thread, RLock, current_thread from typing import Callable, Any from queue import Queue, Empty from sqlalchemy.exc import OperationalError from .autotracer import AutoTracer from .common import Common from .decision_maker import DecisionMaker from .stat_sender import StatisticsSender from ..db import session_scope, setup_database, RequestResult, cleanup_old_data, restore_database, is_malformed_database from ..internal.exceptions import SSAError from ..internal.utils import ( singleton, url_split, switch_schedstats ) @singleton class RequestProcessor(Common): """ SSA Request processor implementation. Only one instance is allowed to be created """ BUFFER_SIZE = 100 DB_ACCESS_RETRIES = 5 def __init__(self, engine=None): super().__init__() self.logger = logging.getLogger('req_processor') self.logger.info('Processor enabled: %s', __package__) # enable throttling detection kernel mechanism on service start switch_schedstats(enabled=True) self.engine = engine if engine else setup_database() self._lock = RLock() self.decision_maker = DecisionMaker(engine=self.engine) self.sender = StatisticsSender() self.auto_tracer = AutoTracer(engine=self.engine) self._queue = Queue() self._buffer = [] self.start_background_routine() self.start_flush_worker() # Catch the shutdown signal for save last data from buffer if < BUFFER_SIZE signal.signal(signal.SIGTERM, self.shutdown_handler) signal.signal(signal.SIGINT, self.shutdown_handler) @property def configured_duration(self): """ Return config file value multiplied by 1000000, as we receive duration in microseconds """ return self.requests_duration * 1000000 def send_stats(self, report: dict): """ Call Statistics Sender """ try: self.sender.send(report) except SSAError as e: self.logger.error('StatisticsSender failed: %s', str(e)) def start_background_routine(self) -> None: """ Start dumper|DecisionMaker thread in background """ t = Thread(target=self.background_routine, daemon=True) t.start() self.logger.info('[%s] Routine started', t.name) def start_flush_worker(self) -> None: """ Start flush worker thread """ t = Thread(target=self.flush_worker, daemon=True) t.start() self.logger.info('[%s] Flush worker started', t.name) def background_routine(self) -> None: """ Dumps collected stats to file once an hour. Runs DecisionMaker once a day Cleanup storage after DecisionMaker run """ while True: tick = datetime.now(timezone.utc) if tick.minute == 0: if tick.hour == 0: if is_malformed_database(self.engine): self._safe_exec(self.restore_db_with_lock(self.engine)) self.logger.info( '[%s] Routine thread found Database disk image is malformed and now restored (%s)', current_thread().name, tick) self.logger.info( '[%s] Routine thread launching buffer flushing (%s)', current_thread().name, tick) self._safe_exec(self.flush_remaining_objects) self.logger.info( '[%s] Routine thread launching AutoTracer (%s)', current_thread().name, tick) self._safe_exec(self.auto_tracer) self.logger.info( '[%s] Routine thread launching DecisionMaker (%s)', current_thread().name, tick) report = self._safe_exec(self.decision_maker) self.logger.info( '[%s] Routine thread launching cleanup (%s)', current_thread().name, tick) cleanup_old_data(self.engine) self._safe_exec(self.send_stats, report) # attempt to enable throttling detection kernel mechanism # in case it was accidentally switched off switch_schedstats(enabled=True) self._simple_sleep(60) else: self._sleep_till_next_hour(tick.minute) def _safe_exec(self, action: Callable, *args) -> Any: """Call requested Callable with given args and capture any exception""" try: return action(*args) except Exception: et, ev, _ = sys.exc_info() self.logger.exception('%s failed with exception %s, %s', str(action), et, ev, extra={'orig_traceback': traceback.format_exc()}) def _simple_sleep(self, to_sleep: int = 15 * 60): """ Log and sleep given number of seconds or 15 minutes by default """ self.logger.info('[%s] Routine thread sleeping for (%s)', current_thread().name, to_sleep) time.sleep(to_sleep) def _sleep_till_next_hour(self, start_minute): """ Sleep the number of minutes remaining till next hour """ sleep_for = (timedelta(hours=1) - timedelta( minutes=start_minute)).total_seconds() self._simple_sleep(int(sleep_for)) def restore_db_with_lock(self, engine): with self._lock: restore_database(engine) @staticmethod def get_interval_for(timestamp: int) -> int: """ Takes an hour of a day, to which the given timestamp belongs """ return datetime.fromtimestamp(timestamp, timezone.utc).hour def flush_worker(self): """ Continuously flush queued request results to the database """ while True: try: obj = self._queue.get() self._buffer.append(obj) if len(self._buffer) >= self.BUFFER_SIZE: self._flush_objects(self._buffer) self._buffer = [] except Exception: self.logger.exception('Flush worker failed') def flush_remaining_objects(self): """ Flush all remaining objects even if less than BUFFER_SIZE. Should be called once a day. """ if self._buffer: try: self._flush_objects(self._buffer) except Exception: self.logger.exception('Flush remaining objects failed') finally: self._buffer = [] def _flush_objects(self, objects): for attempt in range(self.DB_ACCESS_RETRIES): try: with session_scope(self.engine) as db: db.bulk_save_objects(objects) break except OperationalError as e: if "database is locked" in str(e): self.logger.warning('Database is locked, retrying attempt %s/%s...', attempt + 1, self.DB_ACCESS_RETRIES) time.sleep(0.1) else: raise def handle(self, data: dict) -> None: """ Process given request data """ if not data: self.logger.info('[%s] has empty request, skipping', current_thread().name) return url = data.get('url') if self.is_ignored(url): self.logger.debug('%s ignored', url) return domain, uri = url_split(url) request = RequestResult( domain=domain, path=url, timestamp=data['timestamp'], duration=data['duration'], is_slow_request=data['duration'] > self.configured_duration, hitting_limits=data['hitting_limits'], throttled_time=data['throttled_time'], io_throttled_time=data['io_throttled_time'], wordpress=data['wordpress'], ) self._queue.put(request) def shutdown_handler(self, signum, frame): """ Handle shutdown signals to flush remaining objects before exit. """ self.logger.info(f'Received shutdown signal {signum}, flushing queue and buffer before shutdown') try: self._drain_queue_to_buffer() self.flush_remaining_objects() except Exception: self.logger.exception('Failed to flush remaining objects during shutdown') finally: sys.exit(0) def _drain_queue_to_buffer(self): """ Drain all remaining objects from queue to buffer. """ while not self._queue.empty(): try: obj = self._queue.get_nowait() self._buffer.append(obj) except Empty: break