????JFIF??x?x????'403WebShell
403Webshell
Server IP : 172.67.174.47  /  Your IP : 216.73.216.145
Web Server : LiteSpeed
System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64
User : tempvsty ( 647)
PHP Version : 8.0.30
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : OFF  |  Pkexec : OFF
Directory :  /././././proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/imav/wordpress/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /././././proc/self/root/opt/imunify360/venv/lib64/python3.11/site-packages/imav/wordpress/plugin.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import os
import pwd
import sentry_sdk
import shutil
import time
from collections import defaultdict
from distutils.version import LooseVersion
from pathlib import Path

from defence360agent.api import inactivity
from defence360agent.contracts.config import (
    MalwareScanSchedule,
    MalwareScanScheduleInterval as Interval,
)
from defence360agent.utils import check_run
from imav.model.wordpress import WPSite
from imav.wordpress import cli, telemetry, PLUGIN_VERSION_FILE
from imav.wordpress.utils import (
    build_command_for_user,
    calculate_next_scan_timestamp,
    clear_get_cagefs_enabled_users_cache,
    get_last_scan,
    get_malware_history,
    prepare_scan_data,
    write_plugin_data_file_atomically,
)
from imav.wordpress.site_repository import (
    delete_site,
    get_outdated_sites,
    get_sites_for_user,
    get_sites_to_install,
    get_sites_to_mark_as_manually_deleted,
    get_sites_to_uninstall,
    insert_installed_sites,
    mark_site_as_manually_deleted,
    update_site_version,
)

from imav.wordpress.proxy_auth import setup_site_authentication

# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)

# Filesystem location of a "components_versions" SQLite database file.
# NOTE(review): this constant is not referenced in the visible part of this
# module; presumably it is imported from elsewhere - confirm before removing.
COMPONENTS_DB_PATH = Path(
    "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3"
)


def clear_caches():
    """
    Reset the caches used by the WordPress helpers.

    Calls, in this order, the cache-clearing function for the CageFS enabled
    users lookup and the one for the WP CLI content directory lookup.
    """
    for reset_cache in (
        clear_get_cagefs_enabled_users_cache,
        cli.clear_get_content_dir_cache,
    ):
        reset_cache()


def site_search(items: dict, user_info: pwd.struct_passwd, matcher) -> dict:
    """
    Distribute items between the WordPress sites of a user.

    Args:
        items: The items to distribute (iterated once, in order).
        user_info: Password database entry of the user; it is passed to
            get_sites_for_user() to obtain the user's site paths.
        matcher: Callable invoked as ``matcher(item, path)``; a truthy result
            means the item may belong to the site at ``path``.

    Returns:
        A dict with one key per site path of the user. Each value is the list
        of items assigned to that site. An item matching several sites goes
        to the one with the longest path; an item matching none is dropped.
    """
    # All WordPress sites of the user (the main site is expected to be last).
    site_paths = get_sites_for_user(user_info)

    grouped = {}
    for site_path in site_paths:
        grouped[site_path] = []

    for entry in items:
        candidates = [
            site_path for site_path in site_paths if matcher(entry, site_path)
        ]
        if not candidates:
            continue
        # The longest matching path is the most specific site.
        grouped[max(candidates, key=len)].append(entry)

    return grouped


async def _get_scan_data_for_user(sink, user_info: pwd.struct_passwd):
    """
    Collect the scan related data shared by all sites of one user.

    Args:
        sink: Passed through to get_last_scan().
        user_info: Password database entry of the user.

    Returns:
        A tuple ``(last_scan_time, next_scan_time, malware_by_site)``:
        the "scan_date" of the last scan (None when absent), the next
        scheduled scan timestamp (None when the schedule interval is NONE),
        and the user's malware history grouped per site by site_search().
    """

    def _is_file_under_site(entry, site_path):
        # NOTE(review): plain string prefix test - a path such as "/a/site2"
        # also starts with "/a/site"; confirm how site paths are normalised.
        return entry["resource_type"] == "file" and entry["file"].startswith(
            site_path
        )

    username = user_info.pw_name

    # Date of the most recent scan, if any.
    previous_scan = await get_last_scan(sink, username)
    last_scan_time = previous_scan.get("scan_date")

    # A next scan exists only when scheduled scanning is enabled.
    if MalwareScanSchedule.INTERVAL != Interval.NONE:
        next_scan_time = calculate_next_scan_timestamp()
    else:
        next_scan_time = None

    # Split the malware history by site. This relies on the main site being
    # the last one in the list; otherwise all malware could be attributed to
    # the main site.
    history = get_malware_history(username)
    malware_by_site = site_search(history, user_info, _is_file_under_site)

    return last_scan_time, next_scan_time, malware_by_site


