????JFIF??x?x????'
| Server IP : 104.21.30.238  /  Your IP : 216.73.216.145 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././././proc/self/root/opt/hc_python/lib/python3.12/site-packages/sentry_sdk/ | 
| Upload File : | 
import os
import time
from threading import Thread, Lock
import sentry_sdk
from sentry_sdk.utils import logger
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from typing import Optional
MAX_DOWNSAMPLE_FACTOR = 10
class Monitor:
    """
    Performs health checks in a separate thread once every interval seconds
    and updates the internal state. Other parts of the SDK only read this state
    and act accordingly.
    """
    name = "sentry.monitor"
    def __init__(self, transport, interval=10):
        # type: (sentry_sdk.transport.Transport, float) -> None
        self.transport = transport  # type: sentry_sdk.transport.Transport
        self.interval = interval  # type: float
        self._healthy = True
        self._downsample_factor = 0  # type: int
        self._thread = None  # type: Optional[Thread]
        self._thread_lock = Lock()
        self._thread_for_pid = None  # type: Optional[int]
        self._running = True
    def _ensure_running(self):
        # type: () -> None
        """
        Check that the monitor has an active thread to run in, or create one if not.
        Note that this might fail (e.g. in Python 3.12 it's not possible to
        spawn new threads at interpreter shutdown). In that case self._running
        will be False after running this function.
        """
        if self._thread_for_pid == os.getpid() and self._thread is not None:
            return None
        with self._thread_lock:
            if self._thread_for_pid == os.getpid() and self._thread is not None:
                return None
            def _thread():
                # type: (...) -> None
                while self._running:
                    time.sleep(self.interval)
                    if self._running:
                        self.run()
            thread = Thread(name=self.name, target=_thread)
            thread.daemon = True
            try:
                thread.start()
            except RuntimeError:
                # Unfortunately at this point the interpreter is in a state that no
                # longer allows us to spawn a thread and we have to bail.
                self._running = False
                return None
            self._thread = thread
            self._thread_for_pid = os.getpid()
        return None
    def run(self):
        # type: () -> None
        self.check_health()
        self.set_downsample_factor()
    def set_downsample_factor(self):
        # type: () -> None
        if self._healthy:
            if self._downsample_factor > 0:
                logger.debug(
                    "[Monitor] health check positive, reverting to normal sampling"
                )
            self._downsample_factor = 0
        else:
            if self.downsample_factor < MAX_DOWNSAMPLE_FACTOR:
                self._downsample_factor += 1
            logger.debug(
                "[Monitor] health check negative, downsampling with a factor of %d",
                self._downsample_factor,
            )
    def check_health(self):
        # type: () -> None
        """
        Perform the actual health checks,
        currently only checks if the transport is rate-limited.
        TODO: augment in the future with more checks.
        """
        self._healthy = self.transport.is_healthy()
    def is_healthy(self):
        # type: () -> bool
        self._ensure_running()
        return self._healthy
    @property
    def downsample_factor(self):
        # type: () -> int
        self._ensure_running()
        return self._downsample_factor
    def kill(self):
        # type: () -> None
        self._running = False
    def __del__(self):
        # type: () -> None
        self.kill()