????JFIF??x?x????'
Server IP : 104.21.64.1 / Your IP : 216.73.216.243 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/wordpress/ |
Upload File : |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import jwt import logging import asyncio import pwd import os import secrets from datetime import datetime, timedelta from functools import lru_cache from pathlib import Path from defence360agent.utils import atomic_rewrite, check_run from imav.wordpress.utils import get_data_dir logger = logging.getLogger(__name__) DEFAULT_TOKEN_EXPIRATION = timedelta(hours=72) JWT_SECRET_PATH = "/etc/imunify-agent-proxy/jwt-secret" JWT_SECRET_PATH_OLD = "/etc/imunify-agent-proxy/jwt-secret.old" PROXY_SERVICE_NAME = "imunify-agent-proxy" SECRET_EXPIRATION_TTL = timedelta(days=7) def is_secret_expired(): try: stat = os.stat(JWT_SECRET_PATH) except FileNotFoundError: st_mtime = 0.0 else: st_mtime = stat.st_mtime return ( datetime.now().timestamp() - st_mtime > SECRET_EXPIRATION_TTL.seconds ) def rotate_secret(): """Load JWT secret from the configured file path.""" secret_path = Path(JWT_SECRET_PATH) try: logger.info( "Rotating proxy auth secret", ) stub_secret = secrets.token_bytes(32) secret_path.parent.mkdir(mode=0o700, parents=True, exist_ok=True) secret_path.touch(mode=0o600) atomic_rewrite( secret_path, stub_secret, uid=-1, backup=str(JWT_SECRET_PATH_OLD), permissions=0o600, ) check_run(["systemctl", "restart", PROXY_SERVICE_NAME]) except Exception as e: logger.error("Got error while rotating the secret %s", e) @lru_cache(1) def load_secret_from_file() -> bytes: """Load JWT secret from the configured file path.""" try: with open(JWT_SECRET_PATH, "rb") as f: return f.read().strip() except FileNotFoundError: logger.error("JWT secret file not found at %s", JWT_SECRET_PATH) raise except Exception as e: logger.error("Failed to read JWT secret: %s", e) raise def generate_token(username: str, docroot: str) -> str: """ Generate a JWT token for the given username and docroots. Args: username: The username for the token docroot: document root paths the user has access to Returns: The JWT token string """ exp_time = datetime.utcnow() + DEFAULT_TOKEN_EXPIRATION claims = {"exp": exp_time, "username": username, "site_path": docroot} try: token = jwt.encode(claims, load_secret_from_file(), algorithm="HS256") return token except Exception as e: logger.error("Failed to generate JWT token: %s", e) raise def _write_auth_php_file(auth_file_path: Path, php_content, uid, gid: int): """Synchronous function to write the auth.php file.""" if not auth_file_path.exists(): auth_file_path.touch() atomic_rewrite( auth_file_path, php_content, backup=False, uid=uid, gid=gid, permissions=0o400, ) async def create_auth_php_file(site, token: str, uid, gid: int) -> None: """ Create the auth.php file in the site's imunify-security directory. Args: site: WPSite instance token: JWT token string uid, gid: int used for file creation """ try: auth_file_path = get_data_dir(site) / "auth.php" php_content = f"""<?php return array( \t'token' => '{token}', ); """ # Run the file write operation in a thread pool await asyncio.to_thread( _write_auth_php_file, auth_file_path, php_content, uid, gid ) logger.info( "Created auth.php file for site %s at %s", site, auth_file_path ) except Exception as e: logger.error("Failed to create auth.php file for site %s: %s", site, e) raise async def setup_site_authentication( site, user_info: pwd.struct_passwd ) -> None: """ Set up authentication for a site by creating JWT token and auth.php file. Args: site: WPSite instance user_info: pwd.struct_passwd data """ try: token = generate_token(user_info.pw_name, str(site.docroot)) await create_auth_php_file( site, token, user_info.pw_uid, user_info.pw_gid ) logger.info("Successfully set up authentication for site %s", site) except Exception as e: logger.error( "Failed to set up authentication for site %s: %s", site, e ) raise