async def _send_telemetry_task(coro, semaphore: asyncio.Semaphore):
    """
    Await a single telemetry coroutine while holding the semaphore.

    Any Exception raised by the coroutine is logged and suppressed, so one
    failing telemetry event does not affect the others.
    """
    async with semaphore:
        try:
            await coro
        except Exception as error:
            logger.error(f"Telemetry task failed: {error}")


async def process_telemetry_tasks(coroutines: list, concurrency=10):
    """
    Run telemetry coroutines with a limit on how many run at once.

    Args:
        coroutines: Coroutines to await; nothing happens when it is empty.
        concurrency: Maximum number of coroutines awaited simultaneously.
    """
    if not coroutines:
        return

    # One shared semaphore caps the number of coroutines in flight.
    limiter = asyncio.Semaphore(concurrency)
    pending = []
    for coro in coroutines:
        pending.append(
            asyncio.create_task(_send_telemetry_task(coro, limiter))
        )

    try:
        await asyncio.gather(*pending)
    except Exception as error:
        logger.error(f"Some telemetry tasks failed: {error}")


async def install_everywhere(sink):
    """
    Install the imunify-security plugin on every site that lacks it.

    Sites are processed user by user. For each site the scan data file and
    the site authentication are prepared before the plugin is installed.
    Successfully installed sites are always stored in the database and the
    collected telemetry is always sent, even on error or cancellation.

    Args:
        sink: The telemetry/event sink.
    """
    logger.info("Installing imunify-security wp plugin")

    installed_sites = set()
    auth_ok = set()
    auth_failed = set()
    pending_events = []
    with inactivity.track.task("wp-plugin-installation"):
        try:
            clear_caches()

            candidates = get_sites_to_install()
            if not candidates:
                return

            # Bucket the sites by owner uid.
            per_user = defaultdict(list)
            for candidate in candidates:
                per_user[candidate.uid].append(candidate)

            for uid, user_sites in per_user.items():
                try:
                    user_info = pwd.getpwuid(uid)
                    username = user_info.pw_name
                except Exception as error:
                    # Without a resolvable user none of its sites can be
                    # handled; report it and move on to the next user.
                    sentry_sdk.capture_message(
                        "Skipping installation of WordPress plugin on"
                        " {count} site(s) because they belong to user"
                        " {user} and it is not possible to retrieve"
                        " username for this user. Reason: {reason}".format(
                            count=len(user_sites),
                            user=uid,
                            reason=error,
                        ),
                        level="warning",
                    )
                    continue

                # Scan data is fetched once per user and reused for all of
                # the user's sites.
                scan_info = await _get_scan_data_for_user(sink, user_info)
                last_scan_time, next_scan_time, malware_by_site = scan_info

                for site in user_sites:
                    if await remove_site_if_missing(sink, site):
                        continue
                    try:
                        # The site must be reachable through WP CLI.
                        if not await cli.is_wordpress_installed(site):
                            sentry_sdk.capture_message(
                                "WordPress site is not accessible using WP"
                                " CLI. site={site}".format(site=site),
                                level="warning",
                            )
                            continue

                        # Write the scan data file and set up authentication
                        # before the plugin itself is installed.
                        scan_data = prepare_scan_data(
                            last_scan_time,
                            next_scan_time,
                            username,
                            site,
                            malware_by_site,
                        )
                        await update_scan_data_file(site, scan_data)
                        await update_site_auth(
                            site, user_info, auth_ok, auth_failed
                        )

                        await cli.plugin_install(site)

                        # Remember the site, with its plugin version when
                        # the version could be determined.
                        version = await cli.get_plugin_version(site)
                        installed_sites.add(
                            WPSite.build_with_version(site, version)
                            if version
                            else site
                        )

                        pending_events.append(
                            telemetry.send_event(
                                sink=sink,
                                event="installed_by_imunify",
                                site=site,
                                version=version,
                            )
                        )
                    except Exception as error:
                        logger.error(
                            "Failed to install plugin to site=%s error=%r",
                            site,
                            error,
                        )
            logger.info(
                "Installed imunify-security wp plugin on %d sites",
                len(installed_sites),
            )
            if auth_failed:
                logger.warning(
                    "Failed to authenticate %d sites",
                    len(auth_failed),
                )
        except asyncio.CancelledError:
            logger.info(
                "Installation imunify-security wp plugin was cancelled. Plugin"
                " was installed for %d sites",
                len(installed_sites),
            )
        except Exception as error:
            logger.error(
                "Error occurred during plugin installation. error=%r", error
            )
            raise
        finally:
            # Persist whatever was installed, then flush telemetry.
            insert_installed_sites(installed_sites)
            await process_telemetry_tasks(pending_events)


