ummm new tui thing ?

This commit is contained in:
2026-03-28 21:49:33 +02:00
parent 3db6de2d2c
commit de440305c7
16 changed files with 534 additions and 226 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
*.bak

8
Distributors.json Normal file
View File

@@ -0,0 +1,8 @@
{
"LastUpdated": "28-3-26",
"OfficialHosts": [
"https://code.zane.org/ZaneDev/JiboOs-Mirror/releases",
"https://kevinblog.sytes.net/Code/api/v1/repos/Kevin/JiboOs/releases"
],
"UnoficialHosts": []
}

View File

@@ -67,13 +67,11 @@ def load_config_entries_from_values_md() -> list[ConfigEntry]:
if path in seen:
continue
# Filter out non-robot/server dev configs the user doesn't want here.
if path.startswith("/hub-shim/"):
continue
if path.lower().endswith(".md"):
continue
# Keep it focused on JSON files (these are strict JSON configs).
if not path.lower().endswith(".json"):
continue

View File

@@ -50,12 +50,10 @@ class MainWindowController:
self._identity: Optional[dict] = None
self._connecting = False
# Tabs + connection pill
self.tab_widget = require_child(self.window, "tabWidget", QTabWidget)
self.connection_pill, self.conn_dot, self.conn_text = self._create_connection_pill()
self.tab_widget.setCornerWidget(self.connection_pill, Qt.TopRightCorner)
# Jibo/config
self.jibo_ip = require_child(self.window, "JiboIpField", QLineEdit)
self.connect_button = require_child(self.window, "TryToConnect", QPushButton)
self.jibo_title = require_child(self.window, "jiboTitle", QLabel)
@@ -65,7 +63,6 @@ class MainWindowController:
self.ha_enable = require_child(self.window, "haEnableCheck", QCheckBox)
self.ha_server_ip = require_child(self.window, "haServerIpField", QLineEdit)
# AI Bridge (formerly "AI Provider")
self.ai_enable = require_child(self.window, "aiEnableCheck", QCheckBox)
self.ai_mode = require_child(self.window, "aiProviderCombo", QComboBox)
self.ai_server_base_url = require_child(self.window, "aiEndpointField", QLineEdit)
@@ -82,10 +79,8 @@ class MainWindowController:
self._ai_bridge_obj: Optional[dict[str, Any]] = None
# Tool settings
self.enable_logging_check = require_child(self.window, "enableLoggingCheck", QCheckBox)
# Config editor (main panel "Config" section)
self.config_file_combo = require_child(self.window, "configFileCombo", QComboBox)
self.config_read_button = require_child(self.window, "configReadButton", QPushButton)
self.config_write_button = require_child(self.window, "configWriteButton", QPushButton)
@@ -96,18 +91,15 @@ class MainWindowController:
self._config_last_read_text: Optional[str] = None
self._config_paths: list[str] = []
# Jibo card controls
self.robot_settings_button = require_child(self.window, "RobotSettings", QPushButton)
self.robot_action_combo = require_child(self.window, "comboBox", QComboBox)
self.jibo_image = require_child(self.window, "jiboImage", QLabel)
self._robot_settings_window: Optional[object] = None
# Update page
self.install_button = require_child(self.window, "installButton", QPushButton)
self.check_updates_button = require_child(self.window, "checkUpdatesButton", QPushButton)
# Status page
self.status_dot = require_child(self.window, "statusDot", QLabel)
self.status_text = require_child(self.window, "statusText", QLabel)
@@ -137,7 +129,6 @@ class MainWindowController:
return self.session_connected
def _configure_ui(self) -> None:
# Simple styling, roughly matching the previous QML look.
self.connection_pill.setStyleSheet(
"QFrame#connectionPill {"
"background-color: #f6f6f6;"
@@ -146,22 +137,18 @@ class MainWindowController:
"}"
)
# AI Bridge mode choices
self.ai_mode.clear()
self.ai_mode.addItems(["TEXT", "AUDIO"])
# Robot controls start disabled until connected.
self.robot_settings_button.setEnabled(False)
self.robot_action_combo.setEnabled(False)
# Config editor defaults
self.config_editor.setPlaceholderText("Select a config file, then Read")
self.config_activity_log.setReadOnly(True)
self.config_activity_log.setPlaceholderText("Logging is disabled")
self.config_read_button.setEnabled(False)
self.config_write_button.setEnabled(False)
# Defaults
self.connect_button.setText("Connect")
self.jibo_title.setText("Connect Your Jibo")
@@ -179,7 +166,6 @@ class MainWindowController:
self.edit_ai_bridge_button.clicked.connect(self._jump_to_ai_bridge_config)
# Keep AI Bridge in-memory config in sync with UI edits.
self.ai_enable.toggled.connect(self._sync_ai_bridge_obj_from_ui)
self.ai_mode.currentIndexChanged.connect(self._sync_ai_bridge_obj_from_ui)
self.ai_server_base_url.textChanged.connect(self._sync_ai_bridge_obj_from_ui)
@@ -218,7 +204,6 @@ class MainWindowController:
ssh_client=self._ssh_client,
logging_enabled_check=self.enable_logging_check,
)
# Refresh the SSH client reference in case we reconnected.
try:
self._robot_settings_window.set_ssh_client(self._ssh_client) # type: ignore[attr-defined]
except Exception:
@@ -245,12 +230,10 @@ class MainWindowController:
self.ai_followup_enabled.setEnabled(ai_enabled)
self.ai_followup_delay_ms.setEnabled(ai_enabled)
# Connection button enabled unless a connect attempt is in progress.
self.connect_button.setEnabled(not self._connecting)
connected = self.session_connected
self.config_read_button.setEnabled(connected and self.config_file_combo.count() > 0)
# write button is controlled by editor dirty state
def _sync_all(self) -> None:
host = self.host
@@ -285,7 +268,6 @@ class MainWindowController:
else:
self.status_text.setText("No Jibo IP configured")
# Image swap
assets = Path(__file__).resolve().parent / "Assets" / "Jibo"
img_path = assets / ("JiboFaceForward.png" if visual_connected else "NoJiboConnected.png")
pm = QPixmap(str(img_path))
@@ -340,7 +322,6 @@ class MainWindowController:
self.config_activity_log.appendPlainText(message)
def _populate_config_file_combo(self) -> None:
# Populate from inventory, excluding /usr/local/etc (those belong under Robot Settings)
entries = load_config_entries_from_values_md()
paths = [e.remote_path for e in entries if not e.is_usr_local_etc]
paths = sorted(paths)
@@ -369,8 +350,6 @@ class MainWindowController:
self.config_file_combo.setCurrentIndex(idx)
self._read_selected_config()
# Seed editor with the current AI Bridge UI state so the user can
# immediately press Write.
try:
merged = self._merged_ai_bridge_obj_from_ui()
desired_text = json.dumps(merged, indent=2, ensure_ascii=False) + "\n"
@@ -539,7 +518,6 @@ class MainWindowController:
return base
def _sync_ai_bridge_obj_from_ui(self, *_args: Any) -> None:
# Keep unknown keys (if any) from the on-robot JSON.
try:
self._ai_bridge_obj = self._merged_ai_bridge_obj_from_ui()
except Exception:
@@ -577,7 +555,6 @@ class MainWindowController:
except Exception:
old_obj = MISSING
# Safety: if a /usr/local path ever ends up here, handle remount.
if p.startswith("/usr/local/"):
cmd = "mount -o remount,rw /usr/local"
self._log(f"EXEC {cmd}")
@@ -666,12 +643,10 @@ class MainWindowController:
identity = json.loads(raw_text)
# Success: store session.
self._ssh_client = client
self._identity = identity if isinstance(identity, dict) else None
self.status_text.setText(f"Connected via SSH to {host}")
# Auto-populate AI Bridge section when connected.
try:
self._load_ai_bridge_from_robot()
except Exception:
@@ -705,7 +680,6 @@ class MainWindowController:
layout.addWidget(dot)
layout.addWidget(text)
# Keep it tight on the tab bar.
pill.setSizePolicy(pill.sizePolicy().horizontalPolicy(), pill.sizePolicy().verticalPolicy())
pill.setMinimumHeight(28)
return pill, dot, text

View File

@@ -45,7 +45,6 @@ def resolve_python_invocation() -> tuple[str, list[str]]:
if venv_py.exists():
return (str(venv_py), [])
# Prefer the current interpreter when running inside a venv (e.g. Qt Creator).
try:
if sys.executable and Path(sys.executable).exists():
return (sys.executable, [])
@@ -64,7 +63,6 @@ def resolve_python_invocation() -> tuple[str, list[str]]:
def resolve_python() -> str:
    """Return a single display string for the resolved Python invocation.

    Joins the program path with any prefix arguments; this is a
    best-effort representation intended mostly for display purposes.
    """
    program, prefix = resolve_python_invocation()
    if not prefix:
        return program
    # Best-effort string representation (mostly for display)
    return " ".join([program, *prefix])
