????JFIF??x?x????'
Server IP : 104.21.64.1 / Your IP : 216.73.216.243 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/imav/wordpress/ |
Upload File : |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import logging import pwd from pathlib import Path from peewee import SqliteDatabase from imav.model.wordpress import WPSite, WordpressSite from imav.wordpress import PLUGIN_SLUG logger = logging.getLogger(__name__) COMPONENTS_DB_PATH = Path( "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3" ) def get_sites_by_path(path: str) -> list[WPSite]: """ Get a list of WordPress sites that match the given path. Args: path: The path to search for WordPress sites. Returns: A list of WPSite objects that match the path. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report WHERE domain IS NOT NULL AND domain != '' GROUP BY dir ) ) SELECT wp.real_path, lr.domain, lr.uid FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL AND wp.real_path LIKE '{path}%' """ ) return [ WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() ] def get_sites_for_user(user_info: pwd.struct_passwd) -> list[str]: """ Get a set of paths to WordPress sites belonging to a particular user. Paths are sorted by their length to make sure that the main site is the last one in the list. The data is pulled from the app-version-detector database. Args: user_info: The user info with ID to get sites for. Returns: A list of paths to WordPress sites. """ if not COMPONENTS_DB_PATH.exists() or user_info is None: logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() if user_info is None: logger.error( "No user info provided for getting sites", ) return list() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT MAX(id) as id FROM report WHERE uid = {user_info.pw_uid} GROUP BY dir ) SELECT wp.real_path FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL GROUP BY wp.real_path ORDER BY length(wp.real_path) DESC """ ) return [row[0] for row in cursor.fetchall()] def get_sites_without_plugin() -> set[WPSite]: """ Get a set of wp sites where imunify-security plugin is not installed. The data is pulled from the app-version-detector database. Returns: A set of WPSite objects where the plugin is not installed. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return set() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report WHERE domain IS NOT NULL AND domain != '' GROUP BY dir ) ) SELECT wp.real_path, lr.domain, lr.uid FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL AND NOT EXISTS ( SELECT 1 FROM apps AS plugin WHERE plugin.parent_id = wp.id AND plugin.title = 'wp_plugin_{PLUGIN_SLUG.replace("-", "_")}' ) """ ) return { WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() } def get_sites_to_install() -> set[WPSite]: """ Get a set of WordPress sites where we need to install the plugin. This is determined by finding sites that don't have the plugin installed and are not already tracked in our database. Returns: A set of WPSite objects where the plugin needs to be installed. """ sites_without_plugin = get_sites_without_plugin() existing_sites = { WPSite.from_wordpress_site(r) for r in WordpressSite.select() } return sites_without_plugin - existing_sites def insert_installed_sites(sites: set[WPSite]) -> None: """ Insert a set of installed WordPress sites into the database. This is used to track which sites have the plugin installed. Args: sites: A set of WPSite objects representing sites where the plugin was installed. """ if not sites: return WordpressSite.insert_many( [ { "domain": site.domain, "docroot": site.docroot, "uid": site.uid, "version": site.version, "manually_deleted_at": None, } for site in sites ] ).execute() def get_outdated_sites(latest_version: str) -> list[WPSite]: """ Get a list of WordPress sites that have outdated plugin versions. Args: latest_version: The latest available plugin version to compare against. Returns: A list of WPSite objects that have versions older than latest_version. """ if not latest_version: logger.error( "Cannot get outdated sites without a valid latest version" ) return [] return [ WPSite.from_wordpress_site(r) for r in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null(), WordpressSite.version != latest_version, ) ] def mark_site_as_manually_deleted(site: WPSite, timestamp: float) -> None: """ Mark a WordPress site as manually deleted in the database. Args: site: The WPSite object to mark as deleted timestamp: The timestamp when the site was deleted """ logger.info( "Mark site %s as manually deleted at %s (WP-Plugin removed)", site, timestamp, ) ( WordpressSite.update(manually_deleted_at=timestamp) .where(WordpressSite.docroot == site.docroot) .execute() ) def get_sites_to_mark_as_manually_deleted() -> set[WPSite]: """ Get a set of WordPress sites that should be marked as manually deleted. These are sites that are in our database but no longer have the plugin installed. Returns: set[WPSite]: A set of WordPress sites that should be marked as manually deleted """ # Get sites without plugin from AVD database sites_without_plugin = get_sites_without_plugin() # Get sites from our database that haven't been marked as manually deleted sites_from_db = { WPSite.from_wordpress_site(r) for r in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null() ) } # Return intersection of these sets - sites that are in our DB but no longer have the plugin return sites_without_plugin & sites_from_db def update_site_version(site: WPSite, version: str) -> None: """ Update the version of a WordPress site in the database. Args: site: The WPSite object to update version: The new version to set """ WordpressSite.update(version=version).where( WordpressSite.docroot == site.docroot ).execute() def get_sites_to_uninstall() -> list[WPSite]: """ Get a list of WordPress sites that haven't been marked as manually deleted. Returns: A list of WPSite objects representing non-deleted sites. """ return [ WPSite.from_wordpress_site(site) for site in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null(True) ) ] def delete_site(site: WPSite) -> int: """ Delete a WordPress site from the database. Args: site: The WPSite object to delete Returns: The number of rows affected by the delete operation """ return ( WordpressSite.delete() .where(WordpressSite.docroot == site.docroot) .execute() )