def get_latest_plugin_version() -> str:
    """
    Read the latest imunify-security plugin version from the version file.

    Returns:
        The stripped content of PLUGIN_VERSION_FILE, or None when the file
        does not exist or cannot be read (the reason is logged).
    """
    try:
        if PLUGIN_VERSION_FILE.exists():
            return PLUGIN_VERSION_FILE.read_text().strip()
        logger.error(
            "Plugin version file does not exist: %s", PLUGIN_VERSION_FILE
        )
    except Exception as error:
        logger.error("Failed to read plugin version file: %s", error)
    return None


def _is_plugin_downgrade(previous_version, new_version) -> bool:
    """
    Tell whether new_version is lower than previous_version.

    Returns False when the previous version is unknown (None or empty) or
    when the two versions cannot be compared, so that a failing comparison
    is never mistaken for a failed plugin update.
    """
    if not previous_version:
        # LooseVersion(None) cannot be compared (it has no parsed version).
        return False
    try:
        return LooseVersion(new_version) < LooseVersion(previous_version)
    except (AttributeError, TypeError) as error:
        # LooseVersion raises TypeError when numeric and alphabetic
        # components end up being compared with each other.
        logger.debug(
            "Cannot compare plugin versions %r and %r: %s",
            previous_version,
            new_version,
            error,
        )
        return False


async def update_everywhere(sink):
    """
    Update the imunify-security plugin on all sites where it is outdated.

    Sites are processed user by user. For each site the scan data file is
    refreshed, the plugin is updated, the new version is stored in the
    database and a telemetry event ("updated_by_imunify" or
    "downgraded_by_imunify") is queued. Queued telemetry is always sent,
    even on error or cancellation.

    Args:
        sink: The telemetry/event sink.
    """
    latest_version = get_latest_plugin_version()
    if not latest_version:
        logger.error("Could not determine latest plugin version")
        return

    logger.info(
        "Updating imunify-security wp plugin to the latest version %s",
        latest_version,
    )

    updated = set()
    telemetry_coros = []
    with inactivity.track.task("wp-plugin-update"):
        try:
            # Get sites with outdated versions
            outdated_sites = get_outdated_sites(latest_version)
            logger.info(f"Found {len(outdated_sites)} outdated sites")

            if not outdated_sites:
                return

            # Group sites by user id
            sites_by_user = defaultdict(list)
            for site in outdated_sites:
                sites_by_user[site.uid].append(site)

            # Process each user's sites
            for uid, sites in sites_by_user.items():
                try:
                    user_info = pwd.getpwuid(uid)
                    username = user_info.pw_name
                except Exception as error:
                    logger.error(
                        "Failed to get username for uid=%d. error=%s",
                        uid,
                        error,
                    )
                    continue

                # Get scan data once for all sites of this user
                (
                    last_scan_time,
                    next_scan_time,
                    malware_by_site,
                ) = await _get_scan_data_for_user(sink, user_info)

                for site in sites:
                    if await remove_site_if_missing(sink, site):
                        continue
                    try:
                        # Check if site still exists
                        if not await cli.is_wordpress_installed(site):
                            logger.info(
                                "WordPress site no longer exists: %s", site
                            )
                            continue

                        # Prepare scan data
                        scan_data = prepare_scan_data(
                            last_scan_time,
                            next_scan_time,
                            username,
                            site,
                            malware_by_site,
                        )

                        # Update the scan data file
                        await update_scan_data_file(site, scan_data)

                        # Now update the plugin
                        await cli.plugin_update(site)
                        updated.add(site)

                        # Get the version after update
                        version = await cli.get_plugin_version(site)
                        if version:
                            # Store original version for comparison
                            original_version = site.version

                            # Update the database with the new version
                            update_site_version(site, version)

                            # Create a new WPSite with updated version
                            site = site.build_with_version(version)

                            # Determine if this is a downgrade. The helper
                            # tolerates a missing or incomparable original
                            # version; a raw LooseVersion comparison would
                            # raise here, after the update already
                            # succeeded, and drop the telemetry event.
                            is_downgrade = _is_plugin_downgrade(
                                original_version, version
                            )

                            # Prepare telemetry
                            telemetry_coros.append(
                                telemetry.send_event(
                                    sink=sink,
                                    event="downgraded_by_imunify"
                                    if is_downgrade
                                    else "updated_by_imunify",
                                    site=site,
                                    version=version,
                                )
                            )

                    except Exception as error:
                        logger.error(
                            "Failed to update plugin on site=%s error=%s",
                            site,
                            error,
                        )

            logger.info(
                "Updated imunify-security wp plugin on %d sites",
                len(updated),
            )
        except asyncio.CancelledError:
            logger.info(
                "Update of imunify-security wp plugin was cancelled. Plugin"
                " was updated on %d sites",
                len(updated),
            )
        except Exception as error:
            logger.error(
                "Error occurred during plugin update. error=%s", error
            )
            raise
        finally:
            # Send telemetry
            await process_telemetry_tasks(telemetry_coros)


