643 lines
21 KiB
Python
643 lines
21 KiB
Python
#!/usr/bin/env python3
|
||
"""Jibo OS Updater
|
||
|
||
Downloads the latest JiboOs release from the configured Gitea instance,
|
||
extracts it, then uploads the contents of the release "build" folder into
|
||
Jibo's root filesystem over SFTP.
|
||
|
||
High-level flow:
|
||
1) Check latest release
|
||
2) Download + extract archive
|
||
3) SSH into Jibo (root / password)
|
||
4) Remount / as read-write
|
||
5) SFTP upload build/ contents into /
|
||
6) Optionally switch /var/jibo/mode.json back to "normal"
|
||
|
||
This tool assumes your Jibo is already modded and reachable via SSH.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import os
|
||
import posixpath
|
||
import re
|
||
import shutil
|
||
import sys
|
||
import tarfile
|
||
import time
|
||
import urllib.error
|
||
import urllib.parse
|
||
import urllib.request
|
||
import zipfile
|
||
from dataclasses import dataclass
|
||
from pathlib import Path
|
||
from typing import Iterable, Optional
|
||
|
||
import paramiko
|
||
|
||
|
||
# Local working directories, created on demand next to this script.
SCRIPT_DIR = Path(__file__).parent.resolve()
WORK_DIR = SCRIPT_DIR / "jibo_work"
# Downloaded archives and their extracted trees are cached under here.
UPDATES_DIR = WORK_DIR / "updates"
# Default location of the per-host "last applied version" record.
STATE_FILE_DEFAULT = WORK_DIR / "update_state.json"

# Gitea "list releases" API endpoint for the JiboOs repository.
DEFAULT_RELEASES_API = "https://kevinblog.sytes.net/Code/api/v1/repos/Kevin/JiboOs/releases"
|
||
|
||
|
||
class Colors:
    """ANSI escape sequences used to colorize terminal output.

    The attributes are plain class-level strings so they can be blanked out
    globally (see `_no_color_if_not_tty`) when stdout is not a terminal.
    """

    RED = "\033[91m"
    GREEN = "\033[92m"
    YELLOW = "\033[93m"
    BLUE = "\033[94m"
    CYAN = "\033[96m"
    RESET = "\033[0m"
    BOLD = "\033[1m"
|
||
|
||
|
||
def _no_color_if_not_tty() -> None:
    """Disable ANSI colors when stdout is not attached to a terminal.

    Replaces every public attribute on ``Colors`` with an empty string so the
    ``print_*`` helpers emit plain text (e.g. when output is piped to a file).
    """
    if sys.stdout.isatty():
        return
    for attr in dir(Colors):
        if not attr.startswith("_"):
            setattr(Colors, attr, "")
|
||
|
||
|
||
def print_info(msg: str) -> None:
    """Print an informational message in cyan."""
    print(Colors.CYAN + "ℹ " + msg + Colors.RESET)
|
||
|
||
|
||
def print_success(msg: str) -> None:
    """Print a success message in green."""
    print(Colors.GREEN + "✓ " + msg + Colors.RESET)
|
||
|
||
|
||
def print_warning(msg: str) -> None:
    """Print a warning message in yellow."""
    print(Colors.YELLOW + "⚠ " + msg + Colors.RESET)
|
||
|
||
|
||
def print_error(msg: str) -> None:
    """Print an error message in red."""
    print(Colors.RED + "✗ " + msg + Colors.RESET)
|
||
|
||
|
||
def prompt_yes_no(question: str, default: bool = False) -> bool:
    """Ask a yes/no question on stdin until a recognizable answer is given.

    An empty answer returns *default*; otherwise only y/yes/n/no (any case)
    are accepted and anything else re-prompts.
    """
    suffix = "[Y/n]" if default else "[y/N]"
    answers = {"y": True, "yes": True, "n": False, "no": False}
    while True:
        reply = input(f"{question} {suffix} ").strip().lower()
        if not reply:
            return default
        if reply in answers:
            return answers[reply]
        print("Please answer y or n.")
|
||
|
||
|
||
@dataclass(frozen=True)
class Release:
    """Immutable snapshot of the fields we use from one Gitea release entry."""

    # Git tag, e.g. "v3.3.0" — used for version comparison and cache paths.
    tag_name: str
    # Human-readable release title from the API.
    name: str
    # True when Gitea marks this release as a prerelease.
    prerelease: bool
    # Archive download endpoints reported by the releases API.
    tarball_url: str
    zipball_url: str
|
||
|
||
|
||
def http_get_json(url: str, timeout: int = 20) -> object:
    """Fetch *url* via HTTP GET and decode the body as JSON.

    Sends a JSON ``Accept`` header and a custom User-Agent. Network failures
    propagate as the usual ``urllib`` errors; malformed JSON raises from
    ``json.loads``.
    """
    headers = {
        "Accept": "application/json",
        "User-Agent": "JiboUpdater/1.0",
    }
    request = urllib.request.Request(url, headers=headers)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    return json.loads(body.decode("utf-8", errors="replace"))
|
||
|
||
|
||
_VERSION_RE = re.compile(r"^v?(\d+)(?:\.(\d+))?(?:\.(\d+))?")
|
||
|
||
|
||
def _version_tuple(tag: str) -> tuple[int, int, int]:
|
||
m = _VERSION_RE.match(tag.strip())
|
||
if not m:
|
||
return (0, 0, 0)
|
||
major = int(m.group(1) or 0)
|
||
minor = int(m.group(2) or 0)
|
||
patch = int(m.group(3) or 0)
|
||
return (major, minor, patch)
|
||
|
||
|
||
def get_latest_release(releases_api: str, allow_prerelease: bool) -> Release:
    """Fetch all releases from *releases_api* and return the newest acceptable one.

    Prereleases are skipped unless *allow_prerelease* is True. The winner is
    chosen by parsing tags as semver-like tuples rather than trusting the API's
    ordering.

    Raises RuntimeError on an unexpected response or when no release survives
    the prerelease filter.
    """
    payload = http_get_json(releases_api)
    if not isinstance(payload, list) or not payload:
        raise RuntimeError(f"Unexpected releases API response from {releases_api}")

    candidates: list[Release] = []
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        is_prerelease = bool(entry.get("prerelease", False))
        if is_prerelease and not allow_prerelease:
            continue
        candidates.append(
            Release(
                tag_name=str(entry.get("tag_name", "")),
                name=str(entry.get("name", "")),
                prerelease=is_prerelease,
                tarball_url=str(entry.get("tarball_url", "")),
                zipball_url=str(entry.get("zipball_url", "")),
            )
        )

    if not candidates:
        raise RuntimeError("No releases found (after prerelease filtering)")

    # Gitea usually returns newest first, but sort by semver-ish tag to be safe.
    return max(candidates, key=lambda release: _version_tuple(release.tag_name))
|
||
|
||
|
||
def normalize_download_url(download_url: str, base_url: str) -> str:
    """Force downloads to use the same scheme/host as the API base.

    Some Gitea instances can be configured with a different ROOT_URL than the
    externally reachable hostname, which can leak into tarball_url/zipball_url.
    An empty *download_url* is returned unchanged.
    """
    if not download_url:
        return download_url

    api = urllib.parse.urlparse(base_url)
    link = urllib.parse.urlparse(download_url)

    # Already pointing at the API's scheme and host: keep as-is.
    if (link.scheme, link.netloc) == (api.scheme, api.netloc):
        return download_url

    # Otherwise graft the download path/query onto the API's scheme and host.
    rebuilt = (api.scheme, api.netloc, link.path, link.params, link.query, link.fragment)
    return urllib.parse.urlunparse(rebuilt)
|
||
|
||
|
||
def _ensure_dirs() -> None:
    """Create the local working directories if they do not exist yet."""
    for directory in (WORK_DIR, UPDATES_DIR):
        directory.mkdir(parents=True, exist_ok=True)
|
||
|
||
|
||
def _download(url: str, dest: Path, *, force: bool = False) -> None:
    """Download *url* to *dest*, with progress output and up to 3 attempts.

    An existing *dest* is reused unless *force* is True. Data is streamed into
    a ``.part`` temp file which is atomically renamed into place on success,
    so a partial download never masquerades as a complete one.

    Raises RuntimeError once all attempts have failed.

    Bug fix vs. the previous version: the retry message and back-off sleep are
    now skipped after the FINAL attempt — previously a third failure printed
    "Retrying in 8s..." and slept 8 seconds before raising.
    """
    dest.parent.mkdir(parents=True, exist_ok=True)
    if dest.exists() and not force:
        print_info(f"Using cached download: {dest}")
        return

    print_info(f"Downloading: {url}")
    tmp = dest.with_suffix(dest.suffix + ".part")

    attempts = 3
    last_err: Optional[BaseException] = None
    for attempt in range(1, attempts + 1):
        try:
            # Start each attempt from a clean temp file.
            if tmp.exists():
                tmp.unlink(missing_ok=True)

            with urllib.request.urlopen(url, timeout=180) as resp:
                total = resp.headers.get("Content-Length")
                total_int = int(total) if total and total.isdigit() else None
                downloaded = 0
                chunk_size = 1024 * 256
                with open(tmp, "wb") as f:
                    while chunk := resp.read(chunk_size):
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_int:
                            pct = downloaded * 100.0 / total_int
                            sys.stdout.write(
                                f"\r {downloaded/1e6:.1f}MB / {total_int/1e6:.1f}MB ({pct:.1f}%)"
                            )
                            sys.stdout.flush()

            if total_int:
                # Terminate the in-place progress line.
                sys.stdout.write("\n")
            # Atomic rename: dest only ever holds a complete download.
            tmp.replace(dest)
            print_success(f"Downloaded to {dest}")
            return

        except Exception as e:
            last_err = e
            if attempt < attempts:
                # Exponential back-off between attempts (2s, 4s).
                wait = 2**attempt
                print_warning(f"Download attempt {attempt}/{attempts} failed: {e}. Retrying in {wait}s...")
                time.sleep(wait)

    if tmp.exists():
        tmp.unlink(missing_ok=True)
    raise RuntimeError(f"Download failed after {attempts} attempts: {last_err}")
|
||
|
||
|
||
def _extract(archive: Path, extract_dir: Path, *, force: bool = False) -> Path:
    """Extract *archive* (.tar.gz/.tgz or .zip) into *extract_dir* and return it.

    A previous extraction is reused unless *force* is True, in which case the
    directory is removed and rebuilt. Every member path is validated before
    extraction so the archive cannot write outside *extract_dir*.

    Raises RuntimeError on an unsupported archive type or an unsafe member.
    """
    if extract_dir.exists() and force:
        shutil.rmtree(extract_dir)

    if extract_dir.exists():
        print_info(f"Using cached extraction: {extract_dir}")
        return extract_dir

    extract_dir.mkdir(parents=True, exist_ok=True)
    print_info(f"Extracting {archive.name} ...")

    def _is_within(base: Path, target: Path) -> bool:
        # True when *target* resolves to a location inside *base*.
        try:
            target.resolve().relative_to(base.resolve())
        except Exception:
            return False
        return True

    looks_like_targz = archive.suffixes[-2:] == [".tar", ".gz"] or archive.suffix == ".tgz"
    if looks_like_targz:
        with tarfile.open(archive, "r:gz") as tf:
            for member in tf.getmembers():
                if not _is_within(extract_dir, extract_dir / member.name):
                    raise RuntimeError(f"Unsafe path in tar archive: {member.name}")
            # Python 3.14 changes tar default filtering behavior; be explicit.
            try:
                tf.extractall(extract_dir, filter="data")
            except TypeError:
                tf.extractall(extract_dir)
    elif archive.suffix == ".zip":
        with zipfile.ZipFile(archive) as zf:
            for member in zf.infolist():
                if not _is_within(extract_dir, extract_dir / member.filename):
                    raise RuntimeError(f"Unsafe path in zip archive: {member.filename}")
            zf.extractall(extract_dir)
    else:
        raise RuntimeError(f"Unsupported archive type: {archive}")

    print_success(f"Extracted to {extract_dir}")
    return extract_dir
|
||
|
||
|
||
def _iter_build_candidates(root: Path) -> Iterable[Path]:
|
||
for path in root.rglob("build"):
|
||
if path.is_dir():
|
||
yield path
|
||
|
||
|
||
def _score_build_dir(path: Path) -> int:
|
||
score = 0
|
||
for name, weight in (("etc", 5), ("opt", 5), ("var", 2), ("usr", 2), ("lib", 1), ("bin", 1)):
|
||
if (path / name).exists():
|
||
score += weight
|
||
# Prefer build dirs that are under a version folder like V3.1/build
|
||
parts = {p.lower() for p in path.parts}
|
||
if any(re.fullmatch(r"v\d+(?:\.\d+)*", p, flags=re.IGNORECASE) for p in parts):
|
||
score += 2
|
||
return score
|
||
|
||
|
||
def find_build_dir(extract_root: Path, explicit: Optional[str]) -> Path:
    """Locate the rootfs overlay 'build' directory inside the extracted tree.

    When *explicit* is given it is resolved relative to *extract_root* and
    must be an existing directory. Otherwise all 'build' directories under
    *extract_root* are ranked heuristically and the best one wins.

    Raises RuntimeError when nothing suitable is found.
    """
    if explicit:
        chosen = (extract_root / explicit).resolve()
        if not (chosen.exists() and chosen.is_dir()):
            raise RuntimeError(f"--build-path not found: {chosen}")
        return chosen

    candidates = list(_iter_build_candidates(extract_root))
    if not candidates:
        raise RuntimeError(
            "Could not find a 'build' folder in the extracted archive. "
            "Use --build-path to point to it (relative to the extracted root)."
        )

    # Highest heuristic score wins; ties keep discovery order.
    best = max(candidates, key=_score_build_dir)

    if len(candidates) > 1 and _score_build_dir(best) == 0:
        print_warning("Found build folders, but none look like a rootfs overlay (no etc/opt).")

    print_info(f"Using build folder: {best}")
    return best
|
||
|
||
|
||
def load_state(path: Path) -> dict:
    """Read the persisted update-state JSON; missing or corrupt files yield {}."""
    try:
        raw = path.read_text("utf-8")
        parsed = json.loads(raw)
    except Exception:
        # Best-effort cache: any read/parse problem is treated as "no state".
        return {}
    return parsed
|
||
|
||
|
||
def save_state(path: Path, state: dict) -> None:
    """Persist *state* as pretty-printed, key-sorted JSON plus a trailing newline."""
    serialized = json.dumps(state, indent=2, sort_keys=True) + "\n"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(serialized, encoding="utf-8")
|
||
|
||
|
||
def ssh_connect(host: str, user: str, password: str, timeout: int) -> paramiko.SSHClient:
    """Open a password-authenticated SSH session to *host* and return the client.

    Key-based auth and the SSH agent are explicitly disabled; the same
    *timeout* governs TCP connect, banner exchange, and authentication.
    """
    client = paramiko.SSHClient()
    # The robot's host key won't be in known_hosts; accept it automatically.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    connect_kwargs = dict(
        hostname=host,
        username=user,
        password=password,
        look_for_keys=False,
        allow_agent=False,
        timeout=timeout,
        banner_timeout=timeout,
        auth_timeout=timeout,
    )
    client.connect(**connect_kwargs)
    return client
|
||
|
||
|
||
def ssh_exec(client: paramiko.SSHClient, command: str, timeout: int = 60) -> tuple[int, str, str]:
    """Run *command* on the remote host; return (exit_code, stdout, stderr) as text."""
    _stdin, stdout, stderr = client.exec_command(command, timeout=timeout)
    out_text = stdout.read().decode("utf-8", errors="replace")
    err_text = stderr.read().decode("utf-8", errors="replace")
    # Reading both streams first avoids blocking on a full pipe buffer.
    exit_code = stdout.channel.recv_exit_status()
    return exit_code, out_text, err_text
|
||
|
||
|
||
def ensure_remote_dir(sftp: paramiko.SFTPClient, remote_dir: str) -> None:
    """Create *remote_dir* (and any missing parents) over SFTP, mkdir -p style."""
    current = "/"
    for component in remote_dir.split("/"):
        if not component:
            continue
        current = posixpath.join(current, component)
        try:
            sftp.stat(current)
        except IOError:
            # stat failed -> assume this path component is missing; create it.
            sftp.mkdir(current)
|
||
|
||
|
||
def upload_tree(
    sftp: paramiko.SFTPClient,
    local_root: Path,
    remote_root: str = "/",
    *,
    dry_run: bool = False,
) -> None:
    """Mirror the contents of *local_root* into *remote_root* over SFTP.

    Directories are created as needed, regular files are uploaded with their
    permission bits copied (best effort), and symlinks are recreated remotely
    (falling back to uploading the dereferenced file if symlink creation
    fails). With *dry_run* only the entry count is computed; nothing is
    written remotely. Progress is printed every 200 uploaded entries.
    """
    local_root = local_root.resolve()

    # Sorted walk so parent directories are seen before their contents.
    paths = sorted(local_root.rglob("*"))
    total = len(paths)
    sent = 0  # files + symlinks transferred (dirs are not counted)

    for p in paths:
        rel = p.relative_to(local_root).as_posix()
        remote_path = posixpath.join(remote_root, rel)

        if p.is_dir():
            if dry_run:
                continue
            ensure_remote_dir(sftp, remote_path)
            continue

        # NOTE: checked before is_file() because is_file() follows symlinks.
        if p.is_symlink():
            target = os.readlink(p)
            if dry_run:
                sent += 1
                continue
            # Ensure parent exists
            ensure_remote_dir(sftp, posixpath.dirname(remote_path))
            try:
                # Remove if exists
                try:
                    sftp.remove(remote_path)
                except IOError:
                    pass
                sftp.symlink(target, remote_path)
            except Exception:
                # Fallback: dereference and upload file content
                real_path = p.resolve()
                sftp.put(str(real_path), remote_path)
            sent += 1
            if sent % 200 == 0:
                print_info(f"Uploaded {sent}/{total} entries...")
            continue

        if p.is_file():
            if dry_run:
                sent += 1
                continue

            ensure_remote_dir(sftp, posixpath.dirname(remote_path))
            sftp.put(str(p), remote_path)
            try:
                # Best-effort: replicate the local permission bits remotely.
                mode = p.stat().st_mode & 0o777
                sftp.chmod(remote_path, mode)
            except Exception:
                pass

            sent += 1
            if sent % 200 == 0:
                print_info(f"Uploaded {sent}/{total} entries...")

    print_success(f"Upload complete ({sent} files/links)")
|
||
|
||
|
||
def set_mode_json_to_normal(sftp: paramiko.SFTPClient) -> None:
    """Rewrite /var/jibo/mode.json on the robot so its "mode" is "normal".

    Prefers a JSON round-trip (parse, patch, re-serialize); falls back to a
    regex substitution for non-standard files, and as a last resort overwrites
    the file with a minimal ``{"mode": "normal"}`` document.

    Raises RuntimeError when the remote file cannot be read.
    """
    remote = "/var/jibo/mode.json"

    def rewrite(text: str) -> str:
        # Preferred path: parse, patch, and re-serialize the JSON object.
        try:
            parsed = json.loads(text)
            if not isinstance(parsed, dict):
                raise ValueError("mode.json is not a JSON object")
            parsed["mode"] = "normal"
            return json.dumps(parsed, separators=(",", ": ")) + "\n"
        except Exception:
            pass
        # Fallback for non-standard formatting
        patched = re.sub(r'("mode"\s*:\s*")([^"]+)(")', r'\1normal\3', text)
        if patched != text:
            return patched
        # As a last resort, overwrite with a minimal JSON.
        return '{"mode": "normal"}\n'

    try:
        with sftp.open(remote, "r") as f:
            content = f.read().decode("utf-8", errors="replace")
    except IOError as e:
        raise RuntimeError(f"Failed to read {remote}: {e}")

    with sftp.open(remote, "w") as f:
        f.write(rewrite(content).encode("utf-8"))
|
||
|
||
|
||
def main() -> int:
    """CLI entry point: resolve a release, download it, and apply it over SSH.

    Returns a process exit code: 0 on success (or already up to date),
    2 when the user aborts at the confirmation prompt. Fatal problems are
    raised as RuntimeError and handled by the __main__ guard.
    """
    _no_color_if_not_tty()

    parser = argparse.ArgumentParser(description="Update a modded Jibo with the latest JiboOs release")
    parser.add_argument("--ip", "--host", dest="host", required=True, help="Jibo IP/hostname")
    parser.add_argument("--user", default="root", help="SSH username (default: root)")
    parser.add_argument("--password", default="jibo", help="SSH password (default: jibo)")
    parser.add_argument("--releases-api", default=DEFAULT_RELEASES_API, help="Gitea releases API URL")

    parser.add_argument("--stable", action="store_true", help="Ignore prereleases")
    parser.add_argument("--tag", help="Install a specific tag (e.g. v3.3.0) instead of latest")

    parser.add_argument("--build-path", help="Path to build folder inside extracted tree (relative)")

    parser.add_argument("--state-file", type=Path, default=STATE_FILE_DEFAULT, help="Where to store last applied version")
    parser.add_argument("--force", action="store_true", help="Re-download and re-install even if version matches")
    parser.add_argument("--yes", action="store_true", help="Don’t prompt for confirmation")
    parser.add_argument("--dry-run", action="store_true", help="Download/extract + connect, but don’t write files")

    parser.add_argument(
        "--return-normal",
        action="store_true",
        help="After update, set /var/jibo/mode.json mode back to normal (no prompt)",
    )
    parser.add_argument(
        "--no-return-normal",
        action="store_true",
        help="After update, do not ask to return to normal mode",
    )

    parser.add_argument("--ssh-timeout", type=int, default=15, help="SSH connect timeout seconds")

    args = parser.parse_args()

    _ensure_dirs()

    allow_prerelease = not args.stable

    # --- Resolve which release to install -----------------------------------
    print_info("Checking latest release...")
    if args.tag:
        # Fetch all releases and pick the one matching tag
        raw = http_get_json(args.releases_api)
        if not isinstance(raw, list):
            raise RuntimeError("Unexpected releases API response")
        chosen: Optional[Release] = None
        for item in raw:
            if not isinstance(item, dict):
                continue
            if str(item.get("tag_name", "")) == args.tag:
                chosen = Release(
                    tag_name=str(item.get("tag_name", "")),
                    name=str(item.get("name", "")),
                    prerelease=bool(item.get("prerelease", False)),
                    tarball_url=str(item.get("tarball_url", "")),
                    zipball_url=str(item.get("zipball_url", "")),
                )
                break
        if not chosen:
            raise RuntimeError(f"Tag not found in releases: {args.tag}")
        release = chosen
    else:
        release = get_latest_release(args.releases_api, allow_prerelease=allow_prerelease)

    if not release.tag_name or not release.tarball_url:
        raise RuntimeError("Release JSON missing tag_name or tarball_url")

    # --- Compare against the locally recorded version for this host ---------
    state = load_state(args.state_file)
    last = str(state.get(args.host, "")) if isinstance(state, dict) else ""

    print_info(f"Latest: {release.tag_name} ({'prerelease' if release.prerelease else 'stable'})")
    if last:
        print_info(f"Last applied (from state): {last}")

    if (not args.force) and last and last == release.tag_name:
        print_success("Already at latest version (per local state). Use --force to reinstall.")
        return 0

    if not args.yes:
        if not prompt_yes_no(
            f"This will upload the release build overlay into / on {args.host} and overwrite files. Continue?",
            default=False,
        ):
            print_info("Aborted.")
            return 2

    # Download + extract
    archive_name = f"{release.tag_name}.tar.gz"
    archive_path = UPDATES_DIR / "downloads" / archive_name
    extract_dir = UPDATES_DIR / "extracted" / release.tag_name

    tarball_url = normalize_download_url(release.tarball_url, args.releases_api)

    try:
        _download(tarball_url, archive_path, force=args.force)
    except urllib.error.URLError as e:
        raise RuntimeError(f"Download failed: {e}")

    _extract(archive_path, extract_dir, force=args.force)

    # Gitea archives usually create a single top-level folder. Prefer that as the search root.
    children = [p for p in extract_dir.iterdir() if p.is_dir()]
    search_root = children[0] if len(children) == 1 else extract_dir

    build_dir = find_build_dir(search_root, args.build_path)

    # Connect and update
    print_info(f"Connecting to {args.user}@{args.host} ...")
    client = ssh_connect(args.host, args.user, args.password, timeout=args.ssh_timeout)
    try:
        # Probe whether / is already writable by creating and removing a marker file.
        code, out, err = ssh_exec(client, "sh -c 'touch /.jibo_rw_test 2>/dev/null && rm /.jibo_rw_test 2>/dev/null && echo WRITABLE || echo READONLY'")
        if "WRITABLE" in out:
            print_info("Root FS already writable")
        else:
            print_info("Remounting / as read-write...")
            code, out, err = ssh_exec(client, "sh -c 'mount -o remount,rw /'", timeout=60)
            if code != 0:
                print_warning(f"Remount command returned {code}. stderr: {err.strip()}")
            # Re-probe: trust the marker-file test, not the mount exit code.
            code, out, err = ssh_exec(client, "sh -c 'touch /.jibo_rw_test 2>/dev/null && rm /.jibo_rw_test 2>/dev/null && echo WRITABLE || echo READONLY'")
            if "WRITABLE" not in out:
                raise RuntimeError("Failed to remount / as writable (still READONLY)")
            print_success("/ remounted writable")

        if args.dry_run:
            print_success("Dry-run: skipping upload")
        else:
            print_info("Starting SFTP upload (this can take a while)...")
            sftp = client.open_sftp()
            try:
                upload_tree(sftp, build_dir, remote_root="/", dry_run=False)
            finally:
                sftp.close()

        # --- Decide whether to flip mode.json back to "normal" --------------
        do_return = False
        if args.return_normal:
            do_return = True
        elif args.no_return_normal:
            do_return = False
        elif args.yes:
            # Non-interactive run without an explicit flag: leave mode alone.
            do_return = False
        else:
            do_return = prompt_yes_no("Return Jibo to normal mode (mode.json: int-developer -> normal)?", default=False)

        if do_return:
            if args.dry_run:
                print_info("Dry-run: skipping mode.json change")
            else:
                sftp = client.open_sftp()
                try:
                    set_mode_json_to_normal(sftp)
                    print_success("Updated /var/jibo/mode.json to normal")
                finally:
                    sftp.close()

        if not args.dry_run:
            # Update local state
            if isinstance(state, dict):
                state[args.host] = release.tag_name
                save_state(args.state_file, state)

        print_success(f"Update finished ({release.tag_name})")
        return 0

    finally:
        client.close()
|
||
|
||
|
||
if __name__ == "__main__":
    try:
        raise SystemExit(main())
    except KeyboardInterrupt:
        # Ctrl-C: exit with the conventional 128 + SIGINT status.
        print("\nInterrupted.")
        raise SystemExit(130)
    except Exception as e:
        # Top-level boundary: print a short error and exit non-zero.
        print_error(str(e))
        raise SystemExit(1)
|