????JFIF??x?x????'
| Server IP : 104.21.30.238 / Your IP : 216.73.216.145 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././././proc/self/root/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/ |
Upload File : |
"""Provide Router for db migrations."""
import os
from contextlib import suppress
from peewee_migrate import Router as PeeweeRouter
from peewee_migrate.router import void
__all__ = ["Router"]
class Router(PeeweeRouter):
    """Like peewee_migrate.Router but supports multiple migrations dirs.

    The first entry of ``migrations_dirs`` is handed to the base class as its
    single ``migrate_dir``; discovery (``todo``) and loading (``read``) are
    overridden here to look through every configured directory.
    """

    # this is a slightly edited version from peewee_migrate.router.Router

    def __init__(self, database, migrations_dirs, **kwargs):
        """Initialise the router.

        :param database: forwarded unchanged to ``peewee_migrate.Router``.
        :param migrations_dirs: non-empty sequence of directory paths that
            hold migration files; element 0 becomes the base ``migrate_dir``.
        :param kwargs: forwarded unchanged to ``peewee_migrate.Router``.
        :raises IndexError: if ``migrations_dirs`` is empty.
        """
        super().__init__(database, migrate_dir=migrations_dirs[0], **kwargs)
        self.migrations_dirs = migrations_dirs

    @property
    def todo(self):
        """Scan migrations in file system.

        Every missing directory is created (with a warning) before scanning.

        :return: list of migration names (file names without the ``.py``
            suffix). Names are sorted within each directory and directories
            are concatenated in the order given to the constructor, so the
            overall list is not globally sorted.
        """
        for migrate_dir in self.migrations_dirs:
            if not os.path.exists(migrate_dir):
                # Logger.warn is a deprecated alias of Logger.warning.
                self.logger.warning(
                    "Migration directory: %s does not exist.", migrate_dir
                )
                # exist_ok guards against another process creating the
                # directory between the check above and this call.
                os.makedirs(migrate_dir, exist_ok=True)

        migration_names = []
        for migrate_dir in self.migrations_dirs:
            # self.filemask comes from the base class; presumably a compiled
            # regex matching migration file names -- verify against
            # peewee_migrate.
            migration_names += sorted(
                f[: -len(".py")]
                for f in os.listdir(migrate_dir)
                if self.filemask.match(f)
            )
        return migration_names

    def read(self, name):
        """Read migration from file.

        Looks for ``<name>.py`` in every migrations directory and executes
        each copy found into one shared namespace, so definitions from a
        later directory override those from an earlier one.

        NOTE: migration files are executed as Python code; the migrations
        directories must only contain trusted files.

        :param name: migration name without the ``.py`` suffix.
        :return: ``(migrate, rollback)`` taken from the executed namespace;
            ``void`` (imported from ``peewee_migrate.router``) stands in for
            whichever of the two is not defined, including the case where no
            file was found at all.
        """
        scope = {}
        for migrate_dir in self.migrations_dirs:
            path = os.path.join(migrate_dir, name + ".py")
            try:
                # Read raw bytes so compile() applies Python's own source
                # decoding rules (UTF-8 default, PEP 263 cookie, BOM) rather
                # than the process locale.
                with open(path, "rb") as f:
                    source = f.read()
            except FileNotFoundError:
                # The migration lives in another directory (or nowhere).
                # Only the lookup is tolerated: errors raised while
                # compiling or executing the migration must propagate.
                continue
            code = compile(source, "<string>", "exec", dont_inherit=True)
            exec(code, scope)
        return scope.get("migrate", void), scope.get("rollback", void)