async def delete_plugin_files(site: WPSite):
    """
    Remove the plugin data directory of a site, if it exists.

    The directory reported by cli.get_data_dir() is deleted recursively in a
    worker thread so the event loop is not blocked.
    """
    plugin_data_dir = await cli.get_data_dir(site)
    if not plugin_data_dir.exists():
        return
    await asyncio.to_thread(shutil.rmtree, plugin_data_dir)


async def remove_from_single_site(site: WPSite, sink, telemetry_coros) -> int:
    """
    Uninstall the imunify-security plugin from one site and clean up after it.

    When the plugin is no longer installed, the site is handled as manually
    deleted instead. Any Exception is logged and reported as 0 affected
    sites. The caller is expected to wrap this coroutine in asyncio.shield
    so it completes even if the parent task is cancelled.

    Args:
        site: The site to remove the plugin from.
        sink: The telemetry/event sink.
        telemetry_coros: List that receives the telemetry coroutine to send.

    Returns:
        The value returned by delete_site() on success, otherwise 0.
    """
    try:
        if not await cli.is_plugin_installed(site):
            # The plugin is gone already: the user removed it manually.
            await process_manually_deleted_plugin(
                site, time.time(), sink, telemetry_coros
            )
            return 0

        # The version is read first because it is needed for telemetry.
        plugin_version = await cli.get_plugin_version(site)

        # Uninstall from WordPress, drop the data files, then the DB record.
        await cli.plugin_uninstall(site)
        await delete_plugin_files(site)
        deleted_rows = delete_site(site)

        uninstall_event = telemetry.send_event(
            sink=sink,
            event="uninstalled_by_imunify",
            site=site,
            version=plugin_version,
        )
        telemetry_coros.append(uninstall_event)
        return deleted_rows
    except Exception as error:
        logger.error("Failed to remove plugin from %s %s", site, error)
        return 0


async def remove_all_installed(sink):
    """
    Remove the imunify-security plugin from all sites where it is installed.

    Each site is processed inside asyncio.shield, so the removal that is in
    progress is not interrupted when this task is cancelled. On cancellation
    the loop stops and the remaining sites are left untouched. The number of
    affected sites is always logged and queued telemetry is always sent.

    Args:
        sink: The telemetry/event sink.
    """
    logger.info("Deleting imunify-security wp plugin")

    telemetry_coros = []
    affected = 0
    with inactivity.track.task("wp-plugin-removal"):
        try:
            clear_caches()

            to_remove = get_sites_to_uninstall()

            for site in to_remove:
                try:
                    affected += await asyncio.shield(
                        remove_from_single_site(site, sink, telemetry_coros)
                    )
                except asyncio.CancelledError:
                    logger.info(
                        "Deleting imunify-security wp plugin was cancelled."
                        " Plugin was deleted from %d sites (out of %d)",
                        affected,
                        len(to_remove),
                    )
                    # Stop here: without this the loop would swallow the
                    # cancellation and go on uninstalling the remaining
                    # sites, contradicting the message logged above.
                    break
        except Exception as error:
            logger.error("Error occurred during plugin deleting. %s", error)
            raise
        finally:
            logger.info(
                "Removed imunify-security wp plugin from %s sites",
                affected,
            )

            #  send telemetry
            await process_telemetry_tasks(telemetry_coros)


