????JFIF??x?x????'
| Server IP : 104.21.30.238 / Your IP : 216.73.216.87 Web Server : LiteSpeed System : Linux premium151.web-hosting.com 4.18.0-553.44.1.lve.el8.x86_64 #1 SMP Thu Mar 13 14:29:12 UTC 2025 x86_64 User : tempvsty ( 647) PHP Version : 8.0.30 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : OFF | Pkexec : OFF Directory : /././opt/imunify360/venv/lib/python3.11/site-packages/pip/_internal/cli/ |
Upload File : |
import functools
import sys
from typing import Callable, Generator, Iterable, Iterator, Optional, Tuple, TypeVar
from pip._vendor.rich.progress import (
BarColumn,
DownloadColumn,
FileSizeColumn,
MofNCompleteColumn,
Progress,
ProgressColumn,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
TimeRemainingColumn,
TransferSpeedColumn,
)
from pip._internal.cli.spinners import RateLimiter
from pip._internal.req.req_install import InstallRequirement
from pip._internal.utils.logging import get_console, get_indentation
T = TypeVar("T")
ProgressRenderer = Callable[[Iterable[T]], Iterator[T]]
def _rich_download_progress_bar(
iterable: Iterable[bytes],
*,
bar_type: str,
size: Optional[int],
initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
assert bar_type == "on", "This should only be used in the default mode."
if not size:
total = float("inf")
columns: Tuple[ProgressColumn, ...] = (
TextColumn("[progress.description]{task.description}"),
SpinnerColumn("line", speed=1.5),
FileSizeColumn(),
TransferSpeedColumn(),
TimeElapsedColumn(),
)
else:
total = size
columns = (
TextColumn("[progress.description]{task.description}"),
BarColumn(),
DownloadColumn(),
TransferSpeedColumn(),
TextColumn("eta"),
TimeRemainingColumn(),
)
progress = Progress(*columns, refresh_per_second=5)
task_id = progress.add_task(" " * (get_indentation() + 2), total=total)
if initial_progress is not None:
progress.update(task_id, advance=initial_progress)
with progress:
for chunk in iterable:
yield chunk
progress.update(task_id, advance=len(chunk))
def _rich_install_progress_bar(
iterable: Iterable[InstallRequirement], *, total: int
) -> Iterator[InstallRequirement]:
columns = (
TextColumn("{task.fields[indent]}"),
BarColumn(),
MofNCompleteColumn(),
TextColumn("{task.description}"),
)
console = get_console()
bar = Progress(*columns, refresh_per_second=6, console=console, transient=True)
# Hiding the progress bar at initialization forces a refresh cycle to occur
# until the bar appears, avoiding very short flashes.
task = bar.add_task("", total=total, indent=" " * get_indentation(), visible=False)
with bar:
for req in iterable:
bar.update(task, description=rf"\[{req.name}]", visible=True)
yield req
bar.advance(task)
def _raw_progress_bar(
iterable: Iterable[bytes],
*,
size: Optional[int],
initial_progress: Optional[int] = None,
) -> Generator[bytes, None, None]:
def write_progress(current: int, total: int) -> None:
sys.stdout.write(f"Progress {current} of {total}\n")
sys.stdout.flush()
current = initial_progress or 0
total = size or 0
rate_limiter = RateLimiter(0.25)
write_progress(current, total)
for chunk in iterable:
current += len(chunk)
if rate_limiter.ready() or current == total:
write_progress(current, total)
rate_limiter.reset()
yield chunk
def get_download_progress_renderer(
*, bar_type: str, size: Optional[int] = None, initial_progress: Optional[int] = None
) -> ProgressRenderer[bytes]:
"""Get an object that can be used to render the download progress.
Returns a callable, that takes an iterable to "wrap".
"""
if bar_type == "on":
return functools.partial(
_rich_download_progress_bar,
bar_type=bar_type,
size=size,
initial_progress=initial_progress,
)
elif bar_type == "raw":
return functools.partial(
_raw_progress_bar,
size=size,
initial_progress=initial_progress,
)
else:
return iter # no-op, when passed an iterator
def get_install_progress_renderer(
*, bar_type: str, total: int
) -> ProgressRenderer[InstallRequirement]:
"""Get an object that can be used to render the install progress.
Returns a callable, that takes an iterable to "wrap".
"""
if bar_type == "on":
return functools.partial(_rich_install_progress_bar, total=total)
else:
return iter