mirror of
https://github.com/decompals/wibo.git
synced 2025-12-11 06:27:45 +00:00
307 lines
9.5 KiB
Python
307 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Reusable venv management for PEP 723 inline script dependencies.
|
|
|
|
This module provides utilities to:
|
|
1. Parse PEP 723 inline script metadata blocks
|
|
2. Create and manage virtual environments
|
|
3. Track dependencies and reinstall when they change
|
|
"""
|
|
|
|
import contextlib
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import venv
|
|
from pathlib import Path
|
|
|
|
SCRIPT_BLOCK_RE = re.compile(r"(?m)^# /// script$\s(?P<content>(^#(| .*)$\s)+)^# ///$")
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def _venv_lock(venv_dir: Path, timeout: float = 300.0):
|
|
"""
|
|
Context manager for file-based locking of venv operations.
|
|
|
|
Uses the .script-managed file within the venv directory for locking,
|
|
synchronizing venv creation and pip operations across multiple processes.
|
|
|
|
Args:
|
|
venv_dir: Path to the virtual environment directory
|
|
timeout: Maximum seconds to wait for lock (default: 5 minutes)
|
|
"""
|
|
venv_dir.mkdir(parents=True, exist_ok=True)
|
|
lock_file = venv_dir / ".script-managed"
|
|
fd = os.open(str(lock_file), os.O_CREAT | os.O_RDWR, 0o644)
|
|
|
|
try:
|
|
start_time = time.time()
|
|
locked = False
|
|
|
|
while not locked:
|
|
try:
|
|
if os.name == "nt":
|
|
import msvcrt
|
|
|
|
# msvcrt.locking locks from current file position
|
|
os.lseek(fd, 0, os.SEEK_SET)
|
|
msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
|
|
locked = True
|
|
else:
|
|
import fcntl
|
|
|
|
fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
locked = True
|
|
except (IOError, OSError):
|
|
elapsed = time.time() - start_time
|
|
if elapsed >= timeout:
|
|
raise TimeoutError(f"Failed to acquire venv lock after {timeout}s")
|
|
time.sleep(0.1)
|
|
|
|
yield
|
|
|
|
finally:
|
|
try:
|
|
if os.name == "nt":
|
|
import msvcrt
|
|
|
|
os.lseek(fd, 0, os.SEEK_SET)
|
|
msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
|
|
else:
|
|
import fcntl
|
|
|
|
fcntl.flock(fd, fcntl.LOCK_UN)
|
|
finally:
|
|
os.close(fd)
|
|
|
|
|
|
def _load_toml(text: str) -> dict:
|
|
"""Load TOML using stdlib tomllib or third-party tomli as a fallback."""
|
|
try:
|
|
import tomllib # type: ignore[attr-defined]
|
|
except Exception:
|
|
try:
|
|
import tomli as tomllib # type: ignore[no-redef]
|
|
except Exception as exc: # pragma: no cover - import error path
|
|
raise SystemExit(
|
|
"Missing TOML parser. Install 'tomli' or use Python >= 3.11."
|
|
) from exc
|
|
return tomllib.loads(text)
|
|
|
|
|
|
def read_pep723_metadata(script_path: Path) -> dict:
|
|
"""
|
|
Parse PEP 723 inline script metadata from a Python file.
|
|
|
|
Returns the parsed TOML data as a dict, or empty dict if no block found.
|
|
"""
|
|
text = script_path.read_text(encoding="utf-8")
|
|
m = SCRIPT_BLOCK_RE.search(text)
|
|
if not m:
|
|
return {}
|
|
content = m.group("content")
|
|
toml_lines: list[str] = []
|
|
for line in content.splitlines():
|
|
if not line.startswith("#"):
|
|
continue
|
|
# Strip the leading comment marker and a single optional space
|
|
if line.startswith("# "):
|
|
toml_lines.append(line[2:])
|
|
else:
|
|
toml_lines.append(line[1:])
|
|
|
|
toml_text = "\n".join(toml_lines)
|
|
return _load_toml(toml_text)
|
|
|
|
|
|
def deps_digest(deps: list[str]) -> str:
|
|
"""Compute a stable hash of the dependency list."""
|
|
return hashlib.sha256(json.dumps(sorted(deps)).encode()).hexdigest()
|
|
|
|
|
|
def in_venv() -> bool:
|
|
"""
|
|
Check if we're currently running inside a virtual environment.
|
|
"""
|
|
return sys.prefix != sys.base_prefix
|
|
|
|
|
|
def _parse_version_tuple(v: str) -> tuple[int, int, int]:
|
|
"""Parse a version like '3.12.1' into a 3-tuple, ignoring any suffixes."""
|
|
parts = re.findall(r"\d+", v)
|
|
nums = [int(p) for p in parts[:3]]
|
|
while len(nums) < 3:
|
|
nums.append(0)
|
|
return tuple(nums) # type: ignore[return-value]
|
|
|
|
|
|
def _satisfies_requires_python(
|
|
spec: str, current: tuple[int, int, int] | None = None
|
|
) -> bool:
|
|
"""
|
|
Minimal evaluator for PEP 440-like specifiers in requires-python.
|
|
|
|
Supports common operators: >=, >, <=, <, ==, != and wildcard '==3.12.*'.
|
|
Combines multiple comma-separated specifiers with logical AND.
|
|
"""
|
|
cur = current or (
|
|
sys.version_info.major,
|
|
sys.version_info.minor,
|
|
sys.version_info.micro,
|
|
)
|
|
|
|
def cmp(a: tuple[int, int, int], b: tuple[int, int, int]) -> int:
|
|
return (a > b) - (a < b)
|
|
|
|
for raw in spec.split(","):
|
|
s = raw.strip()
|
|
if not s:
|
|
continue
|
|
op = None
|
|
for candidate in (">=", "<=", "==", "!=", ">", "<"):
|
|
if s.startswith(candidate):
|
|
op = candidate
|
|
ver = s[len(candidate) :].strip()
|
|
break
|
|
if op is None:
|
|
# Treat bare version as ==version (prefix match compatible with '==3.12.*')
|
|
op, ver = "==", s
|
|
wildcard = op in {"==", "!="} and ver.endswith(".*")
|
|
if wildcard:
|
|
ver = ver[:-2]
|
|
tgt = _parse_version_tuple(ver)
|
|
c = cmp(cur, tgt)
|
|
if op == ">=":
|
|
if c < 0:
|
|
return False
|
|
elif op == ">":
|
|
if c <= 0:
|
|
return False
|
|
elif op == "<=":
|
|
if c > 0:
|
|
return False
|
|
elif op == "<":
|
|
if c >= 0:
|
|
return False
|
|
elif op == "==":
|
|
if wildcard:
|
|
# Prefix equality: compare only provided components
|
|
prefix = _parse_version_tuple(ver) # already trimmed
|
|
plen = 2 if ver.count(".") == 1 else 3
|
|
if tuple(cur[:plen]) != tuple(prefix[:plen]):
|
|
return False
|
|
else:
|
|
if c != 0:
|
|
return False
|
|
elif op == "!=":
|
|
if wildcard:
|
|
prefix = _parse_version_tuple(ver)
|
|
plen = 2 if ver.count(".") == 1 else 3
|
|
if tuple(cur[:plen]) == tuple(prefix[:plen]):
|
|
return False
|
|
else:
|
|
if c == 0:
|
|
return False
|
|
else:
|
|
return False
|
|
return True
|
|
|
|
|
|
def is_venv_managed(venv_dir: Path) -> bool:
|
|
"""Check if a venv was created by this script manager."""
|
|
marker = venv_dir / ".script-managed"
|
|
return marker.exists()
|
|
|
|
|
|
def get_venv_digest(venv_dir: Path) -> str | None:
|
|
"""Get the stored dependency digest from a managed venv."""
|
|
marker = venv_dir / ".script-managed"
|
|
if not marker.exists():
|
|
return None
|
|
return marker.read_text().strip()
|
|
|
|
|
|
def set_venv_digest(venv_dir: Path, digest: str) -> None:
|
|
"""Store the dependency digest in a managed venv."""
|
|
marker = venv_dir / ".script-managed"
|
|
marker.write_text(digest)
|
|
|
|
|
|
def create_venv(venv_dir: Path) -> Path:
|
|
"""
|
|
Create a new virtual environment and return the path to its Python binary.
|
|
|
|
Note: This function should be called within a _venv_lock() context.
|
|
"""
|
|
python_bin = venv_dir / ("Scripts/python.exe" if os.name == "nt" else "bin/python")
|
|
if not python_bin.exists():
|
|
venv.create(venv_dir, with_pip=True)
|
|
return python_bin
|
|
|
|
|
|
def install_deps(python_bin: Path, deps: list[str]) -> None:
|
|
"""
|
|
Install dependencies into a virtual environment.
|
|
|
|
Note: This function should be called within a _venv_lock() context.
|
|
"""
|
|
if not deps:
|
|
return
|
|
subprocess.check_call([str(python_bin), "-m", "pip", "install", *deps])
|
|
|
|
|
|
def bootstrap_venv(script_file: str) -> None:
|
|
"""
|
|
Bootstrap the script with its venv if not already running in one.
|
|
|
|
If script_path is None, uses __file__ from the calling context.
|
|
This function will re-exec the script with the venv's Python if needed.
|
|
"""
|
|
# Allow users to opt out entirely
|
|
if os.environ.get("AUTOVENV", "1").lower() in {"0", "false", "no"}:
|
|
return
|
|
|
|
script_path = Path(script_file).resolve()
|
|
|
|
# Read PEP 723 metadata
|
|
meta = read_pep723_metadata(script_path)
|
|
requires = meta.get("requires-python")
|
|
if isinstance(requires, str) and not _satisfies_requires_python(requires):
|
|
msg = (
|
|
f"Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro} "
|
|
f"does not satisfy requires-python: {requires}"
|
|
)
|
|
raise SystemExit(msg)
|
|
deps = meta.get("dependencies", [])
|
|
current_digest = deps_digest(deps)
|
|
|
|
if in_venv():
|
|
# Already in a venv, use it
|
|
venv_dir = Path(sys.prefix)
|
|
python_bin = Path(sys.executable)
|
|
managed = is_venv_managed(venv_dir)
|
|
else:
|
|
# Create a new managed venv
|
|
venv_dir = script_path.parent / ".venv"
|
|
with _venv_lock(venv_dir):
|
|
python_bin = create_venv(venv_dir)
|
|
managed = True
|
|
|
|
stored_digest = get_venv_digest(venv_dir)
|
|
if managed and stored_digest != current_digest:
|
|
# Managed venv and deps changed, reinstall
|
|
with _venv_lock(venv_dir):
|
|
# Double-check pattern: another process may have just finished installing
|
|
stored_digest = get_venv_digest(venv_dir)
|
|
if stored_digest != current_digest:
|
|
install_deps(python_bin, deps)
|
|
set_venv_digest(venv_dir, current_digest)
|
|
|
|
if venv_dir != Path(sys.prefix):
|
|
# Re-exec with venv Python
|
|
os.execv(str(python_bin), [str(python_bin), str(script_path), *sys.argv[1:]])
|