async def process_manually_deleted_plugin(site, now, sink, telemetry_coros):
    """
    Handle a site whose plugin was removed by the user.

    Steps, in order: flag the site as manually deleted in the database,
    delete the plugin data files, queue a "removed_by_user" telemetry event.
    Any Exception raised along the way is logged and suppressed.

    Args:
        site: The site to process.
        now: Timestamp stored with the "manually deleted" mark.
        sink: The telemetry/event sink.
        telemetry_coros: List that receives the telemetry coroutine to send.
    """
    try:
        mark_site_as_manually_deleted(site, now)
        await delete_plugin_files(site)

        removal_event = telemetry.send_event(
            sink=sink,
            event="removed_by_user",
            site=site,
            version=site.version,
        )
        telemetry_coros.append(removal_event)
    except Exception as error:
        logger.error(
            "Failed to process manually deleted plugin for site=%s error=%s",
            site,
            error,
        )


async def tidy_up_manually_deleted(sink):
    """
    Process every site whose plugin was manually deleted by the user.

    All sites returned by get_sites_to_mark_as_manually_deleted() are
    processed with the same timestamp. Errors are logged, not raised, and
    queued telemetry is sent in any case.

    Args:
        sink: The telemetry/event sink.
    """
    pending_events = []
    try:
        stale_sites = get_sites_to_mark_as_manually_deleted()
        if not stale_sites:
            return

        # One timestamp is shared by all sites handled in this run.
        timestamp = time.time()
        for stale_site in stale_sites:
            await process_manually_deleted_plugin(
                stale_site, timestamp, sink, pending_events
            )
    except Exception as error:
        logger.error("Error occurred during site tidy up. %s", error)
    finally:
        if pending_events:
            await process_telemetry_tasks(pending_events)


async def update_data_on_sites(sink, sites: list[WPSite]):
    """
    Rewrite the scan data file on each of the given sites.

    Sites are grouped by owner so the scan data is fetched once per user.
    Users that cannot be resolved and sites that fail are logged and skipped;
    sites whose directory is missing are removed via remove_site_if_missing().

    Args:
        sink: The telemetry/event sink.
        sites: The sites to refresh; nothing happens when it is empty.
    """
    if not sites:
        return

    # Bucket the sites by owner uid.
    per_user = defaultdict(list)
    for entry in sites:
        per_user[entry.uid].append(entry)

    for uid, user_sites in per_user.items():
        try:
            user_info = pwd.getpwuid(uid)
            username = user_info.pw_name
        except Exception as error:
            logger.error(
                "Failed to get username for uid=%d. error=%s",
                uid,
                error,
            )
            continue

        scan_info = await _get_scan_data_for_user(sink, user_info)
        last_scan_time, next_scan_time, malware_by_site = scan_info

        for site in user_sites:
            if await remove_site_if_missing(sink, site):
                continue
            try:
                await update_scan_data_file(
                    site,
                    prepare_scan_data(
                        last_scan_time,
                        next_scan_time,
                        username,
                        site,
                        malware_by_site,
                    ),
                )
            except Exception as error:
                logger.error(
                    "Failed to update scan data on site=%s error=%s",
                    site,
                    error,
                )