@@ -82,7 +80,6 @@ def _pick_terminal_command() -> Optional[list[str]]:
return None
candidates: list[list[str]] = []
# Debian/Ubuntu alternative system
candidates.append(["x-terminal-emulator", "-e"])
candidates.append(["gnome-terminal", "--"])
candidates.append(["konsole", "-e"])
@@ -102,8 +99,6 @@ def spawn_in_terminal(argv: list[str]) -> bool:
"""
if os.name == "nt":
# Use cmd.exe window, keep it open (/k)
# Build a single string command for cmd.
cmdline = " ".join(shlex.quote(a) for a in argv)
subprocess.Popen(["cmd", "/c", "start", "cmd", "/k", cmdline], shell=False)
return True

View File

@@ -44,13 +44,11 @@ class RobotSettingsWindow:
splitter = QSplitter(Qt.Horizontal)
outer.addWidget(splitter, 1)
# Left: tree
self.tree = QTreeWidget()
self.tree.setHeaderHidden(True)
self.tree.setSelectionMode(QAbstractItemView.SingleSelection)
splitter.addWidget(self.tree)
# Right: editor + buttons + log
right = QWidget()
right_layout = QVBoxLayout(right)
right_layout.setContentsMargins(0, 0, 0, 0)
@@ -168,7 +166,6 @@ class RobotSettingsWindow:
@Slot()
def _on_editor_changed(self) -> None:
# Enable write only if we have a loaded file and text changed.
if not self._current_remote_path or self._last_read_text is None:
self.write_button.setEnabled(False)
return
@@ -243,14 +240,12 @@ class RobotSettingsWindow:
new_text_raw = self.editor.toPlainText()
# Validate JSON if possible; this tool is focused on strict JSON configs.
try:
new_obj = json.loads(new_text_raw)
except Exception as e:
QMessageBox.warning(self.window, "Invalid JSON", f"JSON parse failed: {e}")
return
# Canonicalize to keep robot-side JSON strict/clean.
new_text = json.dumps(new_obj, indent=2, ensure_ascii=False) + "\n"
try:
@@ -264,7 +259,6 @@ class RobotSettingsWindow:
except Exception:
old_obj = MISSING
# Mounted dir special case: /usr/local/* is often read-only until remount.
if remote_path.startswith("/usr/local/"):
cmd = "mount -o remount,rw /usr/local"
self._log(f"EXEC {cmd}")
@@ -280,7 +274,6 @@ class RobotSettingsWindow:
if out.strip():
self._log(out.strip())
# Compute diffs (best-effort).
if old_obj is not MISSING:
diffs = diff_json(old_obj, new_obj)
if diffs:
@@ -302,7 +295,6 @@ class RobotSettingsWindow:
try:
self._sftp_write_text(remote_path, new_text)
self._log(f"WROTE {remote_path} ({len(new_text)} bytes)")
# Refresh read baseline.
self.editor.setPlainText(new_text)
self._last_read_text = new_text
self.write_button.setEnabled(False)

View File

@@ -63,7 +63,6 @@ class ToolRunnerWindow(QObject):
self._host_field.setVisible(self._is_updater)
# Installer-specific UX
self._use_existing_dump.setVisible(self._is_installer)
self._dump_path.setVisible(self._is_installer)
self._browse_dump.setVisible(self._is_installer)
@@ -108,7 +107,6 @@ class ToolRunnerWindow(QObject):
self._sync_buttons()
self._sync_status()
# Ensure the process is stopped when the window closes.
self.window.closeEvent = self._on_close # type: ignore[assignment]
def show(self) -> None:
@@ -125,7 +123,6 @@ class ToolRunnerWindow(QObject):
extra = self._extra_args.text().strip()
extra_args: list[str] = shlex.split(extra) if extra else []
# Installer convenience: if the user has an existing dump, pass it via --dump-path
if self._is_installer and self._use_existing_dump.isChecked():
dump_path = self._dump_path.text().strip()
if dump_path and "--dump-path" not in extra_args:
@@ -150,7 +147,6 @@ class ToolRunnerWindow(QObject):
self._status.setText("Dump file not found")
return
# Reset progress state for a new run.
self._output_buffer = ""
self._last_step_total = None
if self._is_installer:
@@ -168,7 +164,6 @@ class ToolRunnerWindow(QObject):
@Slot(str)
def _append_output(self, chunk: str) -> None:
# Keep it simple: append and scroll to end.
self._log.moveCursor(QTextCursor.End)
self._log.insertPlainText(chunk)
self._log.moveCursor(QTextCursor.End)
@@ -181,7 +176,6 @@ class ToolRunnerWindow(QObject):
self._start_stop.setText("Stop" if running else "Start")
self._open_terminal.setEnabled(not running)
if not running and self._is_installer:
# Leave progress/status in a meaningful final state.
if self.runner.exitCode == 0 and self._last_step_total:
self._progress.setRange(0, self._last_step_total)
self._progress.setValue(self._last_step_total)
@@ -194,7 +188,6 @@ class ToolRunnerWindow(QObject):
def _sync_status(self) -> None:
if self.runner.running:
self._status.setText("Running...")
# Indeterminate until we see a structured step marker.
if self._is_installer and self._last_step_total is None:
self._progress.setRange(0, 0)
return
@@ -233,7 +226,6 @@ class ToolRunnerWindow(QObject):
self._output_buffer += chunk
lines = self._output_buffer.splitlines(keepends=True)
# Keep any partial line for the next chunk.
if lines and not (lines[-1].endswith("\n") or lines[-1].endswith("\r")):
self._output_buffer = lines[-1]
lines = lines[:-1]
@@ -245,7 +237,6 @@ class ToolRunnerWindow(QObject):
if not clean:
continue
# Also surface meaningful non-step status lines (RCM detection, warnings, etc.)
if clean.startswith(("", "", "", "")) or "RCM" in clean:
msg = _clean_status_line(clean)
if msg and not msg.startswith("["):
@@ -258,7 +249,6 @@ class ToolRunnerWindow(QObject):
total = int(m.group(2))
msg = m.group(3).strip()
# Some flows use [0/6] for dependency checks.
if total > 0:
self._last_step_total = total
self._progress.setRange(0, total)
@@ -284,9 +274,7 @@ def _strip_ansi(s: str) -> str:
def _clean_status_line(s: str) -> str:
# Drop leading glyphs used by the CLI (info/warn/success/error)
s = re.sub(r"^[✓⚠✗ℹ]\s+", "", s).strip()
# Collapse extra whitespace
s = re.sub(r"\s+", " ", s).strip()
return s

View File

@@ -27,7 +27,6 @@ def load_ui(ui_path: Path) -> object:
def require_child(parent: object, name: str, typ: type[T]) -> T:
# Qt objects implement findChild; keep typing light.
child = parent.findChild(typ, name) # type: ignore[attr-defined]
if child is None:
raise RuntimeError(f"UI is missing required widget '{name}' ({typ.__name__})")

View File

@@ -1,12 +1,5 @@
# -*- coding: utf-8 -*-
################################################################################
## Form generated from reading UI file 'tool_runner.ui'
##
## Created by: Qt User Interface Compiler version 6.10.2
##
## WARNING! All changes made in this file will be lost when recompiling UI file!
################################################################################
from PySide6.QtCore import (QCoreApplication, QDate, QDateTime, QLocale,
QMetaObject, QObject, QPoint, QRect,
@@ -149,7 +142,6 @@ class Ui_ToolRunnerWindow(object):
self.retranslateUi(ToolRunnerWindow)
QMetaObject.connectSlotsByName(ToolRunnerWindow)
# setupUi
def retranslateUi(self, ToolRunnerWindow):
ToolRunnerWindow.setWindowTitle(QCoreApplication.translate("ToolRunnerWindow", u"Tool", None))
@@ -165,5 +157,4 @@ class Ui_ToolRunnerWindow(object):
self.currentStepLabel.setText(QCoreApplication.translate("ToolRunnerWindow", u"Idle", None))
self.statusLabel.setText(QCoreApplication.translate("ToolRunnerWindow", u"Idle", None))
self.clearLogButton.setText(QCoreApplication.translate("ToolRunnerWindow", u"Clear log", None))
# retranslateUi

View File

@@ -1,12 +1,5 @@
# -*- coding: utf-8 -*-
################################################################################
## Form generated from reading UI file 'form.ui'
##
## Created by: Qt User Interface Compiler version 6.10.2
##
## WARNING! All changes made in this file will be lost when recompiling UI file!
################################################################################
from PySide6.QtCore import (QCoreApplication, QDate, QDateTime, QLocale,
QMetaObject, QObject, QPoint, QRect,
@@ -557,7 +550,6 @@ class Ui_MainWindow(object):
QMetaObject.connectSlotsByName(MainWindow)
# setupUi
def retranslateUi(self, MainWindow):
MainWindow.setWindowTitle(QCoreApplication.translate("MainWindow", u"Jibo Tools", None))
@@ -627,5 +619,4 @@ class Ui_MainWindow(object):
self.tabWidget.setTabText(self.tabWidget.indexOf(self.tabStatus), QCoreApplication.translate("MainWindow", u"Status", None))
self.robotOsComingSoon.setText(QCoreApplication.translate("MainWindow", u"Coming soon.", None))
self.tabWidget.setTabText(self.tabWidget.indexOf(self.tabRobotOs), QCoreApplication.translate("MainWindow", u"Robot OS", None))
# retranslateUi

View File

@@ -398,6 +398,41 @@ Returning to normal mode
After update, set /var/jibo/mode.json back to normal (no prompt).
--no-return-normal
## Updater: Interactive TUI and GUI integration
The bundled `jibo_updater.py` now supports a simple interactive text UI and a small programmatic interface intended for later GUI integration.
- `--tui`: Launch a brief interactive text UI to pick a distribution host and release, or select a local archive under `jibo_work/updates/downloads/`.
- `--distributors <path>`: Use `Distributors.json` (default) to get a list of release hosts to probe for latency and releases.
Standalone TUI helper
---------------------
If you want a more polished terminal UI, use the new `jibo_updater_tui.py` curses-based helper. It provides
keyboard navigation and prints a JSON selection to stdout suitable for piping into other scripts.
Usage:
```bash
python3 jibo_updater_tui.py --distributors Distributors.json
```
The TUI outputs a JSON object describing the selected host and release, for example:
```json
{"host": "https://code.zane.org/..", "source":"remote", "tag":"v3.3.0", "tarball_url":"https://..."}
```
Behavior notes:
- The TUI will probe hosts listed in `Distributors.json` and present latency and available releases.
- Local archives found in `jibo_work/updates/downloads/` are shown as a "local" source and can be chosen without downloading.
- When updating, uploaded files and directories are set to permissive `0777` to avoid boot failures caused by missing execute/read permissions.
GUI integration:
- `jibo_updater.py` includes a simple programmatic surface (CLI flags and a small interactive mode) designed so a GUI can call it or be wired to a future HTTP/JSON control API. For now, use `--tui` to exercise the flow; GUI hooks will be documented in `CHECKLIST.md` for the next steps.
Dependencies: no additional Python packages were added for this change (uses the standard library + `paramiko`, which is already required). If you use the GUI in the future, update `requirements-gui.txt` accordingly.
Never prompt and never change mode back.
Examples
Update to latest:

Binary file not shown.

View File

@@ -24,19 +24,14 @@ from pathlib import Path
from typing import Optional, Tuple, List
from dataclasses import dataclass
# ============================================================================
# Configuration
# ============================================================================
SCRIPT_DIR = Path(__file__).parent.resolve()
SHOFEL_DIR = SCRIPT_DIR / "Shofel"
WORK_DIR = SCRIPT_DIR / "jibo_work"
# eMMC dump parameters
EMMC_TOTAL_SECTORS = 0x1D60000 # Total sectors to dump (~15GB)
EMMC_SECTOR_SIZE = 512
# Colors for terminal output
class Colors:
RED = '\033[91m'
GREEN = '\033[92m'
@@ -47,7 +42,6 @@ class Colors:
RESET = '\033[0m'
BOLD = '\033[1m'
# Disable colors on Windows unless using Windows Terminal
if platform.system() == "Windows" and "WT_SESSION" not in os.environ:
for attr in dir(Colors):
if not attr.startswith('_'):
@@ -64,9 +58,6 @@ class PartitionInfo:
name: str
# ============================================================================
# Utilities
# ============================================================================
def print_banner():
"""Print the tool banner"""
@@ -151,22 +142,17 @@ def _check_payloads_exist() -> bool:
return all((SHOFEL_DIR / p).exists() for p in critical_payloads)
# ============================================================================
# Dependency Checking
# ============================================================================
def check_linux_dependencies() -> Tuple[bool, List[str], List[str]]:
"""Check for required Linux dependencies"""
missing = []
warnings = []
# Required tools for host build
required_tools = {
"gcc": "build-essential or base-devel",
"make": "build-essential or base-devel",
}
# Optional tools (have fallbacks)
optional_tools = {
"lsusb": "usbutils (optional, used for device detection)",
"fdisk": "util-linux (optional, has Python fallback)",
@@ -180,12 +166,10 @@ def check_linux_dependencies() -> Tuple[bool, List[str], List[str]]:
if not shutil.which(tool):
warnings.append(f"{tool} ({package})")
# Check ARM toolchain only if payloads are missing
if not _check_payloads_exist():
if not shutil.which("arm-none-eabi-gcc"):
missing.append("arm-none-eabi-gcc (arm-none-eabi-gcc or arm-none-eabi-toolchain)")
# Check for libusb
try:
result = subprocess.run(
["pkg-config", "--exists", "libusb-1.0"],
@@ -194,7 +178,6 @@ def check_linux_dependencies() -> Tuple[bool, List[str], List[str]]:
if result.returncode != 0:
missing.append("libusb-1.0-dev or libusb1-devel")
except FileNotFoundError:
# pkg-config not found, try alternative check
if not Path("/usr/include/libusb-1.0").exists() and \
not Path("/usr/local/include/libusb-1.0").exists():
missing.append("libusb-1.0-dev or libusb1-devel")
@@ -207,20 +190,16 @@ def check_windows_dependencies() -> Tuple[bool, List[str], List[str]]:
missing = []
warnings = []
# Check for MinGW or MSYS2
if not shutil.which("gcc") and not shutil.which("x86_64-w64-mingw32-gcc"):
missing.append("MinGW-w64 or MSYS2")
# Check for ARM toolchain only if payloads missing
if not _check_payloads_exist():
if not shutil.which("arm-none-eabi-gcc"):
missing.append("ARM GNU Toolchain (arm-none-eabi-gcc)")
# Check for make
if not shutil.which("make") and not shutil.which("mingw32-make"):
missing.append("GNU Make")
# Optional: debugfs for editing ext filesystem images without mounting
if not shutil.which("debugfs") and not shutil.which("debugfs.exe"):
warnings.append("debugfs (e2fsprogs) - optional but recommended for reliable mode.json edits on Windows")
@@ -243,7 +222,6 @@ def print_install_instructions(system: str, missing: List[str], warnings: List[s
print(f"\n{Colors.BOLD}Installation instructions:{Colors.RESET}")
if system == "Linux":
# Detect distro
distro = "unknown"
if Path("/etc/arch-release").exists():
distro = "arch"
@@ -289,9 +267,6 @@ def print_install_instructions(system: str, missing: List[str], warnings: List[s
""")
# ============================================================================
# Shofel Building
# ============================================================================
def check_shofel_built() -> bool:
"""Check if shofel2_t124 is already built"""
@@ -333,18 +308,14 @@ def build_shofel(force_rebuild: bool = False) -> bool:
print_info("Compiling shofel2_t124...")
try:
# Only clean host build (preserves payload .bin files)
if force_rebuild:
run_command(["make", "clean"], cwd=SHOFEL_DIR, capture_output=True, check=False)
# Build (Makefile will skip existing payload .bin files)
result = run_command(["make"], cwd=SHOFEL_DIR, capture_output=True, check=False)
# Check if the main executable was built
if check_shofel_built():
print_success("Host tool (shofel2_t124) built successfully!")
# Check payloads again
payloads_ok, missing_payloads = check_payloads_built()
if not payloads_ok:
print_error("ARM payload binaries are missing!")
@@ -353,7 +324,6 @@ def build_shofel(force_rebuild: bool = False) -> bool:
print(f"{Colors.YELLOW}The ARM toolchain (arm-none-eabi-gcc) is required to build payloads.{Colors.RESET}")
print()
# Detect distro and provide instructions
if Path("/etc/arch-release").exists():
print(f" {Colors.CYAN}Arch/CachyOS:{Colors.RESET} sudo pacman -S arm-none-eabi-gcc arm-none-eabi-newlib")
elif Path("/etc/debian_version").exists():
@@ -381,20 +351,15 @@ def build_shofel(force_rebuild: bool = False) -> bool:
return False
# ============================================================================
# Jibo Detection
# ============================================================================
def detect_jibo_rcm() -> bool:
"""Detect if Jibo is connected in RCM mode"""
print_info("Looking for Jibo in RCM mode (NVIDIA APX device)...")
if platform.system() == "Linux":
# Try lsusb first
if shutil.which("lsusb"):
try:
result = run_command(["lsusb"], capture_output=True)
# Jibo uses 0955:7740 (NVIDIA APX)
if "0955:7740" in result.stdout:
print_success("Found Jibo in RCM mode!")
return True
@@ -408,7 +373,6 @@ def detect_jibo_rcm() -> bool:
except Exception as e:
print_error(f"lsusb failed: {e}")
# Fallback: check /sys/bus/usb/devices
try:
usb_devices = Path("/sys/bus/usb/devices")
if usb_devices.exists():
@@ -424,16 +388,13 @@ def detect_jibo_rcm() -> bool:
except Exception:
pass
# Final fallback: assume user will connect it
print_warning("Cannot detect USB devices. Please ensure Jibo is in RCM mode.")
print_info("The tool will attempt to connect anyway.")
return True # Let shofel try
elif platform.system() == "Windows":
# On Windows, we need to use different methods
print_warning("Windows USB detection - please ensure Zadig drivers are installed")
print_info("Run Zadig and install WinUSB driver for 'APX' device")
# Try to proceed anyway, shofel will detect it
return True
return False
@@ -459,36 +420,25 @@ def wait_for_jibo_rcm(timeout: int = 60) -> bool:
return False
# ============================================================================
# GPT Partition Parsing
# ============================================================================
def parse_gpt_partitions(dump_path: Path) -> List[PartitionInfo]:
"""Parse GPT partition table from dump file"""
partitions = []
with open(dump_path, "rb") as f:
# Read MBR (sector 0) - skip it
f.seek(512)
# Read GPT header (sector 1)
gpt_header = f.read(512)
# Check GPT signature
signature = gpt_header[:8]
if signature != b'EFI PART':
print_warning("GPT signature not found, trying fdisk parsing...")
return parse_partitions_fdisk(dump_path)
# Parse GPT header
# Offset 72: Partition entries start LBA (8 bytes)
# Offset 80: Number of partition entries (4 bytes)
# Offset 84: Size of partition entry (4 bytes)
partition_entries_lba = struct.unpack("<Q", gpt_header[72:80])[0]
num_entries = struct.unpack("<I", gpt_header[80:84])[0]
entry_size = struct.unpack("<I", gpt_header[84:88])[0]
# Seek to partition entries
f.seek(partition_entries_lba * 512)
for i in range(num_entries):
@@ -496,11 +446,6 @@ def parse_gpt_partitions(dump_path: Path) -> List[PartitionInfo]:
if len(entry) < 128:
break
# Parse partition entry
# Offset 0: Partition type GUID (16 bytes)
# Offset 32: First LBA (8 bytes)
# Offset 40: Last LBA (8 bytes)
# Offset 56: Partition name (72 bytes, UTF-16LE)
type_guid = entry[:16]
if type_guid == b'\x00' * 16:
@@ -509,7 +454,6 @@ def parse_gpt_partitions(dump_path: Path) -> List[PartitionInfo]:
first_lba = struct.unpack("<Q", entry[32:40])[0]
last_lba = struct.unpack("<Q", entry[40:48])[0]
# Parse name (UTF-16LE, null-terminated)
name_bytes = entry[56:128]
try:
name = name_bytes.decode('utf-16le').rstrip('\x00')
@@ -538,14 +482,11 @@ def parse_partitions_fdisk(dump_path: Path) -> List[PartitionInfo]:
check=False
)
# Parse fdisk output
for line in result.stdout.split('\n'):
# Look for lines like: dump.bin1 34 2048033 2048000 1000M Microsoft basic data
if dump_path.name in line and not line.startswith("Disk"):
parts = line.split()
if len(parts) >= 4:
try:
# Extract partition number from name (e.g., dump.bin5 -> 5)
part_name = parts[0]
part_num = int(''.join(c for c in part_name if c.isdigit()) or '0')
@@ -570,15 +511,12 @@ def parse_partitions_fdisk(dump_path: Path) -> List[PartitionInfo]:
def find_var_partition(partitions: List[PartitionInfo]) -> Optional[PartitionInfo]:
"""Find the /var partition (partition 5, ~500MB)"""
# The var partition is typically partition 5 with ~500MB size
for part in partitions:
if part.number == 5:
# Verify it's roughly the right size (450-550 MB)
size_mb = (part.size_sectors * EMMC_SECTOR_SIZE) / (1024 * 1024)
if 400 < size_mb < 600:
return part
# Fallback: look for any ~500MB partition
for part in partitions:
size_mb = (part.size_sectors * EMMC_SECTOR_SIZE) / (1024 * 1024)
if 450 < size_mb < 550:
@@ -588,9 +526,6 @@ def find_var_partition(partitions: List[PartitionInfo]) -> Optional[PartitionInf
return None
# ============================================================================
# Partition Extraction and Modification
# ============================================================================
def extract_partition(dump_path: Path, partition: PartitionInfo, output_path: Path) -> bool:
"""Extract a partition from the dump"""
@@ -626,8 +561,6 @@ def modify_mode_json_direct(partition_path: Path) -> bool:
with open(partition_path, "r+b") as f:
data = bytearray(f.read())
# Best-effort raw replacement.
# IMPORTANT: never change image length and never shift bytes; only overwrite in-place.
json_patterns = [
(b'{"mode":"normal"}', b'{"mode":"int-developer"}'),
(b'{"mode": "normal"}', b'{"mode": "int-developer"}'),
@@ -654,7 +587,6 @@ def modify_mode_json_direct(partition_path: Path) -> bool:
return False
region_len = len(new_json)
# Overwrite the JSON plus the padding region; do NOT shift bytes.
data[offset:offset + region_len] = new_json
f.seek(0)
@@ -680,18 +612,15 @@ def modify_partition_mounted(partition_path: Path) -> bool:
mount_point.mkdir(parents=True, exist_ok=True)
try:
# Mount the partition
print_info(f"Mounting partition at {mount_point}...")
run_command(
["mount", "-o", "loop", str(partition_path), str(mount_point)],
sudo=True
)
# Find and modify mode.json
mode_json_path = mount_point / "jibo" / "mode.json"
if not mode_json_path.exists():
# Try alternative paths
for alt_path in [
mount_point / "mode.json",
mount_point / "etc" / "jibo" / "mode.json",
@@ -703,7 +632,6 @@ def modify_partition_mounted(partition_path: Path) -> bool:
if mode_json_path.exists():
print_info(f"Found mode.json at {mode_json_path}")
# Capture original permissions/ownership so we can restore after copy-write
perm = None
uid = None
gid = None
@@ -720,7 +648,6 @@ def modify_partition_mounted(partition_path: Path) -> bool:
except Exception:
pass
# Save a raw backup copy of mode.json for debugging/recovery
try:
backup_text = run_command(
["cat", str(mode_json_path)],
@@ -732,7 +659,6 @@ def modify_partition_mounted(partition_path: Path) -> bool:
except Exception:
pass
# Read current content (prefer sudo cat so permissions don't bite us)
try:
mode_text = run_command(
["cat", str(mode_json_path)],
@@ -742,23 +668,19 @@ def modify_partition_mounted(partition_path: Path) -> bool:
).stdout
content = json.loads(mode_text)
except Exception:
# Fallback: direct open (works if script is run with sudo)
with open(mode_json_path, "r") as f:
content = json.load(f)
print_info(f"Current mode: {content.get('mode', 'unknown')}")
# Modify
content["mode"] = "int-developer"
# Write back (need sudo)
temp_json = WORK_DIR / "mode_temp.json"
with open(temp_json, "w") as f:
json.dump(content, f)
run_command(["cp", str(temp_json), str(mode_json_path)], sudo=True)
# Restore permissions/ownership if we captured them
if perm is not None:
run_command(["chmod", perm, str(mode_json_path)], sudo=True, check=False)
if uid is not None and gid is not None:
@@ -784,7 +706,6 @@ def modify_partition_mounted(partition_path: Path) -> bool:
return False
finally:
# Always unmount
try:
run_command(["umount", str(mount_point)], sudo=True, check=False)
except:
@@ -811,14 +732,12 @@ def modify_partition_debugfs(partition_path: Path) -> bool:
print_info("Attempting mode.json edit via debugfs (no mount)...")
# Potential locations inside /var
candidate_paths = [
"/jibo/mode.json",
"/mode.json",
"/etc/jibo/mode.json",
]
# Find which path exists by trying to cat it
existing_path: Optional[str] = None
original_text: Optional[str] = None
for p in candidate_paths:
@@ -828,7 +747,6 @@ def modify_partition_debugfs(partition_path: Path) -> bool:
capture_output=True,
check=True,
)
# debugfs prints to stdout for cat
if res.stdout and "File not found" not in res.stdout:
existing_path = p
original_text = res.stdout
@@ -840,7 +758,6 @@ def modify_partition_debugfs(partition_path: Path) -> bool:
print_warning("debugfs could not locate mode.json inside the image")
return False
# Save backup
try:
(WORK_DIR / "mode.json.original").write_text(original_text)
except Exception:
@@ -858,9 +775,6 @@ def modify_partition_debugfs(partition_path: Path) -> bool:
temp_json = WORK_DIR / "mode_temp.json"
temp_json.write_text(new_text)
# Overwrite: remove then write to ensure replacement works even if size differs.
# This may change filesystem allocation, which is fine for full /var write, and
# our patch-write logic can still handle it.
try:
run_command([debugfs, "-w", "-R", f"rm {existing_path}", str(partition_path)], check=False, capture_output=True)
run_command([debugfs, "-w", "-R", f"write {str(temp_json)} {existing_path}", str(partition_path)], capture_output=True)
@@ -881,17 +795,14 @@ def modify_var_partition(partition_path: Path) -> bool:
"""Modify the var partition to enable developer mode"""
print_step(4, 6, "Modifying var partition")
# On Linux, prefer mounting: it's the only truly safe way to update a file in an ext filesystem.
if platform.system() == "Linux":
if modify_partition_mounted(partition_path):
return True
print_warning("Mount-based edit failed; falling back to raw in-place patch")
# If mounting is unavailable (Windows/macOS) or failed, try debugfs (ext filesystem edit without mount)
if modify_partition_debugfs(partition_path):
return True
# Raw patch is a best-effort last resort
if modify_mode_json_direct(partition_path):
return True
@@ -969,7 +880,6 @@ def compute_changed_sector_ranges(original_path: Path, modified_path: Path, sect
base_sector += len(b1) // sector_size
continue
# Chunk differs; identify sector-level diffs within this chunk
sectors_in_chunk = min(len(b1), len(b2)) // sector_size
for i in range(sectors_in_chunk):
s1 = b1[i * sector_size:(i + 1) * sector_size]
@@ -1025,9 +935,6 @@ def write_partition_patch_to_emmc(original_path: Path, modified_path: Path, base
return True
# ============================================================================
# eMMC Operations
# ============================================================================
def get_shofel_path() -> Path:
"""Get the path to shofel2_t124 executable"""
@@ -1058,7 +965,6 @@ def dump_emmc(output_path: Path, start_sector: int = 0, num_sectors: int = EMMC_
str(output_path)
]
# Run with sudo on Linux
if platform.system() == "Linux":
cmd = ["sudo"] + cmd
@@ -1136,7 +1042,6 @@ def verify_write(partition_path: Path, start_sector: int, num_sectors: int) -> b
subprocess.run(cmd, cwd=SHOFEL_DIR, check=True)
# Compare hashes
with open(partition_path, "rb") as f:
original_hash = hashlib.md5(f.read()).hexdigest()
@@ -1157,22 +1062,17 @@ def verify_write(partition_path: Path, start_sector: int, num_sectors: int) -> b
return False
# ============================================================================
# Main Workflow
# ============================================================================
def run_full_mod(args) -> bool:
"""Run the complete modding workflow"""
print_banner()
# Check system
sys_info = get_system_info()
print_info(f"System: {sys_info['os']} ({sys_info['arch']})")
if sys_info['is_wsl']:
print_info("Running in WSL - USB passthrough may require additional setup")
# Check dependencies
print_step(0, 6, "Checking dependencies")
if sys_info['os'] == "Linux":
@@ -1190,25 +1090,20 @@ def run_full_mod(args) -> bool:
print_success("All required dependencies found!")
# Create work directory
WORK_DIR.mkdir(parents=True, exist_ok=True)
# Build Shofel
if not build_shofel(force_rebuild=args.rebuild_shofel):
return False
# Detect or wait for Jibo
if not args.skip_detection:
if not detect_jibo_rcm():
if not wait_for_jibo_rcm(timeout=120):
return False
# Paths
dump_path = WORK_DIR / "jibo_full_dump.bin"
var_partition_path = WORK_DIR / "var_partition.bin"
backup_var_path = WORK_DIR / "var_partition_backup.bin"
# Dump eMMC (or use existing dump)
if args.dump_path:
dump_path = Path(args.dump_path)
if not dump_path.exists():
@@ -1222,7 +1117,6 @@ def run_full_mod(args) -> bool:
if not dump_emmc(dump_path):
return False
# Parse partitions
print_step(3, 6, "Analyzing partition table")
partitions = parse_gpt_partitions(dump_path)
@@ -1235,7 +1129,6 @@ def run_full_mod(args) -> bool:
size_mb = (part.size_sectors * EMMC_SECTOR_SIZE) / (1024 * 1024)
print(f" {part.number}: sectors {part.start_sector}-{part.end_sector} ({size_mb:.1f} MB) - {part.name}")
# Find var partition
var_partition = find_var_partition(partitions)
if not var_partition:
print_error("Could not identify /var partition")
@@ -1243,35 +1136,28 @@ def run_full_mod(args) -> bool:
print_success(f"Identified /var partition: partition {var_partition.number}")
# Extract var partition
if not extract_partition(dump_path, var_partition, var_partition_path):
return False
# Create backup
shutil.copy(var_partition_path, backup_var_path)
print_info(f"Backup created: {backup_var_path}")
# Modify partition
if not modify_var_partition(var_partition_path):
return False
# Check if Jibo still connected (may need to re-enter RCM)
if not args.skip_detection:
print_info("Please ensure Jibo is still in RCM mode")
print_info("If Jibo rebooted, re-enter RCM mode now")
if not wait_for_jibo_rcm(timeout=60):
print_warning("Continuing anyway...")
# Write modified partition
if not write_partition_to_emmc(var_partition_path, var_partition.start_sector):
return False
# Verify
if args.verify:
if not verify_write(var_partition_path, var_partition.start_sector, var_partition.size_sectors):
print_warning("Verification failed, but write may still be successful")
# Done!
print(f"""
{Colors.GREEN}╔═══════════════════════════════════════════════════════════════════╗
{Colors.BOLD}MODDING COMPLETE!{Colors.RESET}{Colors.GREEN}
@@ -1302,11 +1188,9 @@ def run_dump_only(args) -> bool:
WORK_DIR.mkdir(parents=True, exist_ok=True)
# Build Shofel
if not build_shofel(force_rebuild=args.rebuild_shofel):
return False
# Wait for Jibo
if not args.skip_detection:
if not wait_for_jibo_rcm(timeout=120):
return False
@@ -1325,11 +1209,9 @@ def run_write_only(args) -> bool:
print_error(f"Partition file not found: {partition_path}")
return False
# Build Shofel if needed
if not build_shofel():
return False
# Wait for Jibo
if not args.skip_detection:
if not wait_for_jibo_rcm(timeout=120):
return False
@@ -1344,16 +1226,13 @@ def run_mode_json_only(args) -> bool:
WORK_DIR.mkdir(parents=True, exist_ok=True)
# Build Shofel
if not build_shofel(force_rebuild=args.rebuild_shofel):
return False
# Wait for Jibo
if not args.skip_detection:
if not wait_for_jibo_rcm(timeout=120):
return False
# Dump GPT / partition table (small read)
gpt_path = WORK_DIR / "gpt_dump.bin"
gpt_sectors = 4096 # 2MB; safely covers typical GPT entry area
print_info(f"Dumping GPT header/table ({gpt_sectors} sectors)...")
@@ -1375,7 +1254,6 @@ def run_mode_json_only(args) -> bool:
f"(start=0x{var_partition.start_sector:x}, sectors={var_partition.size_sectors})"
)
# Dump /var partition only
original_var_path = WORK_DIR / "var_partition_original.bin"
var_partition_path = WORK_DIR / "var_partition.bin"
backup_var_path = WORK_DIR / "var_partition_backup.bin"
@@ -1388,17 +1266,14 @@ def run_mode_json_only(args) -> bool:
shutil.copy(original_var_path, backup_var_path)
print_info(f"Backup created: {backup_var_path}")
# Modify mode.json inside /var
if not modify_var_partition(var_partition_path):
return False
# Re-check connectivity (optional)
if not args.skip_detection:
print_info("Please ensure Jibo is still in RCM mode")
if not wait_for_jibo_rcm(timeout=60):
print_warning("Continuing anyway...")
# Write back: patch by default, full write if requested
if args.full_var_write:
print_info("Writing full /var partition back to device...")
if not write_partition_to_emmc(var_partition_path, var_partition.start_sector):
@@ -1408,7 +1283,6 @@ def run_mode_json_only(args) -> bool:
if not write_partition_patch_to_emmc(original_var_path, var_partition_path, var_partition.start_sector):
return False
# Verify (reads back full /var; optional)
if args.verify:
if not verify_write(var_partition_path, var_partition.start_sector, var_partition.size_sectors):
print_warning("Verification failed, but write may still be successful")
@@ -1419,9 +1293,6 @@ def run_mode_json_only(args) -> bool:
return True
# ============================================================================
# CLI
# ============================================================================
def main():
parser = argparse.ArgumentParser(
@@ -1436,7 +1307,6 @@ Examples:
"""
)
# Operation modes
mode_group = parser.add_mutually_exclusive_group()
mode_group.add_argument("--dump-only", action="store_true",
help="Only dump the eMMC without modifying")
@@ -1445,7 +1315,6 @@ Examples:
mode_group.add_argument("--mode-json-only", action="store_true",
help="Fast mode: dump GPT + /var only, patch /var/jibo/mode.json, write back minimal changes")
# Options
parser.add_argument("--dump-path", metavar="FILE",
help="Use existing dump file instead of dumping")
parser.add_argument("--output", "-o", metavar="FILE",
@@ -1467,11 +1336,9 @@ Examples:
args = parser.parse_args()
# Validate arguments
if args.write_partition and not args.start_sector:
parser.error("--write-partition requires --start-sector")
# Run appropriate mode
try:
if args.dump_only:
success = run_dump_only(args)

View File

@@ -31,9 +31,14 @@ import urllib.error
import urllib.parse
import urllib.request
import zipfile
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Optional
import socket
import threading
import http.server
import socketserver
import paramiko
@@ -43,6 +48,10 @@ WORK_DIR = SCRIPT_DIR / "jibo_work"
UPDATES_DIR = WORK_DIR / "updates"
STATE_FILE_DEFAULT = WORK_DIR / "update_state.json"
__version__ = "0.2.0"
DEFAULT_UPDATER_RELEASES_API = "https://kevinblog.sytes.net/Code/api/v1/repos/Kevin/JiboUpdater/releases"
DEFAULT_RELEASES_API = "https://kevinblog.sytes.net/Code/api/v1/repos/Kevin/JiboOs/releases"
@@ -115,6 +124,65 @@ def http_get_json(url: str, timeout: int = 20) -> object:
return json.loads(data.decode("utf-8", errors="replace"))
def check_updater_version(releases_api: str, current_version: str) -> tuple[Optional[str], bool]:
    """Look up the newest release tag for the updater itself.

    Returns (latest_tag, is_newer); (None, False) when the API call fails
    or yields no usable tags.
    """
    try:
        payload = http_get_json(releases_api)
    except Exception:
        return None, False
    if not isinstance(payload, list):
        return None, False
    # Collect non-empty tag names from dict-shaped entries only.
    candidates = [
        str(entry.get("tag_name", ""))
        for entry in payload
        if isinstance(entry, dict)
    ]
    candidates = [tag for tag in candidates if tag]
    if not candidates:
        return None, False
    latest = max(candidates, key=_version_tuple)
    try:
        newer = _version_tuple(latest) > _version_tuple(current_version)
    except Exception:
        newer = False
    return latest, newer
class _Spinner:
def __init__(self, message: str = ""):
self._stop = threading.Event()
self._thread: Optional[threading.Thread] = None
self.message = message
def start(self):
def _spin():
chars = "|/-\\"
i = 0
while not self._stop.is_set():
sys.stdout.write(f"\r{self.message} {chars[i % len(chars)]}")
sys.stdout.flush()
i += 1
time.sleep(0.12)
sys.stdout.write("\r" + " " * (len(self.message) + 4) + "\r")
sys.stdout.flush()
self._thread = threading.Thread(target=_spin, daemon=True)
self._thread.start()
def stop(self):
self._stop.set()
if self._thread:
self._thread.join(timeout=1)
_VERSION_RE = re.compile(r"^v?(\d+)(?:\.(\d+))?(?:\.(\d+))?")
@@ -153,7 +221,6 @@ def get_latest_release(releases_api: str, allow_prerelease: bool) -> Release:
if not releases:
raise RuntimeError("No releases found (after prerelease filtering)")
# Gitea usually returns newest first, but sort by semver-ish tag to be safe.
releases.sort(key=lambda r: _version_tuple(r.tag_name), reverse=True)
return releases[0]
@@ -171,11 +238,9 @@ def normalize_download_url(download_url: str, base_url: str) -> str:
base = urllib.parse.urlparse(base_url)
dl = urllib.parse.urlparse(download_url)
# If already matches, keep as-is.
if dl.scheme == base.scheme and dl.netloc == base.netloc:
return download_url
# If download URL is missing components or has a different host, rewrite it.
return urllib.parse.urlunparse(
(base.scheme, base.netloc, dl.path, dl.params, dl.query, dl.fragment)
)
@@ -261,7 +326,6 @@ def _extract(archive: Path, extract_dir: Path, *, force: bool = False) -> Path:
member_path = extract_dir / member.name
if not _is_within(extract_dir, member_path):
raise RuntimeError(f"Unsafe path in tar archive: {member.name}")
# Python 3.14 changes tar default filtering behavior; be explicit.
try:
tf.extractall(extract_dir, filter="data")
except TypeError:
@@ -291,7 +355,6 @@ def _score_build_dir(path: Path) -> int:
for name, weight in (("etc", 5), ("opt", 5), ("var", 2), ("usr", 2), ("lib", 1), ("bin", 1)):
if (path / name).exists():
score += weight
# Prefer build dirs that are under a version folder like V3.1/build
parts = {p.lower() for p in path.parts}
if any(re.fullmatch(r"v\d+(?:\.\d+)*", p, flags=re.IGNORECASE) for p in parts):
score += 2
@@ -362,7 +425,6 @@ def ssh_exec(client: paramiko.SSHClient, command: str, timeout: int = 60) -> tup
def ensure_remote_dir(sftp: paramiko.SFTPClient, remote_dir: str) -> None:
# Create each path component if missing.
parts = [p for p in remote_dir.split("/") if p]
cur = "/"
for part in parts:
@@ -394,6 +456,10 @@ def upload_tree(
if dry_run:
continue
ensure_remote_dir(sftp, remote_path)
try:
sftp.chmod(remote_path, 0o777)
except Exception:
pass
continue
if p.is_symlink():
@@ -401,19 +467,20 @@ def upload_tree(
if dry_run:
sent += 1
continue
# Ensure parent exists
ensure_remote_dir(sftp, posixpath.dirname(remote_path))
try:
# Remove if exists
try:
sftp.remove(remote_path)
except IOError:
pass
sftp.symlink(target, remote_path)
except Exception:
# Fallback: dereference and upload file content
real_path = p.resolve()
sftp.put(str(real_path), remote_path)
try:
sftp.chmod(remote_path, 0o777)
except Exception:
pass
sent += 1
if sent % 200 == 0:
print_info(f"Uploaded {sent}/{total} entries...")
@@ -427,8 +494,7 @@ def upload_tree(
ensure_remote_dir(sftp, posixpath.dirname(remote_path))
sftp.put(str(p), remote_path)
try:
mode = p.stat().st_mode & 0o777
sftp.chmod(remote_path, mode)
sftp.chmod(remote_path, 0o777)
except Exception:
pass
@@ -455,16 +521,179 @@ def set_mode_json_to_normal(sftp: paramiko.SFTPClient) -> None:
data["mode"] = "normal"
new_content = json.dumps(data, separators=(",", ": ")) + "\n"
except Exception:
# Fallback for non-standard formatting
new_content = re.sub(r'("mode"\s*:\s*")([^"]+)(")', r'\1normal\3', content)
if new_content == content:
# As a last resort, overwrite with a minimal JSON.
new_content = '{"mode": "normal"}\n'
with sftp.open(remote, "w") as f:
f.write(new_content.encode("utf-8"))
def load_distributors_file(path: Path) -> dict:
    """Parse a Distributors.json file; returns {} on any read/parse problem
    or when the top-level JSON value is not an object."""
    try:
        parsed = json.loads(path.read_text("utf-8"))
    except Exception:
        return {}
    return parsed if isinstance(parsed, dict) else {}
def measure_host_latency(url: str, timeout: int = 5) -> float:
    """Time a small GET against *url*; float('inf') when unreachable."""
    began = time.time()
    try:
        request = urllib.request.Request(url, headers={"User-Agent": "JiboUpdater/1.0"})
        with urllib.request.urlopen(request, timeout=timeout) as response:
            # Read just enough to prove the host is actually serving bytes.
            response.read(512)
            return time.time() - began
    except Exception:
        return float("inf")
def get_releases_from_host(api_url: str) -> list[Release]:
    """Fetch the release list from one host's API; empty list on any failure
    or when the response is not a JSON array."""
    try:
        payload = http_get_json(api_url)
    except Exception:
        return []
    if not isinstance(payload, list):
        return []
    parsed: list[Release] = []
    for entry in payload:
        # Skip malformed entries rather than failing the whole host.
        if not isinstance(entry, dict):
            continue
        parsed.append(
            Release(
                tag_name=str(entry.get("tag_name", "")),
                name=str(entry.get("name", "")),
                prerelease=bool(entry.get("prerelease", False)),
                tarball_url=str(entry.get("tarball_url", "")),
                zipball_url=str(entry.get("zipball_url", "")),
            )
        )
    return parsed
def list_local_archives() -> list[Release]:
    """Return Release entries for archives already present in the downloads dir.

    The archive file name minus its extension is used as the tag, and the
    local path is stored in ``tarball_url`` so downstream code can detect a
    local source.  Fix: the previous ``name.rsplit(".", 2)[0]`` dropped too
    much for single-part extensions — e.g. "v3.3.0.zip" became tag "v3.3".
    """
    dl = UPDATES_DIR / "downloads"
    found: list[Release] = []
    if not dl.exists():
        return found
    for p in dl.iterdir():
        if not p.is_file():
            continue
        name = p.name
        # Strip exactly the matched archive suffix to recover the tag.
        for suffix in (".tar.gz", ".tgz", ".zip"):
            if name.endswith(suffix):
                tag = name[: -len(suffix)]
                found.append(Release(tag_name=tag, name=tag, prerelease=False, tarball_url=str(p), zipball_url=""))
                break
    return found
def robots_config_path() -> Path:
    """Location of the persisted robots registry file under the work dir."""
    return WORK_DIR.joinpath("robots.json")
def load_robots() -> dict:
    """Load the saved robots registry; {} when the file is absent or unreadable."""
    try:
        # A missing file raises here too, so no separate exists() check is needed.
        return json.loads(robots_config_path().read_text("utf-8"))
    except Exception:
        return {}
def save_robots(data: dict) -> None:
    """Persist the robots registry as pretty-printed JSON (creating parents)."""
    target = robots_config_path()
    target.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, indent=2)
    target.write_text(serialized + "\n", encoding="utf-8")
def fetch_robot_identity(host: str, user: str, password: str, timeout: int = 10) -> Optional[str]:
    """Read the robot's display name from /var/jibo/identity.json over SFTP.

    Returns the name string, or None when connecting, reading, or parsing
    fails, or when no usable "name"/"robot_name" field is present.
    """
    try:
        client = ssh_connect(host, user, password, timeout=timeout)
    except Exception:
        return None
    try:
        sftp = client.open_sftp()
        try:
            with sftp.open("/var/jibo/identity.json", "r") as fh:
                payload = json.loads(fh.read().decode("utf-8", errors="replace"))
        finally:
            sftp.close()
    except Exception:
        return None
    finally:
        # Always release the SSH connection, success or not.
        client.close()
    if isinstance(payload, dict):
        candidate = payload.get("name") or payload.get("robot_name")
        if isinstance(candidate, str):
            return candidate
    return None
def prompt_select_release_and_host(distributors_file: Path) -> tuple[Optional[Release], Optional[str], str]:
    """Interactively pick a distribution host, then a release from that host.

    Probes every configured host for latency and available releases, offers
    previously downloaded archives as a zero-latency "local" host, and walks
    the user through two numbered prompts.

    Returns (release, host, source) where source is "local" or "remote";
    release/host may be None (and source "") when the user cancels or the
    input is invalid.
    """
    d = load_distributors_file(distributors_file)
    # "UpdateHosts" takes precedence; fall back to "OfficialHosts".
    hosts = d.get("UpdateHosts") or d.get("OfficialHosts") or []
    hosts = [h for h in hosts if isinstance(h, str)]
    print_info("Checking hosts for latency and available releases...")
    host_infos = []
    for h in hosts:
        lat = measure_host_latency(h)
        releases = get_releases_from_host(h)
        host_infos.append((h, lat, releases))
    # Already-downloaded archives become a synthetic "local" host entry.
    local_releases = list_local_archives()
    if local_releases:
        host_infos.append(("local", 0.0, local_releases))
    print("Hosts (lower latency preferred):")
    # Unreachable hosts report latency inf and sort to the bottom.
    host_infos.sort(key=lambda t: (t[1] if isinstance(t[1], float) else float("inf")))
    for idx, (h, lat, rels) in enumerate(host_infos, start=1):
        label = f"{h} ({'local' if h=='local' else f'{lat:.2f}s'}) - {len(rels)} releases"
        print(f"{idx}) {label}")
    chosen_host_idx = None
    while chosen_host_idx is None:
        ans = input("Choose host number to browse releases (or q to cancel): ").strip()
        if ans.lower() in {"q", "quit", "exit"}:
            return None, None, ""
        if not ans.isdigit():
            print("Enter a number.")
            continue
        i = int(ans)
        if i < 1 or i > len(host_infos):
            print("Out of range")
            continue
        chosen_host_idx = i - 1
    host, lat, releases = host_infos[chosen_host_idx]
    if not releases:
        print_warning("No releases found for that host.")
        return None, host, "remote"
    # Newest first by semver-ish tag.
    releases.sort(key=lambda r: _version_tuple(r.tag_name), reverse=True)
    for idx, r in enumerate(releases, start=1):
        pre = " [prerelease]" if r.prerelease else ""
        print(f"{idx}) {r.tag_name}{pre} - {r.name}")
    ans = input("Choose release number (or 'l' to list release notes, number to pick, q to cancel): ").strip()
    if ans.lower() == "q":
        return None, host, ""
    if ans.lower() == "l":
        # NOTE(review): after showing notes this returns (cancels) instead of
        # re-prompting for a release — confirm that is the intended flow.
        sub = input("Release number to show notes: ").strip()
        if sub.isdigit():
            si = int(sub) - 1
            if 0 <= si < len(releases):
                print(releases[si].name)
                print(releases[si].tag_name)
        return None, host, ""
    if not ans.isdigit():
        return None, host, ""
    ri = int(ans) - 1
    if ri < 0 or ri >= len(releases):
        return None, host, ""
    chosen = releases[ri]
    source = "local" if host == "local" else "remote"
    return chosen, host, source
def main() -> int:
_no_color_if_not_tty()
@@ -473,6 +702,9 @@ def main() -> int:
parser.add_argument("--user", default="root", help="SSH username (default: root)")
parser.add_argument("--password", default="jibo", help="SSH password (default: jibo)")
parser.add_argument("--releases-api", default=DEFAULT_RELEASES_API, help="Gitea releases API URL")
parser.add_argument("--distributors", type=Path, default=Path("Distributors.json"), help="Path to Distributors.json to check multiple hosts")
parser.add_argument("--tui", action="store_true", help="Run an interactive text UI to pick host/release")
parser.add_argument("--updater-releases-api", default=DEFAULT_UPDATER_RELEASES_API, help="Releases API to check for updater updates")
parser.add_argument("--stable", action="store_true", help="Ignore prereleases")
parser.add_argument("--tag", help="Install a specific tag (e.g. v3.3.0) instead of latest")
@@ -501,11 +733,48 @@ def main() -> int:
_ensure_dirs()
logp = WORK_DIR / "updater.log"
logp.parent.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s: %(message)s",
handlers=[logging.FileHandler(logp, encoding="utf-8"), logging.StreamHandler(sys.stdout)],
)
logging.info("jibo_updater starting, version %s", __version__)
spinner = _Spinner("Checking updater version...")
spinner.start()
try:
latest_tag, is_newer = check_updater_version(args.updater_releases_api, __version__)
finally:
spinner.stop()
if latest_tag:
if is_newer:
msg = f"Updater update available: {latest_tag} (current {__version__})"
print_warning(msg)
logging.info(msg)
else:
msg = f"Updater is up-to-date ({__version__})"
print_info(msg)
logging.info(msg)
else:
logging.info("Updater version check failed or no releases found")
allow_prerelease = not args.stable
print_info("Checking latest release...")
if args.tag:
# Fetch all releases and pick the one matching tag
chosen_remote_source: Optional[str] = None
chosen_source_type = "remote"
if args.tui:
rel_choice, host_choice, source = prompt_select_release_and_host(args.distributors)
if rel_choice is None:
print_info("No release selected; aborting.")
return 2
release = rel_choice
chosen_remote_source = host_choice
chosen_source_type = source
elif args.tag:
raw = http_get_json(args.releases_api)
if not isinstance(raw, list):
raise RuntimeError("Unexpected releases API response")
@@ -550,27 +819,33 @@ def main() -> int:
print_info("Aborted.")
return 2
# Download + extract
archive_name = f"{release.tag_name}.tar.gz"
archive_path = UPDATES_DIR / "downloads" / archive_name
extract_dir = UPDATES_DIR / "extracted" / release.tag_name
if chosen_remote_source and chosen_source_type == "remote":
tarball_url = normalize_download_url(release.tarball_url, chosen_remote_source)
elif chosen_source_type == "local":
tarball_url = release.tarball_url
else:
tarball_url = normalize_download_url(release.tarball_url, args.releases_api)
try:
if isinstance(tarball_url, str) and Path(tarball_url).exists():
archive_path = Path(tarball_url)
print_info(f"Using local archive: {archive_path}")
else:
_download(tarball_url, archive_path, force=args.force)
except urllib.error.URLError as e:
raise RuntimeError(f"Download failed: {e}")
_extract(archive_path, extract_dir, force=args.force)
# Gitea archives usually create a single top-level folder. Prefer that as the search root.
children = [p for p in extract_dir.iterdir() if p.is_dir()]
search_root = children[0] if len(children) == 1 else extract_dir
build_dir = find_build_dir(search_root, args.build_path)
# Connect and update
print_info(f"Connecting to {args.user}@{args.host} ...")
client = ssh_connect(args.host, args.user, args.password, timeout=args.ssh_timeout)
try:
@@ -619,7 +894,6 @@ def main() -> int:
sftp.close()
if not args.dry_run:
# Update local state
if isinstance(state, dict):
state[args.host] = release.tag_name
save_state(args.state_file, state)

View File

@@ -16,3 +16,7 @@ if command -v python3 >/dev/null 2>&1; then
fi
exec python "$SCRIPT_DIR/jibo_updater.py" "$@"
# NOTE: The updater uses only the standard library and `paramiko`.
# If you don't have `paramiko` installed in your environment, install it:
# python3 -m pip install paramiko

190
jibo_updater_tui.py Normal file
View File

@@ -0,0 +1,190 @@
#!/usr/bin/env python3
"""Simple curses TUI for selecting a distribution host and release.
This TUI reuses helper functions from `jibo_updater.py` to probe hosts and
list local archives. It prints a JSON object to stdout with the chosen
selection so other tools or a GUI wrapper can invoke it.
Usage:
python3 jibo_updater_tui.py [--distributors Distributors.json]
Output (on success): JSON to stdout, e.g.
{"host": "https://...","source":"remote","tag":"v3.3.0","tarball_url":"https://..."}
Keyboard:
- Up/Down: navigate
- Enter: select
- b: back
- v: view release notes/name
- q: quit
"""
from __future__ import annotations
import curses
import json
import sys
from pathlib import Path
from typing import List
try:
from jibo_updater import (
load_distributors_file,
measure_host_latency,
get_releases_from_host,
list_local_archives,
_version_tuple,
check_updater_version,
__version__ as UPDATER_VERSION,
DEFAULT_UPDATER_RELEASES_API,
)
except Exception as e:
print(f"Failed to import helpers from jibo_updater: {e}", file=sys.stderr)
raise
def gather_host_infos(distributors_path: Path):
    """Probe each configured host (plus local archives) and sort by latency."""
    config = load_distributors_file(distributors_path)
    candidates = config.get("UpdateHosts") or config.get("OfficialHosts") or []
    candidates = [entry for entry in candidates if isinstance(entry, str)]
    infos = [
        # Dict values evaluate left-to-right: latency probe, then release fetch.
        {"host": entry, "lat": measure_host_latency(entry), "rels": get_releases_from_host(entry)}
        for entry in candidates
    ]
    local_rels = list_local_archives()
    if local_rels:
        # Previously downloaded archives appear as a zero-latency "local" host.
        infos.append({"host": "local", "lat": 0.0, "rels": local_rels})
    infos.sort(key=lambda info: info["lat"] if isinstance(info["lat"], float) else float("inf"))
    return infos
class TUI:
    """Curses front-end for picking a distribution host and a release.

    ``run()`` drives the host list; selecting a host opens its release list.
    A confirmed selection is printed to stdout as a JSON object and run()
    returns 0; cancelling returns 1.

    Fix: the host-list label previously nested an f-string that reused the
    same quote character (``f'{h['lat']:.2f}s'``), which is a SyntaxError on
    every Python before 3.12 (PEP 701); the label is now built separately.
    """

    def __init__(self, stdscr, distributors: Path):
        self.stdscr = stdscr
        self.distributors = distributors
        curses.curs_set(0)
        # Best-effort self-update check; failures only affect the header text.
        try:
            latest, is_newer = check_updater_version(DEFAULT_UPDATER_RELEASES_API, UPDATER_VERSION)
            if latest and is_newer:
                self.updater_status = f"Updater update available: {latest} (current {UPDATER_VERSION})"
            elif latest:
                self.updater_status = f"Updater up-to-date ({UPDATER_VERSION})"
            else:
                self.updater_status = "Updater version check failed"
        except Exception:
            self.updater_status = "Updater version check failed"
        self.hosts = gather_host_infos(distributors)

    def draw_list(self, items: List[str], title: str, idx: int, offset: int = 0):
        """Render a scrollable list with the item at *idx* highlighted."""
        self.stdscr.clear()
        h, w = self.stdscr.getmaxyx()
        header = title[: w - 1]
        status = (" - " + self.updater_status) if hasattr(self, "updater_status") else ""
        # Append the updater status only when it fits on the header line.
        if len(header) + len(status) < w - 1:
            header = header + status
        self.stdscr.addstr(0, 0, header[: w - 1])
        for i, line in enumerate(items[offset : offset + h - 3]):
            y = i + 2
            style = curses.A_REVERSE if (i + offset) == idx else curses.A_NORMAL
            try:
                self.stdscr.addstr(y, 0, line[: w - 1], style)
            except curses.error:
                # Writing into the last cell of a window can raise; ignore.
                pass
        self.stdscr.addstr(h - 1, 0, "Enter=select v=view b=back q=quit")
        self.stdscr.refresh()

    def run(self):
        """Main loop over hosts; returns 0 on a selection, 1 on cancel."""
        if not self.hosts:
            self.stdscr.addstr(0, 0, "No hosts found in distributors file or no local archives.")
            self.stdscr.addstr(2, 0, "Press any key to exit.")
            self.stdscr.getch()
            return 1
        idx = 0
        offset = 0
        while True:
            items = []
            for h in self.hosts:
                # Built separately: nesting an f-string with the same quote
                # character is a SyntaxError on Python < 3.12.
                lat_label = "local" if h["host"] == "local" else f"{h['lat']:.2f}s"
                items.append(f"{h['host']} ({lat_label}) - {len(h['rels'])} releases")
            self.draw_list(items, "Select host:", idx, offset)
            c = self.stdscr.getch()
            if c in (curses.KEY_DOWN, ord('j')):
                if idx < len(items) - 1:
                    idx += 1
            elif c in (curses.KEY_UP, ord('k')):
                if idx > 0:
                    idx -= 1
            elif c in (ord('\n'), ord('\r')):
                choice = self.hosts[idx]
                res = self.show_releases(choice)
                if res:
                    # Emit the selection as JSON for wrapping tools to consume.
                    print(json.dumps(res))
                    return 0
            elif c in (ord('q'), 27):
                return 1

    def show_releases(self, host_info):
        """Release picker for one host; returns the selection dict or None."""
        rels = host_info["rels"]
        if not rels:
            return None
        # Newest first by semver-ish tag.
        rels.sort(key=lambda r: _version_tuple(r.tag_name), reverse=True)
        idx = 0
        while True:
            items = [f"{r.tag_name}{' [prerelease]' if r.prerelease else ''} - {r.name}" for r in rels]
            self.draw_list(items, f"Host: {host_info['host']}", idx)
            c = self.stdscr.getch()
            if c in (curses.KEY_DOWN, ord('j')):
                if idx < len(items) - 1:
                    idx += 1
            elif c in (curses.KEY_UP, ord('k')):
                if idx > 0:
                    idx -= 1
            elif c in (ord('b'), 8):
                return None
            elif c == ord('v'):
                self.show_text(rels[idx].name or "(no notes)")
            elif c in (ord('\n'), ord('\r')):
                chosen = rels[idx]
                res = {
                    "host": host_info["host"],
                    "source": "local" if host_info["host"] == "local" else "remote",
                    "tag": chosen.tag_name,
                    "tarball_url": chosen.tarball_url,
                }
                return res
            elif c in (ord('q'), 27):
                return None

    def show_text(self, text: str):
        """Display *text* full-screen, hard-wrapped, until a key is pressed."""
        self.stdscr.clear()
        h, w = self.stdscr.getmaxyx()
        lines = []
        for ln in text.splitlines():
            # Hard-wrap each logical line to the window width.
            while ln:
                lines.append(ln[: w - 1])
                ln = ln[w - 1 :]
        for i, ln in enumerate(lines[: h - 2]):
            try:
                self.stdscr.addstr(i, 0, ln)
            except curses.error:
                pass
        self.stdscr.addstr(h - 1, 0, "Press any key to return")
        self.stdscr.refresh()
        self.stdscr.getch()
def main(argv):
    """Parse CLI args and run the curses TUI.

    Returns the TUI's exit status (0 = selection made, 1 = cancelled/no
    hosts).  Fix: the return value of ``curses.wrapper`` was previously
    discarded, so ``sys.exit(main(...) or 0)`` always exited 0 even when
    the user cancelled.
    """
    import argparse
    parser = argparse.ArgumentParser(description="Curses TUI for jibo_updater selection")
    parser.add_argument("--distributors", type=Path, default=Path("Distributors.json"), help="Path to Distributors.json")
    args = parser.parse_args(argv)
    # curses.wrapper returns whatever run() returns; propagate it.
    return curses.wrapper(lambda stdscr: TUI(stdscr, args.distributors).run())
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]) or 0)