async def update_scan_data_file(site: WPSite, scan_data: dict):
    """
    Write the scan data of a site to "scan_data.php" in its data directory.

    The data directory is created on behalf of the site owner when missing.
    The file is a PHP script returning the JSON-decoded scan data.

    Args:
        site: The site whose data file is written.
        scan_data: JSON-serialisable scan data.

    Raises:
        Exception: If the data directory is a symlink or cannot be created.
        KeyError: If the site's uid has no password database entry.
    """
    # Get the gid for the given user
    user_info = pwd.getpwuid(site.uid)
    gid = user_info.pw_gid

    # Create data directory
    data_dir = await cli.get_data_dir(site)
    if os.path.islink(data_dir):
        # If the data directory is a symlink, interrupt the process.
        # The message is formatted here: Exception() does not interpolate
        # "%s" placeholders from extra arguments.
        raise Exception(
            f"Data directory {data_dir} is a symlink, skipping."
        )

    if not data_dir.exists():
        command = build_command_for_user(
            user_info.pw_name,
            [
                "mkdir",
                "-p",
                str(data_dir),
            ],
        )

        await check_run(command)

        if not data_dir.exists():
            # Directory creation failed. Interrupt the process.
            raise Exception(
                f"Failed to create directory {data_dir}"
                f" for user {user_info.pw_name}"
            )

        # we can safely change the permissions of the directory because we just created it
        data_dir.chmod(0o750)

    scan_data_path = data_dir / "scan_data.php"

    # The JSON is embedded in a PHP single-quoted string literal, where both
    # "\\" and "\'" are escape sequences. Backslashes must be doubled first
    # (otherwise PHP collapses JSON's "\\" into "\" and json_decode receives
    # corrupted input), then single quotes are escaped.
    php_literal = (
        json.dumps(scan_data).replace("\\", "\\\\").replace("'", "\\'")
    )

    # Format the PHP file content
    php_content = (
        "<?php\n"
        "if ( ! defined( 'WPINC' ) ) {\n"
        "\texit;\n"
        "}\n"
        "return json_decode( '" + php_literal + "', true );"
    )

    # Write the formatted PHP file
    write_plugin_data_file_atomically(
        scan_data_path, php_content, uid=site.uid, gid=gid
    )


async def update_auth_everywhere():
    """
    Refresh the auth.php file of every installed WordPress site.

    One update_site_auth() coroutine is created per site and the coroutines
    are awaited in batches of 10. Sites of users that cannot be resolved are
    skipped and reported. Cancellation is logged and suppressed; any other
    error is logged and re-raised.
    """
    logger.info("Updating auth.php files for existing WordPress sites")

    succeeded = set()
    errored = set()

    with inactivity.track.task("wp-auth-update"):
        try:
            clear_caches()

            # get_sites_to_uninstall returns all active sites from db, i.e.
            # the sites where the plugin is installed.
            active_sites = get_sites_to_uninstall()
            if not active_sites:
                logger.info("No installed WordPress sites found")
                return

            per_user = defaultdict(list)
            for active_site in active_sites:
                per_user[active_site.uid].append(active_site)

            # Collect one coroutine per site; they start running only when
            # awaited in the batches below.
            pending = []
            for uid, user_sites in per_user.items():
                try:
                    user_info = pwd.getpwuid(uid)
                    for site in user_sites:
                        pending.append(
                            update_site_auth(
                                site, user_info, succeeded, errored
                            )
                        )
                except Exception as error:
                    sentry_sdk.capture_message(
                        "Skipping auth update for WordPress sites on"
                        " {count} site(s) because they belong to user"
                        " {user} and it is not possible to retrieve"
                        " username for this user. Reason: {reason}".format(
                            count=len(user_sites),
                            user=uid,
                            reason=error,
                        ),
                        level="warning",
                    )
                    continue

            # Await the coroutines batch by batch to bound the number of
            # sites processed at the same time.
            batch_size = 10
            batch_start = 0
            while batch_start < len(pending):
                batch_end = batch_start + batch_size
                await asyncio.gather(
                    *pending[batch_start:batch_end], return_exceptions=True
                )
                batch_start = batch_end

            logger.info(
                "Updated auth.php files for %d WordPress sites, %d failed",
                len(succeeded),
                len(errored),
            )

        except asyncio.CancelledError:
            logger.info(
                "Auth update for WordPress sites was cancelled. Auth was"
                " updated for %d sites",
                len(succeeded),
            )
        except Exception as error:
            logger.error("Error occurred during auth update. error=%s", error)
            raise


async def update_site_auth(site, user_info, updated, failed):
    """
    Set up authentication for one site and record the outcome.

    Args:
        site: The site to set up authentication for.
        user_info: Password database entry of the site owner.
        updated: Set that receives the site on success.
        failed: Set that receives the site when an Exception is raised;
            the error is logged and not propagated.
    """
    try:
        await setup_site_authentication(site, user_info)
    except Exception as error:
        failed.add(site)
        logger.error(
            "Failed to update auth for site=%s error=%s",
            site,
            error,
        )
    else:
        updated.add(site)


async def remove_site_if_missing(sink, site: WPSite) -> bool:
    """
    Drop a site from the local database when its directory no longer exists.

    Args:
        sink: The telemetry/event sink.
        site: The site to check.

    Returns:
        False when site.docroot is an existing directory (nothing is done).
        True when the directory is missing, whether or not a database row
        was actually deleted.

    Side effect: a "site_removed" telemetry event is sent only when at least
    one row was deleted; otherwise a warning is logged and reported.
    """
    if os.path.isdir(site.docroot):
        return False

    # The database record goes first; telemetry depends on its outcome.
    deleted_rows = delete_site(site)

    if deleted_rows > 0:
        await telemetry.send_event(
            sink=sink, event="site_removed", site=site, version=site.version
        )
        return True

    logger.warning(
        "Failed to delete missing site %s from database, no rows affected",
        site,
    )
    sentry_sdk.capture_message(
        "Failed to delete missing site {site} from database".format(
            site=site
        ),
        level="warning",
    )
    return True


async def fix_site_data_file_permissions(
    site: WPSite, file_permissions: int
) -> bool:
    """
    Normalise the permissions of a site's plugin data directory and files.

    The data directory is set to 0o750 and the files "scan_data.php" and
    "auth.php" (when present) to file_permissions. chmod is only called when
    the current mode differs.

    Args:
        site: The WordPress site to fix permissions for.
        file_permissions: Mode to apply to the data files (e.g. 0o440).

    Returns:
        bool: True on success; False when the data directory does not exist
        or an Exception occurred (the error is logged).
    """
    try:
        data_dir = await cli.get_data_dir(site)
        if not data_dir.exists():
            return False

        # Directory: only the permission bits are compared.
        if (data_dir.stat().st_mode & 0o777) != 0o750:
            data_dir.chmod(0o750)

        # Files: skip the ones that are absent.
        for file_name in ("scan_data.php", "auth.php"):
            target = data_dir / file_name
            if not target.exists():
                continue
            if (target.stat().st_mode & 0o777) != file_permissions:
                target.chmod(file_permissions)

        return True
    except Exception as error:
        logger.error(
            "Failed to fix permissions for site=%s error=%s",
            site,
            error,
        )
        return False


async def fix_data_file_permissions_everywhere(sink):
    """
    Fix plugin data file permissions on every installed WordPress site.

    Files get mode 0o440 when the hosting panel is Plesk and 0o400 otherwise.
    Cancellation and other errors are logged and not re-raised.

    Args:
        sink: The telemetry/event sink
    """
    succeeded = set()
    errored = set()

    with inactivity.track.task("wp-plugin-fix-permissions"):
        try:
            clear_caches()

            # Sites where the plugin is installed.
            active_sites = get_sites_to_uninstall()
            if not active_sites:
                return

            # The file mode depends on the hosting panel in use.
            from defence360agent.subsys.panels.hosting_panel import (
                HostingPanel,
            )
            from defence360agent.subsys.panels.plesk import Plesk

            if HostingPanel().NAME == Plesk.NAME:
                file_permissions = 0o440
            else:
                file_permissions = 0o400

            for site in active_sites:
                if await remove_site_if_missing(sink, site):
                    continue

                was_fixed = await fix_site_data_file_permissions(
                    site, file_permissions
                )
                (succeeded if was_fixed else errored).add(site)

            logger.info(
                "Fixed data file permissions for %d WordPress sites, %d"
                " failed",
                len(succeeded),
                len(errored),
            )

        except asyncio.CancelledError:
            logger.info(
                "Fixing data file permissions was cancelled. Permissions were"
                " fixed for %d sites",
                len(succeeded),
            )
        except Exception as error:
            logger.error(
                "Error occurred during permission fixing. error=%s", error
            )

Youez - 2016 - github.com/yon3zu
LinuXploit