diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5b9bd87 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ + +*.bak diff --git a/Distributors.json b/Distributors.json new file mode 100644 index 0000000..5a7938c --- /dev/null +++ b/Distributors.json @@ -0,0 +1,8 @@ +{ + "LastUpdated": "28-3-26", + "OfficialHosts": [ + "https://code.zane.org/ZaneDev/JiboOs-Mirror/releases", + "https://kevinblog.sytes.net/Code/api/v1/repos/Kevin/JiboOs/releases" + ], + "UnoficialHosts": [] +} \ No newline at end of file diff --git a/JiboTools/JiboTools/gui/config_inventory.py b/JiboTools/JiboTools/gui/config_inventory.py index 38c9520..13fea99 100644 --- a/JiboTools/JiboTools/gui/config_inventory.py +++ b/JiboTools/JiboTools/gui/config_inventory.py @@ -67,13 +67,11 @@ def load_config_entries_from_values_md() -> list[ConfigEntry]: if path in seen: continue - # Filter out non-robot/server dev configs the user doesn't want here. if path.startswith("/hub-shim/"): continue if path.lower().endswith(".md"): continue - # Keep it focused on JSON files (these are strict JSON configs). if not path.lower().endswith(".json"): continue diff --git a/JiboTools/JiboTools/gui/main_panel.py b/JiboTools/JiboTools/gui/main_panel.py index 1ca9a96..845ae9e 100644 --- a/JiboTools/JiboTools/gui/main_panel.py +++ b/JiboTools/JiboTools/gui/main_panel.py @@ -50,12 +50,10 @@ class MainWindowController: self._identity: Optional[dict] = None self._connecting = False - # Tabs + connection pill self.tab_widget = require_child(self.window, "tabWidget", QTabWidget) self.connection_pill, self.conn_dot, self.conn_text = self._create_connection_pill() self.tab_widget.setCornerWidget(self.connection_pill, Qt.TopRightCorner) - # Jibo/config self.jibo_ip = require_child(self.window, "JiboIpField", QLineEdit) self.connect_button = require_child(self.window, "TryToConnect", QPushButton) self.jibo_title = require_child(self.window, "jiboTitle", QLabel) @@ -65,7 +63,6 @@ class MainWindowController: self.ha_enable = require_child(self.window, "haEnableCheck", QCheckBox) self.ha_server_ip = require_child(self.window, "haServerIpField", QLineEdit) - # AI Bridge (formerly "AI Provider") self.ai_enable = require_child(self.window, "aiEnableCheck", QCheckBox) self.ai_mode = require_child(self.window, "aiProviderCombo", QComboBox) self.ai_server_base_url = require_child(self.window, "aiEndpointField", QLineEdit) @@ -82,10 +79,8 @@ class MainWindowController: self._ai_bridge_obj: Optional[dict[str, Any]] = None - # Tool settings self.enable_logging_check = require_child(self.window, "enableLoggingCheck", QCheckBox) - # Config editor (main panel "Config" section) self.config_file_combo = require_child(self.window, "configFileCombo", QComboBox) self.config_read_button = require_child(self.window, "configReadButton", QPushButton) self.config_write_button = require_child(self.window, "configWriteButton", QPushButton) @@ -96,18 +91,15 @@ class MainWindowController: self._config_last_read_text: Optional[str] = None self._config_paths: list[str] = [] - # Jibo card controls self.robot_settings_button = require_child(self.window, "RobotSettings", QPushButton) self.robot_action_combo = require_child(self.window, "comboBox", QComboBox) self.jibo_image = require_child(self.window, "jiboImage", QLabel) self._robot_settings_window: Optional[object] = None - # Update page self.install_button = require_child(self.window, "installButton", QPushButton) self.check_updates_button = require_child(self.window, "checkUpdatesButton", QPushButton) - # Status page self.status_dot = require_child(self.window, "statusDot", QLabel) self.status_text = require_child(self.window, "statusText", QLabel) @@ -137,7 +129,6 @@ class MainWindowController: return self.session_connected def _configure_ui(self) -> None: - # Simple styling, roughly matching the previous QML look. self.connection_pill.setStyleSheet( "QFrame#connectionPill {" "background-color: #f6f6f6;" @@ -146,22 +137,18 @@ class MainWindowController: "}" ) - # AI Bridge mode choices self.ai_mode.clear() self.ai_mode.addItems(["TEXT", "AUDIO"]) - # Robot controls start disabled until connected. self.robot_settings_button.setEnabled(False) self.robot_action_combo.setEnabled(False) - # Config editor defaults self.config_editor.setPlaceholderText("Select a config file, then Read") self.config_activity_log.setReadOnly(True) self.config_activity_log.setPlaceholderText("Logging is disabled") self.config_read_button.setEnabled(False) self.config_write_button.setEnabled(False) - # Defaults self.connect_button.setText("Connect") self.jibo_title.setText("Connect Your Jibo") @@ -179,7 +166,6 @@ class MainWindowController: self.edit_ai_bridge_button.clicked.connect(self._jump_to_ai_bridge_config) - # Keep AI Bridge in-memory config in sync with UI edits. self.ai_enable.toggled.connect(self._sync_ai_bridge_obj_from_ui) self.ai_mode.currentIndexChanged.connect(self._sync_ai_bridge_obj_from_ui) self.ai_server_base_url.textChanged.connect(self._sync_ai_bridge_obj_from_ui) @@ -218,7 +204,6 @@ class MainWindowController: ssh_client=self._ssh_client, logging_enabled_check=self.enable_logging_check, ) - # Refresh the SSH client reference in case we reconnected. try: self._robot_settings_window.set_ssh_client(self._ssh_client) # type: ignore[attr-defined] except Exception: @@ -245,12 +230,10 @@ class MainWindowController: self.ai_followup_enabled.setEnabled(ai_enabled) self.ai_followup_delay_ms.setEnabled(ai_enabled) - # Connection button enabled unless a connect attempt is in progress. self.connect_button.setEnabled(not self._connecting) connected = self.session_connected self.config_read_button.setEnabled(connected and self.config_file_combo.count() > 0) - # write button is controlled by editor dirty state def _sync_all(self) -> None: host = self.host @@ -285,7 +268,6 @@ class MainWindowController: else: self.status_text.setText("No Jibo IP configured") - # Image swap assets = Path(__file__).resolve().parent / "Assets" / "Jibo" img_path = assets / ("JiboFaceForward.png" if visual_connected else "NoJiboConnected.png") pm = QPixmap(str(img_path)) @@ -340,7 +322,6 @@ class MainWindowController: self.config_activity_log.appendPlainText(message) def _populate_config_file_combo(self) -> None: - # Populate from inventory, excluding /usr/local/etc (those belong under Robot Settings) entries = load_config_entries_from_values_md() paths = [e.remote_path for e in entries if not e.is_usr_local_etc] paths = sorted(paths) @@ -369,8 +350,6 @@ class MainWindowController: self.config_file_combo.setCurrentIndex(idx) self._read_selected_config() - # Seed editor with the current AI Bridge UI state so the user can - # immediately press Write. try: merged = self._merged_ai_bridge_obj_from_ui() desired_text = json.dumps(merged, indent=2, ensure_ascii=False) + "\n" @@ -539,7 +518,6 @@ class MainWindowController: return base def _sync_ai_bridge_obj_from_ui(self, *_args: Any) -> None: - # Keep unknown keys (if any) from the on-robot JSON. try: self._ai_bridge_obj = self._merged_ai_bridge_obj_from_ui() except Exception: @@ -577,7 +555,6 @@ class MainWindowController: except Exception: old_obj = MISSING - # Safety: if a /usr/local path ever ends up here, handle remount. if p.startswith("/usr/local/"): cmd = "mount -o remount,rw /usr/local" self._log(f"EXEC {cmd}") @@ -666,12 +643,10 @@ class MainWindowController: identity = json.loads(raw_text) - # Success: store session. self._ssh_client = client self._identity = identity if isinstance(identity, dict) else None self.status_text.setText(f"Connected via SSH to {host}") - # Auto-populate AI Bridge section when connected. try: self._load_ai_bridge_from_robot() except Exception: @@ -705,7 +680,6 @@ class MainWindowController: layout.addWidget(dot) layout.addWidget(text) - # Keep it tight on the tab bar. pill.setSizePolicy(pill.sizePolicy().horizontalPolicy(), pill.sizePolicy().verticalPolicy()) pill.setMinimumHeight(28) return pill, dot, text diff --git a/JiboTools/JiboTools/gui/process_runner.py b/JiboTools/JiboTools/gui/process_runner.py index 6676d72..adb5d19 100644 --- a/JiboTools/JiboTools/gui/process_runner.py +++ b/JiboTools/JiboTools/gui/process_runner.py @@ -45,7 +45,6 @@ def resolve_python_invocation() -> tuple[str, list[str]]: if venv_py.exists(): return (str(venv_py), []) - # Prefer the current interpreter when running inside a venv (e.g. Qt Creator). try: if sys.executable and Path(sys.executable).exists(): return (sys.executable, []) @@ -64,7 +63,6 @@ def resolve_python_invocation() -> tuple[str, list[str]]: def resolve_python() -> str: program, prefix = resolve_python_invocation() if prefix: - # Best-effort string representation (mostly for display) return " ".join([program] + prefix) return program @@ -82,7 +80,6 @@ def _pick_terminal_command() -> Optional[list[str]]: return None candidates: list[list[str]] = [] - # Debian/Ubuntu alternative system candidates.append(["x-terminal-emulator", "-e"]) candidates.append(["gnome-terminal", "--"]) candidates.append(["konsole", "-e"]) @@ -102,8 +99,6 @@ def spawn_in_terminal(argv: list[str]) -> bool: """ if os.name == "nt": - # Use cmd.exe window, keep it open (/k) - # Build a single string command for cmd. cmdline = " ".join(shlex.quote(a) for a in argv) subprocess.Popen(["cmd", "/c", "start", "cmd", "/k", cmdline], shell=False) return True diff --git a/JiboTools/JiboTools/gui/robot_settings_window.py b/JiboTools/JiboTools/gui/robot_settings_window.py index 960d439..6c4ae44 100644 --- a/JiboTools/JiboTools/gui/robot_settings_window.py +++ b/JiboTools/JiboTools/gui/robot_settings_window.py @@ -44,13 +44,11 @@ class RobotSettingsWindow: splitter = QSplitter(Qt.Horizontal) outer.addWidget(splitter, 1) - # Left: tree self.tree = QTreeWidget() self.tree.setHeaderHidden(True) self.tree.setSelectionMode(QAbstractItemView.SingleSelection) splitter.addWidget(self.tree) - # Right: editor + buttons + log right = QWidget() right_layout = QVBoxLayout(right) right_layout.setContentsMargins(0, 0, 0, 0) @@ -168,7 +166,6 @@ class RobotSettingsWindow: @Slot() def _on_editor_changed(self) -> None: - # Enable write only if we have a loaded file and text changed. if not self._current_remote_path or self._last_read_text is None: self.write_button.setEnabled(False) return @@ -243,14 +240,12 @@ class RobotSettingsWindow: new_text_raw = self.editor.toPlainText() - # Validate JSON if possible; this tool is focused on strict JSON configs. try: new_obj = json.loads(new_text_raw) except Exception as e: QMessageBox.warning(self.window, "Invalid JSON", f"JSON parse failed: {e}") return - # Canonicalize to keep robot-side JSON strict/clean. new_text = json.dumps(new_obj, indent=2, ensure_ascii=False) + "\n" try: @@ -264,7 +259,6 @@ class RobotSettingsWindow: except Exception: old_obj = MISSING - # Mounted dir special case: /usr/local/* is often read-only until remount. if remote_path.startswith("/usr/local/"): cmd = "mount -o remount,rw /usr/local" self._log(f"EXEC {cmd}") @@ -280,7 +274,6 @@ class RobotSettingsWindow: if out.strip(): self._log(out.strip()) - # Compute diffs (best-effort). if old_obj is not MISSING: diffs = diff_json(old_obj, new_obj) if diffs: @@ -302,7 +295,6 @@ class RobotSettingsWindow: try: self._sftp_write_text(remote_path, new_text) self._log(f"WROTE {remote_path} ({len(new_text)} bytes)") - # Refresh read baseline. self.editor.setPlainText(new_text) self._last_read_text = new_text self.write_button.setEnabled(False) diff --git a/JiboTools/JiboTools/gui/tool_runner_window.py b/JiboTools/JiboTools/gui/tool_runner_window.py index c3916eb..77550d1 100644 --- a/JiboTools/JiboTools/gui/tool_runner_window.py +++ b/JiboTools/JiboTools/gui/tool_runner_window.py @@ -63,7 +63,6 @@ class ToolRunnerWindow(QObject): self._host_field.setVisible(self._is_updater) - # Installer-specific UX self._use_existing_dump.setVisible(self._is_installer) self._dump_path.setVisible(self._is_installer) self._browse_dump.setVisible(self._is_installer) @@ -108,7 +107,6 @@ class ToolRunnerWindow(QObject): self._sync_buttons() self._sync_status() - # Ensure the process is stopped when the window closes. self.window.closeEvent = self._on_close # type: ignore[assignment] def show(self) -> None: @@ -125,7 +123,6 @@ class ToolRunnerWindow(QObject): extra = self._extra_args.text().strip() extra_args: list[str] = shlex.split(extra) if extra else [] - # Installer convenience: if the user has an existing dump, pass it via --dump-path if self._is_installer and self._use_existing_dump.isChecked(): dump_path = self._dump_path.text().strip() if dump_path and "--dump-path" not in extra_args: @@ -150,7 +147,6 @@ class ToolRunnerWindow(QObject): self._status.setText("Dump file not found") return - # Reset progress state for a new run. self._output_buffer = "" self._last_step_total = None if self._is_installer: @@ -168,7 +164,6 @@ class ToolRunnerWindow(QObject): @Slot(str) def _append_output(self, chunk: str) -> None: - # Keep it simple: append and scroll to end. self._log.moveCursor(QTextCursor.End) self._log.insertPlainText(chunk) self._log.moveCursor(QTextCursor.End) @@ -181,7 +176,6 @@ class ToolRunnerWindow(QObject): self._start_stop.setText("Stop" if running else "Start") self._open_terminal.setEnabled(not running) if not running and self._is_installer: - # Leave progress/status in a meaningful final state. if self.runner.exitCode == 0 and self._last_step_total: self._progress.setRange(0, self._last_step_total) self._progress.setValue(self._last_step_total) @@ -194,7 +188,6 @@ class ToolRunnerWindow(QObject): def _sync_status(self) -> None: if self.runner.running: self._status.setText("Running...") - # Indeterminate until we see a structured step marker. if self._is_installer and self._last_step_total is None: self._progress.setRange(0, 0) return @@ -233,7 +226,6 @@ class ToolRunnerWindow(QObject): self._output_buffer += chunk lines = self._output_buffer.splitlines(keepends=True) - # Keep any partial line for the next chunk. if lines and not (lines[-1].endswith("\n") or lines[-1].endswith("\r")): self._output_buffer = lines[-1] lines = lines[:-1] @@ -245,7 +237,6 @@ class ToolRunnerWindow(QObject): if not clean: continue - # Also surface meaningful non-step status lines (RCM detection, warnings, etc.) if clean.startswith(("ℹ", "⚠", "✓", "✗")) or "RCM" in clean: msg = _clean_status_line(clean) if msg and not msg.startswith("["): @@ -258,7 +249,6 @@ class ToolRunnerWindow(QObject): total = int(m.group(2)) msg = m.group(3).strip() - # Some flows use [0/6] for dependency checks. if total > 0: self._last_step_total = total self._progress.setRange(0, total) @@ -284,9 +274,7 @@ def _strip_ansi(s: str) -> str: def _clean_status_line(s: str) -> str: - # Drop leading glyphs used by the CLI (info/warn/success/error) s = re.sub(r"^[✓⚠✗ℹ]\s+", "", s).strip() - # Collapse extra whitespace s = re.sub(r"\s+", " ", s).strip() return s diff --git a/JiboTools/JiboTools/gui/ui_loader.py b/JiboTools/JiboTools/gui/ui_loader.py index a5955f7..6ab0bcf 100644 --- a/JiboTools/JiboTools/gui/ui_loader.py +++ b/JiboTools/JiboTools/gui/ui_loader.py @@ -27,7 +27,6 @@ def load_ui(ui_path: Path) -> object: def require_child(parent: object, name: str, typ: type[T]) -> T: - # Qt objects implement findChild; keep typing light. child = parent.findChild(typ, name) # type: ignore[attr-defined] if child is None: raise RuntimeError(f"UI is missing required widget '{name}' ({typ.__name__})") diff --git a/JiboTools/JiboTools/gui/ui_tool_runner.py b/JiboTools/JiboTools/gui/ui_tool_runner.py index b732498..6fb668c 100644 --- a/JiboTools/JiboTools/gui/ui_tool_runner.py +++ b/JiboTools/JiboTools/gui/ui_tool_runner.py @@ -1,12 +1,5 @@ # -*- coding: utf-8 -*- -################################################################################ -## Form generated from reading UI file 'tool_runner.ui' -## -## Created by: Qt User Interface Compiler version 6.10.2 -## -## WARNING! All changes made in this file will be lost when recompiling UI file! -################################################################################ from PySide6.QtCore import (QCoreApplication, QDate, QDateTime, QLocale, QMetaObject, QObject, QPoint, QRect, @@ -149,7 +142,6 @@ class Ui_ToolRunnerWindow(object): self.retranslateUi(ToolRunnerWindow) QMetaObject.connectSlotsByName(ToolRunnerWindow) - # setupUi def retranslateUi(self, ToolRunnerWindow): ToolRunnerWindow.setWindowTitle(QCoreApplication.translate("ToolRunnerWindow", u"Tool", None)) @@ -165,5 +157,4 @@ class Ui_ToolRunnerWindow(object): self.currentStepLabel.setText(QCoreApplication.translate("ToolRunnerWindow", u"Idle", None)) self.statusLabel.setText(QCoreApplication.translate("ToolRunnerWindow", u"Idle", None)) self.clearLogButton.setText(QCoreApplication.translate("ToolRunnerWindow", u"Clear log", None)) - # retranslateUi diff --git a/JiboTools/JiboTools/ui_form.py b/JiboTools/JiboTools/ui_form.py index 47eae42..6ec8e53 100644 --- a/JiboTools/JiboTools/ui_form.py +++ b/JiboTools/JiboTools/ui_form.py @@ -1,12 +1,5 @@ # -*- coding: utf-8 -*- -################################################################################ -## Form generated from reading UI file 'form.ui' -## -## Created by: Qt User Interface Compiler version 6.10.2 -## -## WARNING! All changes made in this file will be lost when recompiling UI file! -################################################################################ from PySide6.QtCore import (QCoreApplication, QDate, QDateTime, QLocale, QMetaObject, QObject, QPoint, QRect, @@ -557,7 +550,6 @@ class Ui_MainWindow(object): QMetaObject.connectSlotsByName(MainWindow) - # setupUi def retranslateUi(self, MainWindow): MainWindow.setWindowTitle(QCoreApplication.translate("MainWindow", u"Jibo Tools", None)) @@ -627,5 +619,4 @@ class Ui_MainWindow(object): self.tabWidget.setTabText(self.tabWidget.indexOf(self.tabStatus), QCoreApplication.translate("MainWindow", u"Status", None)) self.robotOsComingSoon.setText(QCoreApplication.translate("MainWindow", u"Coming soon.", None)) self.tabWidget.setTabText(self.tabWidget.indexOf(self.tabRobotOs), QCoreApplication.translate("MainWindow", u"Robot OS", None)) - # retranslateUi diff --git a/README.md b/README.md index 3fd5d92..ba3d79c 100644 --- a/README.md +++ b/README.md @@ -398,6 +398,41 @@ Returning to normal mode After update, set /var/jibo/mode.json back to normal (no prompt). --no-return-normal +## Updater: Interactive TUI and GUI integration + +The bundled `jibo_updater.py` now supports a simple interactive text UI and a small programmatic interface intended for later GUI integration. + +- `--tui`: Launch a brief interactive text UI to pick a distribution host and release, or select a local archive under `jibo_work/updates/downloads/`. +- `--distributors `: Use `Distributors.json` (default) to get a list of release hosts to probe for latency and releases. + +Standalone TUI helper +--------------------- + +If you want a more polished terminal UI, use the new `jibo_updater_tui.py` curses-based helper. It provides +keyboard navigation and prints a JSON selection to stdout suitable for piping into other scripts. + +Usage: + +```bash +python3 jibo_updater_tui.py --distributors Distributors.json +``` + +The TUI outputs a JSON object describing the selected host and release, for example: + +```json +{"host": "https://code.zane.org/..", "source":"remote", "tag":"v3.3.0", "tarball_url":"https://..."} +``` + +Behavior notes: +- The TUI will probe hosts listed in `Distributors.json` and present latency and available releases. +- Local archives found in `jibo_work/updates/downloads/` are shown as a "local" source and can be chosen without downloading. +- When updating, uploaded files and directories are set to permissive `0777` to avoid boot failures caused by missing execute/read permissions. + +GUI integration: +- `jibo_updater.py` includes a simple programmatic surface (CLI flags and a small interactive mode) designed so a GUI can call it or be wired to a future HTTP/JSON control API. For now, use `--tui` to exercise the flow; GUI hooks will be documented in `CHECKLIST.md` for the next steps. + +Dependencies: no additional Python packages were added for this change (uses standard library + `paramiko` already required). If you use the GUI in future, update `requirements-gui.txt` accordingly. + Never prompt and never change mode back. Examples Update to latest: diff --git a/__pycache__/jibo_updater.cpython-314.pyc b/__pycache__/jibo_updater.cpython-314.pyc new file mode 100644 index 0000000..c351237 Binary files /dev/null and b/__pycache__/jibo_updater.cpython-314.pyc differ diff --git a/jibo_automod.py b/jibo_automod.py index f40fbed..11e0f52 100644 --- a/jibo_automod.py +++ b/jibo_automod.py @@ -24,19 +24,14 @@ from pathlib import Path from typing import Optional, Tuple, List from dataclasses import dataclass -# ============================================================================ -# Configuration -# ============================================================================ SCRIPT_DIR = Path(__file__).parent.resolve() SHOFEL_DIR = SCRIPT_DIR / "Shofel" WORK_DIR = SCRIPT_DIR / "jibo_work" -# eMMC dump parameters EMMC_TOTAL_SECTORS = 0x1D60000 # Total sectors to dump (~15GB) EMMC_SECTOR_SIZE = 512 -# Colors for terminal output class Colors: RED = '\033[91m' GREEN = '\033[92m' @@ -47,7 +42,6 @@ class Colors: RESET = '\033[0m' BOLD = '\033[1m' -# Disable colors on Windows unless using Windows Terminal if platform.system() == "Windows" and "WT_SESSION" not in os.environ: for attr in dir(Colors): if not attr.startswith('_'): @@ -64,9 +58,6 @@ class PartitionInfo: name: str -# ============================================================================ -# Utilities -# ============================================================================ def print_banner(): """Print the tool banner""" @@ -151,22 +142,17 @@ def _check_payloads_exist() -> bool: return all((SHOFEL_DIR / p).exists() for p in critical_payloads) -# ============================================================================ -# Dependency Checking -# ============================================================================ def check_linux_dependencies() -> Tuple[bool, List[str], List[str]]: """Check for required Linux dependencies""" missing = [] warnings = [] - # Required tools for host build required_tools = { "gcc": "build-essential or base-devel", "make": "build-essential or base-devel", } - # Optional tools (have fallbacks) optional_tools = { "lsusb": "usbutils (optional, used for device detection)", "fdisk": "util-linux (optional, has Python fallback)", @@ -180,12 +166,10 @@ def check_linux_dependencies() -> Tuple[bool, List[str], List[str]]: if not shutil.which(tool): warnings.append(f"{tool} ({package})") - # Check ARM toolchain only if payloads are missing if not _check_payloads_exist(): if not shutil.which("arm-none-eabi-gcc"): missing.append("arm-none-eabi-gcc (arm-none-eabi-gcc or arm-none-eabi-toolchain)") - # Check for libusb try: result = subprocess.run( ["pkg-config", "--exists", "libusb-1.0"], @@ -194,7 +178,6 @@ def check_linux_dependencies() -> Tuple[bool, List[str], List[str]]: if result.returncode != 0: missing.append("libusb-1.0-dev or libusb1-devel") except FileNotFoundError: - # pkg-config not found, try alternative check if not Path("/usr/include/libusb-1.0").exists() and \ not Path("/usr/local/include/libusb-1.0").exists(): missing.append("libusb-1.0-dev or libusb1-devel") @@ -207,20 +190,16 @@ def check_windows_dependencies() -> Tuple[bool, List[str], List[str]]: missing = [] warnings = [] - # Check for MinGW or MSYS2 if not shutil.which("gcc") and not shutil.which("x86_64-w64-mingw32-gcc"): missing.append("MinGW-w64 or MSYS2") - # Check for ARM toolchain only if payloads missing if not _check_payloads_exist(): if not shutil.which("arm-none-eabi-gcc"): missing.append("ARM GNU Toolchain (arm-none-eabi-gcc)") - # Check for make if not shutil.which("make") and not shutil.which("mingw32-make"): missing.append("GNU Make") - # Optional: debugfs for editing ext filesystem images without mounting if not shutil.which("debugfs") and not shutil.which("debugfs.exe"): warnings.append("debugfs (e2fsprogs) - optional but recommended for reliable mode.json edits on Windows") @@ -243,7 +222,6 @@ def print_install_instructions(system: str, missing: List[str], warnings: List[s print(f"\n{Colors.BOLD}Installation instructions:{Colors.RESET}") if system == "Linux": - # Detect distro distro = "unknown" if Path("/etc/arch-release").exists(): distro = "arch" @@ -289,9 +267,6 @@ def print_install_instructions(system: str, missing: List[str], warnings: List[s """) -# ============================================================================ -# Shofel Building -# ============================================================================ def check_shofel_built() -> bool: """Check if shofel2_t124 is already built""" @@ -333,18 +308,14 @@ def build_shofel(force_rebuild: bool = False) -> bool: print_info("Compiling shofel2_t124...") try: - # Only clean host build (preserves payload .bin files) if force_rebuild: run_command(["make", "clean"], cwd=SHOFEL_DIR, capture_output=True, check=False) - # Build (Makefile will skip existing payload .bin files) result = run_command(["make"], cwd=SHOFEL_DIR, capture_output=True, check=False) - # Check if the main executable was built if check_shofel_built(): print_success("Host tool (shofel2_t124) built successfully!") - # Check payloads again payloads_ok, missing_payloads = check_payloads_built() if not payloads_ok: print_error("ARM payload binaries are missing!") @@ -353,7 +324,6 @@ def build_shofel(force_rebuild: bool = False) -> bool: print(f"{Colors.YELLOW}The ARM toolchain (arm-none-eabi-gcc) is required to build payloads.{Colors.RESET}") print() - # Detect distro and provide instructions if Path("/etc/arch-release").exists(): print(f" {Colors.CYAN}Arch/CachyOS:{Colors.RESET} sudo pacman -S arm-none-eabi-gcc arm-none-eabi-newlib") elif Path("/etc/debian_version").exists(): @@ -381,20 +351,15 @@ def build_shofel(force_rebuild: bool = False) -> bool: return False -# ============================================================================ -# Jibo Detection -# ============================================================================ def detect_jibo_rcm() -> bool: """Detect if Jibo is connected in RCM mode""" print_info("Looking for Jibo in RCM mode (NVIDIA APX device)...") if platform.system() == "Linux": - # Try lsusb first if shutil.which("lsusb"): try: result = run_command(["lsusb"], capture_output=True) - # Jibo uses 0955:7740 (NVIDIA APX) if "0955:7740" in result.stdout: print_success("Found Jibo in RCM mode!") return True @@ -408,7 +373,6 @@ def detect_jibo_rcm() -> bool: except Exception as e: print_error(f"lsusb failed: {e}") - # Fallback: check /sys/bus/usb/devices try: usb_devices = Path("/sys/bus/usb/devices") if usb_devices.exists(): @@ -424,16 +388,13 @@ def detect_jibo_rcm() -> bool: except Exception: pass - # Final fallback: assume user will connect it print_warning("Cannot detect USB devices. Please ensure Jibo is in RCM mode.") print_info("The tool will attempt to connect anyway.") return True # Let shofel try elif platform.system() == "Windows": - # On Windows, we need to use different methods print_warning("Windows USB detection - please ensure Zadig drivers are installed") print_info("Run Zadig and install WinUSB driver for 'APX' device") - # Try to proceed anyway, shofel will detect it return True return False @@ -459,36 +420,25 @@ def wait_for_jibo_rcm(timeout: int = 60) -> bool: return False -# ============================================================================ -# GPT Partition Parsing -# ============================================================================ def parse_gpt_partitions(dump_path: Path) -> List[PartitionInfo]: """Parse GPT partition table from dump file""" partitions = [] with open(dump_path, "rb") as f: - # Read MBR (sector 0) - skip it f.seek(512) - # Read GPT header (sector 1) gpt_header = f.read(512) - # Check GPT signature signature = gpt_header[:8] if signature != b'EFI PART': print_warning("GPT signature not found, trying fdisk parsing...") return parse_partitions_fdisk(dump_path) - # Parse GPT header - # Offset 72: Partition entries start LBA (8 bytes) - # Offset 80: Number of partition entries (4 bytes) - # Offset 84: Size of partition entry (4 bytes) partition_entries_lba = struct.unpack(" List[PartitionInfo]: if len(entry) < 128: break - # Parse partition entry - # Offset 0: Partition type GUID (16 bytes) - # Offset 32: First LBA (8 bytes) - # Offset 40: Last LBA (8 bytes) - # Offset 56: Partition name (72 bytes, UTF-16LE) type_guid = entry[:16] if type_guid == b'\x00' * 16: @@ -509,7 +454,6 @@ def parse_gpt_partitions(dump_path: Path) -> List[PartitionInfo]: first_lba = struct.unpack(" List[PartitionInfo]: check=False ) - # Parse fdisk output for line in result.stdout.split('\n'): - # Look for lines like: dump.bin1 34 2048033 2048000 1000M Microsoft basic data if dump_path.name in line and not line.startswith("Disk"): parts = line.split() if len(parts) >= 4: try: - # Extract partition number from name (e.g., dump.bin5 -> 5) part_name = parts[0] part_num = int(''.join(c for c in part_name if c.isdigit()) or '0') @@ -570,15 +511,12 @@ def parse_partitions_fdisk(dump_path: Path) -> List[PartitionInfo]: def find_var_partition(partitions: List[PartitionInfo]) -> Optional[PartitionInfo]: """Find the /var partition (partition 5, ~500MB)""" - # The var partition is typically partition 5 with ~500MB size for part in partitions: if part.number == 5: - # Verify it's roughly the right size (450-550 MB) size_mb = (part.size_sectors * EMMC_SECTOR_SIZE) / (1024 * 1024) if 400 < size_mb < 600: return part - # Fallback: look for any ~500MB partition for part in partitions: size_mb = (part.size_sectors * EMMC_SECTOR_SIZE) / (1024 * 1024) if 450 < size_mb < 550: @@ -588,9 +526,6 @@ def find_var_partition(partitions: List[PartitionInfo]) -> Optional[PartitionInf return None -# ============================================================================ -# Partition Extraction and Modification -# ============================================================================ def extract_partition(dump_path: Path, partition: PartitionInfo, output_path: Path) -> bool: """Extract a partition from the dump""" @@ -626,8 +561,6 @@ def modify_mode_json_direct(partition_path: Path) -> bool: with open(partition_path, "r+b") as f: data = bytearray(f.read()) - # Best-effort raw replacement. - # IMPORTANT: never change image length and never shift bytes; only overwrite in-place. json_patterns = [ (b'{"mode":"normal"}', b'{"mode":"int-developer"}'), (b'{"mode": "normal"}', b'{"mode": "int-developer"}'), @@ -654,7 +587,6 @@ def modify_mode_json_direct(partition_path: Path) -> bool: return False region_len = len(new_json) - # Overwrite the JSON plus the padding region; do NOT shift bytes. data[offset:offset + region_len] = new_json f.seek(0) @@ -680,18 +612,15 @@ def modify_partition_mounted(partition_path: Path) -> bool: mount_point.mkdir(parents=True, exist_ok=True) try: - # Mount the partition print_info(f"Mounting partition at {mount_point}...") run_command( ["mount", "-o", "loop", str(partition_path), str(mount_point)], sudo=True ) - # Find and modify mode.json mode_json_path = mount_point / "jibo" / "mode.json" if not mode_json_path.exists(): - # Try alternative paths for alt_path in [ mount_point / "mode.json", mount_point / "etc" / "jibo" / "mode.json", @@ -703,7 +632,6 @@ def modify_partition_mounted(partition_path: Path) -> bool: if mode_json_path.exists(): print_info(f"Found mode.json at {mode_json_path}") - # Capture original permissions/ownership so we can restore after copy-write perm = None uid = None gid = None @@ -720,7 +648,6 @@ def modify_partition_mounted(partition_path: Path) -> bool: except Exception: pass - # Save a raw backup copy of mode.json for debugging/recovery try: backup_text = run_command( ["cat", str(mode_json_path)], @@ -732,7 +659,6 @@ def modify_partition_mounted(partition_path: Path) -> bool: except Exception: pass - # Read current content (prefer sudo cat so permissions don't bite us) try: mode_text = run_command( ["cat", str(mode_json_path)], @@ -742,23 +668,19 @@ def modify_partition_mounted(partition_path: Path) -> bool: ).stdout content = json.loads(mode_text) except Exception: - # Fallback: direct open (works if script is run with sudo) with open(mode_json_path, "r") as f: content = json.load(f) print_info(f"Current mode: {content.get('mode', 'unknown')}") - # Modify content["mode"] = "int-developer" - # Write back (need sudo) temp_json = WORK_DIR / "mode_temp.json" with open(temp_json, "w") as f: json.dump(content, f) run_command(["cp", str(temp_json), str(mode_json_path)], sudo=True) - # Restore permissions/ownership if we captured them if perm is not None: run_command(["chmod", perm, str(mode_json_path)], sudo=True, check=False) if uid is not None and gid is not None: @@ -784,7 +706,6 @@ def modify_partition_mounted(partition_path: Path) -> bool: return False finally: - # Always unmount try: run_command(["umount", str(mount_point)], sudo=True, check=False) except: @@ -811,14 +732,12 @@ def modify_partition_debugfs(partition_path: Path) -> bool: print_info("Attempting mode.json edit via debugfs (no mount)...") - # Potential locations inside /var candidate_paths = [ "/jibo/mode.json", "/mode.json", "/etc/jibo/mode.json", ] - # Find which path exists by trying to cat it existing_path: Optional[str] = None original_text: Optional[str] = None for p in candidate_paths: @@ -828,7 +747,6 @@ def modify_partition_debugfs(partition_path: Path) -> bool: capture_output=True, check=True, ) - # debugfs prints to stdout for cat if res.stdout and "File not found" not in res.stdout: existing_path = p original_text = res.stdout @@ -840,7 +758,6 @@ def modify_partition_debugfs(partition_path: Path) -> bool: print_warning("debugfs could not locate mode.json inside the image") return False - # Save backup try: (WORK_DIR / "mode.json.original").write_text(original_text) except Exception: @@ -858,9 +775,6 @@ def modify_partition_debugfs(partition_path: Path) -> bool: temp_json = WORK_DIR / "mode_temp.json" temp_json.write_text(new_text) - # Overwrite: remove then write to ensure replacement works even if size differs. - # This may change filesystem allocation, which is fine for full /var write, and - # our patch-write logic can still handle it. try: run_command([debugfs, "-w", "-R", f"rm {existing_path}", str(partition_path)], check=False, capture_output=True) run_command([debugfs, "-w", "-R", f"write {str(temp_json)} {existing_path}", str(partition_path)], capture_output=True) @@ -881,17 +795,14 @@ def modify_var_partition(partition_path: Path) -> bool: """Modify the var partition to enable developer mode""" print_step(4, 6, "Modifying var partition") - # On Linux, prefer mounting: it's the only truly safe way to update a file in an ext filesystem. if platform.system() == "Linux": if modify_partition_mounted(partition_path): return True print_warning("Mount-based edit failed; falling back to raw in-place patch") - # If mounting is unavailable (Windows/macOS) or failed, try debugfs (ext filesystem edit without mount) if modify_partition_debugfs(partition_path): return True - # Raw patch is a best-effort last resort if modify_mode_json_direct(partition_path): return True @@ -969,7 +880,6 @@ def compute_changed_sector_ranges(original_path: Path, modified_path: Path, sect base_sector += len(b1) // sector_size continue - # Chunk differs; identify sector-level diffs within this chunk sectors_in_chunk = min(len(b1), len(b2)) // sector_size for i in range(sectors_in_chunk): s1 = b1[i * sector_size:(i + 1) * sector_size] @@ -1025,9 +935,6 @@ def write_partition_patch_to_emmc(original_path: Path, modified_path: Path, base return True -# ============================================================================ -# eMMC Operations -# ============================================================================ def get_shofel_path() -> Path: """Get the path to shofel2_t124 executable""" @@ -1058,7 +965,6 @@ def dump_emmc(output_path: Path, start_sector: int = 0, num_sectors: int = EMMC_ str(output_path) ] - # Run with sudo on Linux if platform.system() == "Linux": cmd = ["sudo"] + cmd @@ -1136,7 +1042,6 @@ def verify_write(partition_path: Path, start_sector: int, num_sectors: int) -> b subprocess.run(cmd, cwd=SHOFEL_DIR, check=True) - # Compare hashes with open(partition_path, "rb") as f: original_hash = hashlib.md5(f.read()).hexdigest() @@ -1157,22 +1062,17 @@ def verify_write(partition_path: Path, start_sector: int, num_sectors: int) -> b return False -# ============================================================================ -# Main Workflow -# ============================================================================ def run_full_mod(args) -> bool: """Run the complete modding workflow""" print_banner() - # Check system sys_info = get_system_info() print_info(f"System: {sys_info['os']} ({sys_info['arch']})") if sys_info['is_wsl']: print_info("Running in WSL - USB passthrough may require additional setup") - # Check dependencies print_step(0, 6, "Checking dependencies") if sys_info['os'] == "Linux": @@ -1190,25 +1090,20 @@ def run_full_mod(args) -> bool: print_success("All required dependencies found!") - # Create work directory WORK_DIR.mkdir(parents=True, exist_ok=True) - # Build Shofel if not build_shofel(force_rebuild=args.rebuild_shofel): return False - # Detect or wait for Jibo if not args.skip_detection: if not detect_jibo_rcm(): if not wait_for_jibo_rcm(timeout=120): return False - # Paths dump_path = WORK_DIR / "jibo_full_dump.bin" var_partition_path = WORK_DIR / "var_partition.bin" backup_var_path = WORK_DIR / "var_partition_backup.bin" - # Dump eMMC (or use existing dump) if args.dump_path: dump_path = Path(args.dump_path) if not dump_path.exists(): @@ -1222,7 +1117,6 @@ def run_full_mod(args) -> bool: if not dump_emmc(dump_path): return False - # Parse partitions print_step(3, 6, "Analyzing partition table") partitions = parse_gpt_partitions(dump_path) @@ -1235,7 +1129,6 @@ def run_full_mod(args) -> bool: size_mb = (part.size_sectors * EMMC_SECTOR_SIZE) / (1024 * 1024) print(f" {part.number}: sectors {part.start_sector}-{part.end_sector} ({size_mb:.1f} MB) - {part.name}") - # Find var partition var_partition = find_var_partition(partitions) if not var_partition: print_error("Could not identify /var partition") @@ -1243,35 +1136,28 @@ def run_full_mod(args) -> bool: print_success(f"Identified /var partition: partition {var_partition.number}") - # Extract var partition if not extract_partition(dump_path, var_partition, var_partition_path): return False - # Create backup shutil.copy(var_partition_path, backup_var_path) print_info(f"Backup created: {backup_var_path}") - # Modify partition if not modify_var_partition(var_partition_path): return False - # Check if Jibo still connected (may need to re-enter RCM) if not args.skip_detection: print_info("Please ensure Jibo is still in RCM mode") print_info("If Jibo rebooted, re-enter RCM mode now") if not wait_for_jibo_rcm(timeout=60): print_warning("Continuing anyway...") - # Write modified partition if not write_partition_to_emmc(var_partition_path, var_partition.start_sector): return False - # Verify if args.verify: if not verify_write(var_partition_path, var_partition.start_sector, var_partition.size_sectors): print_warning("Verification failed, but write may still be successful") - # Done! print(f""" {Colors.GREEN}╔═══════════════════════════════════════════════════════════════════╗ ║ {Colors.BOLD}MODDING COMPLETE!{Colors.RESET}{Colors.GREEN} ║ @@ -1302,11 +1188,9 @@ def run_dump_only(args) -> bool: WORK_DIR.mkdir(parents=True, exist_ok=True) - # Build Shofel if not build_shofel(force_rebuild=args.rebuild_shofel): return False - # Wait for Jibo if not args.skip_detection: if not wait_for_jibo_rcm(timeout=120): return False @@ -1325,11 +1209,9 @@ def run_write_only(args) -> bool: print_error(f"Partition file not found: {partition_path}") return False - # Build Shofel if needed if not build_shofel(): return False - # Wait for Jibo if not args.skip_detection: if not wait_for_jibo_rcm(timeout=120): return False @@ -1344,16 +1226,13 @@ def run_mode_json_only(args) -> bool: WORK_DIR.mkdir(parents=True, exist_ok=True) - # Build Shofel if not build_shofel(force_rebuild=args.rebuild_shofel): return False - # Wait for Jibo if not args.skip_detection: if not wait_for_jibo_rcm(timeout=120): return False - # Dump GPT / partition table (small read) gpt_path = WORK_DIR / "gpt_dump.bin" gpt_sectors = 4096 # 2MB; safely covers typical GPT entry area print_info(f"Dumping GPT header/table ({gpt_sectors} sectors)...") @@ -1375,7 +1254,6 @@ def run_mode_json_only(args) -> bool: f"(start=0x{var_partition.start_sector:x}, sectors={var_partition.size_sectors})" ) - # Dump /var partition only original_var_path = WORK_DIR / "var_partition_original.bin" var_partition_path = WORK_DIR / "var_partition.bin" backup_var_path = WORK_DIR / "var_partition_backup.bin" @@ -1388,17 +1266,14 @@ def run_mode_json_only(args) -> bool: shutil.copy(original_var_path, backup_var_path) print_info(f"Backup created: {backup_var_path}") - # Modify mode.json inside /var if not modify_var_partition(var_partition_path): return False - # Re-check connectivity (optional) if not args.skip_detection: print_info("Please ensure Jibo is still in RCM mode") if not wait_for_jibo_rcm(timeout=60): print_warning("Continuing anyway...") - # Write back: patch by default, full write if requested if args.full_var_write: print_info("Writing full /var partition back to device...") if not write_partition_to_emmc(var_partition_path, var_partition.start_sector): @@ -1408,7 +1283,6 @@ def run_mode_json_only(args) -> bool: if not write_partition_patch_to_emmc(original_var_path, var_partition_path, var_partition.start_sector): return False - # Verify (reads back full /var; optional) if args.verify: if not verify_write(var_partition_path, var_partition.start_sector, var_partition.size_sectors): print_warning("Verification failed, but write may still be successful") @@ -1419,9 +1293,6 @@ def run_mode_json_only(args) -> bool: return True -# ============================================================================ -# CLI -# ============================================================================ def main(): parser = argparse.ArgumentParser( @@ -1436,7 +1307,6 @@ Examples: """ ) - # Operation modes mode_group = parser.add_mutually_exclusive_group() mode_group.add_argument("--dump-only", action="store_true", help="Only dump the eMMC without modifying") @@ -1445,7 +1315,6 @@ Examples: mode_group.add_argument("--mode-json-only", action="store_true", help="Fast mode: dump GPT + /var only, patch /var/jibo/mode.json, write back minimal changes") - # Options parser.add_argument("--dump-path", metavar="FILE", help="Use existing dump file instead of dumping") parser.add_argument("--output", "-o", metavar="FILE", @@ -1467,11 +1336,9 @@ Examples: args = parser.parse_args() - # Validate arguments if args.write_partition and not args.start_sector: parser.error("--write-partition requires --start-sector") - # Run appropriate mode try: if args.dump_only: success = run_dump_only(args) diff --git a/jibo_updater.py b/jibo_updater.py index 1b67520..6d4144e 100644 --- a/jibo_updater.py +++ b/jibo_updater.py @@ -31,9 +31,14 @@ import urllib.error import urllib.parse import urllib.request import zipfile +import logging from dataclasses import dataclass from pathlib import Path from typing import Iterable, Optional +import socket +import threading +import http.server +import socketserver import paramiko @@ -43,6 +48,10 @@ WORK_DIR = SCRIPT_DIR / "jibo_work" UPDATES_DIR = WORK_DIR / "updates" STATE_FILE_DEFAULT = WORK_DIR / "update_state.json" +__version__ = "0.2.0" + +DEFAULT_UPDATER_RELEASES_API = "https://kevinblog.sytes.net/Code/api/v1/repos/Kevin/JiboUpdater/releases" + DEFAULT_RELEASES_API = "https://kevinblog.sytes.net/Code/api/v1/repos/Kevin/JiboOs/releases" @@ -115,6 +124,65 @@ def http_get_json(url: str, timeout: int = 20) -> object: return json.loads(data.decode("utf-8", errors="replace")) +def check_updater_version(releases_api: str, current_version: str) -> tuple[Optional[str], bool]: + """Return (latest_tag, is_newer) comparing semantic-ish tags. + + If the check fails, returns (None, False). + """ + try: + raw = http_get_json(releases_api) + except Exception: + return None, False + + if not isinstance(raw, list) or not raw: + return None, False + + tags: list[str] = [] + for item in raw: + if not isinstance(item, dict): + continue + tags.append(str(item.get("tag_name", ""))) + + tags = [t for t in tags if t] + if not tags: + return None, False + + tags.sort(key=_version_tuple, reverse=True) + latest = tags[0] + try: + is_newer = _version_tuple(latest) > _version_tuple(current_version) + except Exception: + is_newer = False + return latest, is_newer + + +class _Spinner: + def __init__(self, message: str = ""): + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + self.message = message + + def start(self): + def _spin(): + chars = "|/-\\" + i = 0 + while not self._stop.is_set(): + sys.stdout.write(f"\r{self.message} {chars[i % len(chars)]}") + sys.stdout.flush() + i += 1 + time.sleep(0.12) + sys.stdout.write("\r" + " " * (len(self.message) + 4) + "\r") + sys.stdout.flush() + + self._thread = threading.Thread(target=_spin, daemon=True) + self._thread.start() + + def stop(self): + self._stop.set() + if self._thread: + self._thread.join(timeout=1) + + _VERSION_RE = re.compile(r"^v?(\d+)(?:\.(\d+))?(?:\.(\d+))?") @@ -153,7 +221,6 @@ def get_latest_release(releases_api: str, allow_prerelease: bool) -> Release: if not releases: raise RuntimeError("No releases found (after prerelease filtering)") - # Gitea usually returns newest first, but sort by semver-ish tag to be safe. releases.sort(key=lambda r: _version_tuple(r.tag_name), reverse=True) return releases[0] @@ -171,11 +238,9 @@ def normalize_download_url(download_url: str, base_url: str) -> str: base = urllib.parse.urlparse(base_url) dl = urllib.parse.urlparse(download_url) - # If already matches, keep as-is. if dl.scheme == base.scheme and dl.netloc == base.netloc: return download_url - # If download URL is missing components or has a different host, rewrite it. return urllib.parse.urlunparse( (base.scheme, base.netloc, dl.path, dl.params, dl.query, dl.fragment) ) @@ -261,7 +326,6 @@ def _extract(archive: Path, extract_dir: Path, *, force: bool = False) -> Path: member_path = extract_dir / member.name if not _is_within(extract_dir, member_path): raise RuntimeError(f"Unsafe path in tar archive: {member.name}") - # Python 3.14 changes tar default filtering behavior; be explicit. try: tf.extractall(extract_dir, filter="data") except TypeError: @@ -291,7 +355,6 @@ def _score_build_dir(path: Path) -> int: for name, weight in (("etc", 5), ("opt", 5), ("var", 2), ("usr", 2), ("lib", 1), ("bin", 1)): if (path / name).exists(): score += weight - # Prefer build dirs that are under a version folder like V3.1/build parts = {p.lower() for p in path.parts} if any(re.fullmatch(r"v\d+(?:\.\d+)*", p, flags=re.IGNORECASE) for p in parts): score += 2 @@ -362,7 +425,6 @@ def ssh_exec(client: paramiko.SSHClient, command: str, timeout: int = 60) -> tup def ensure_remote_dir(sftp: paramiko.SFTPClient, remote_dir: str) -> None: - # Create each path component if missing. parts = [p for p in remote_dir.split("/") if p] cur = "/" for part in parts: @@ -394,6 +456,10 @@ def upload_tree( if dry_run: continue ensure_remote_dir(sftp, remote_path) + try: + sftp.chmod(remote_path, 0o777) + except Exception: + pass continue if p.is_symlink(): @@ -401,19 +467,20 @@ def upload_tree( if dry_run: sent += 1 continue - # Ensure parent exists ensure_remote_dir(sftp, posixpath.dirname(remote_path)) try: - # Remove if exists try: sftp.remove(remote_path) except IOError: pass sftp.symlink(target, remote_path) except Exception: - # Fallback: dereference and upload file content real_path = p.resolve() sftp.put(str(real_path), remote_path) + try: + sftp.chmod(remote_path, 0o777) + except Exception: + pass sent += 1 if sent % 200 == 0: print_info(f"Uploaded {sent}/{total} entries...") @@ -427,8 +494,7 @@ def upload_tree( ensure_remote_dir(sftp, posixpath.dirname(remote_path)) sftp.put(str(p), remote_path) try: - mode = p.stat().st_mode & 0o777 - sftp.chmod(remote_path, mode) + sftp.chmod(remote_path, 0o777) except Exception: pass @@ -455,16 +521,179 @@ def set_mode_json_to_normal(sftp: paramiko.SFTPClient) -> None: data["mode"] = "normal" new_content = json.dumps(data, separators=(",", ": ")) + "\n" except Exception: - # Fallback for non-standard formatting new_content = re.sub(r'("mode"\s*:\s*")([^"]+)(")', r'\1normal\3', content) if new_content == content: - # As a last resort, overwrite with a minimal JSON. new_content = '{"mode": "normal"}\n' with sftp.open(remote, "w") as f: f.write(new_content.encode("utf-8")) +def load_distributors_file(path: Path) -> dict: + try: + raw = json.loads(path.read_text("utf-8")) + return raw if isinstance(raw, dict) else {} + except Exception: + return {} + + +def measure_host_latency(url: str, timeout: int = 5) -> float: + start = time.time() + try: + req = urllib.request.Request(url, headers={"User-Agent": "JiboUpdater/1.0"}) + with urllib.request.urlopen(req, timeout=timeout) as resp: + resp.read(512) + return time.time() - start + except Exception: + return float("inf") + + +def get_releases_from_host(api_url: str) -> list[Release]: + try: + raw = http_get_json(api_url) + except Exception: + return [] + releases: list[Release] = [] + if isinstance(raw, list): + for item in raw: + if not isinstance(item, dict): + continue + releases.append( + Release( + tag_name=str(item.get("tag_name", "")), + name=str(item.get("name", "")), + prerelease=bool(item.get("prerelease", False)), + tarball_url=str(item.get("tarball_url", "")), + zipball_url=str(item.get("zipball_url", "")), + ) + ) + return releases + + +def list_local_archives() -> list[Release]: + dl = UPDATES_DIR / "downloads" + found: list[Release] = [] + if not dl.exists(): + return found + for p in dl.iterdir(): + if not p.is_file(): + continue + name = p.name + if name.endswith((".tar.gz", ".tgz", ".zip")): + tag = name.rsplit(".", 2)[0] + found.append(Release(tag_name=tag, name=tag, prerelease=False, tarball_url=str(p), zipball_url="")) + return found + + +def robots_config_path() -> Path: + return WORK_DIR / "robots.json" + + +def load_robots() -> dict: + p = robots_config_path() + if not p.exists(): + return {} + try: + return json.loads(p.read_text("utf-8")) + except Exception: + return {} + + +def save_robots(data: dict) -> None: + p = robots_config_path() + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(json.dumps(data, indent=2) + "\n", encoding="utf-8") + + +def fetch_robot_identity(host: str, user: str, password: str, timeout: int = 10) -> Optional[str]: + try: + client = ssh_connect(host, user, password, timeout=timeout) + try: + sftp = client.open_sftp() + try: + with sftp.open("/var/jibo/identity.json", "r") as f: + content = f.read().decode("utf-8", errors="replace") + data = json.loads(content) + name = None + if isinstance(data, dict): + name = data.get("name") or data.get("robot_name") + if isinstance(name, str): + return name + finally: + sftp.close() + finally: + client.close() + except Exception: + return None + return None + + +def prompt_select_release_and_host(distributors_file: Path) -> tuple[Optional[Release], Optional[str], str]: + d = load_distributors_file(distributors_file) + hosts = d.get("UpdateHosts") or d.get("OfficialHosts") or [] + hosts = [h for h in hosts if isinstance(h, str)] + + print_info("Checking hosts for latency and available releases...") + host_infos = [] + for h in hosts: + lat = measure_host_latency(h) + releases = get_releases_from_host(h) + host_infos.append((h, lat, releases)) + + local_releases = list_local_archives() + if local_releases: + host_infos.append(("local", 0.0, local_releases)) + + print("Hosts (lower latency preferred):") + host_infos.sort(key=lambda t: (t[1] if isinstance(t[1], float) else float("inf"))) + for idx, (h, lat, rels) in enumerate(host_infos, start=1): + label = f"{h} ({'local' if h=='local' else f'{lat:.2f}s'}) - {len(rels)} releases" + print(f"{idx}) {label}") + + chosen_host_idx = None + while chosen_host_idx is None: + ans = input("Choose host number to browse releases (or q to cancel): ").strip() + if ans.lower() in {"q", "quit", "exit"}: + return None, None, "" + if not ans.isdigit(): + print("Enter a number.") + continue + i = int(ans) + if i < 1 or i > len(host_infos): + print("Out of range") + continue + chosen_host_idx = i - 1 + + host, lat, releases = host_infos[chosen_host_idx] + if not releases: + print_warning("No releases found for that host.") + return None, host, "remote" + + releases.sort(key=lambda r: _version_tuple(r.tag_name), reverse=True) + for idx, r in enumerate(releases, start=1): + pre = " [prerelease]" if r.prerelease else "" + print(f"{idx}) {r.tag_name}{pre} - {r.name}") + ans = input("Choose release number (or 'l' to list release notes, number to pick, q to cancel): ").strip() + if ans.lower() == "q": + return None, host, "" + if ans.lower() == "l": + sub = input("Release number to show notes: ").strip() + if sub.isdigit(): + si = int(sub) - 1 + if 0 <= si < len(releases): + print(releases[si].name) + print(releases[si].tag_name) + return None, host, "" + if not ans.isdigit(): + return None, host, "" + ri = int(ans) - 1 + if ri < 0 or ri >= len(releases): + return None, host, "" + chosen = releases[ri] + source = "local" if host == "local" else "remote" + return chosen, host, source + + def main() -> int: _no_color_if_not_tty() @@ -473,6 +702,9 @@ def main() -> int: parser.add_argument("--user", default="root", help="SSH username (default: root)") parser.add_argument("--password", default="jibo", help="SSH password (default: jibo)") parser.add_argument("--releases-api", default=DEFAULT_RELEASES_API, help="Gitea releases API URL") + parser.add_argument("--distributors", type=Path, default=Path("Distributors.json"), help="Path to Distributors.json to check multiple hosts") + parser.add_argument("--tui", action="store_true", help="Run an interactive text UI to pick host/release") + parser.add_argument("--updater-releases-api", default=DEFAULT_UPDATER_RELEASES_API, help="Releases API to check for updater updates") parser.add_argument("--stable", action="store_true", help="Ignore prereleases") parser.add_argument("--tag", help="Install a specific tag (e.g. v3.3.0) instead of latest") @@ -501,11 +733,48 @@ def main() -> int: _ensure_dirs() + logp = WORK_DIR / "updater.log" + logp.parent.mkdir(parents=True, exist_ok=True) + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s: %(message)s", + handlers=[logging.FileHandler(logp, encoding="utf-8"), logging.StreamHandler(sys.stdout)], + ) + logging.info("jibo_updater starting, version %s", __version__) + + spinner = _Spinner("Checking updater version...") + spinner.start() + try: + latest_tag, is_newer = check_updater_version(args.updater_releases_api, __version__) + finally: + spinner.stop() + + if latest_tag: + if is_newer: + msg = f"Updater update available: {latest_tag} (current {__version__})" + print_warning(msg) + logging.info(msg) + else: + msg = f"Updater is up-to-date ({__version__})" + print_info(msg) + logging.info(msg) + else: + logging.info("Updater version check failed or no releases found") + allow_prerelease = not args.stable print_info("Checking latest release...") - if args.tag: - # Fetch all releases and pick the one matching tag + chosen_remote_source: Optional[str] = None + chosen_source_type = "remote" + if args.tui: + rel_choice, host_choice, source = prompt_select_release_and_host(args.distributors) + if rel_choice is None: + print_info("No release selected; aborting.") + return 2 + release = rel_choice + chosen_remote_source = host_choice + chosen_source_type = source + elif args.tag: raw = http_get_json(args.releases_api) if not isinstance(raw, list): raise RuntimeError("Unexpected releases API response") @@ -550,27 +819,33 @@ def main() -> int: print_info("Aborted.") return 2 - # Download + extract archive_name = f"{release.tag_name}.tar.gz" archive_path = UPDATES_DIR / "downloads" / archive_name extract_dir = UPDATES_DIR / "extracted" / release.tag_name - tarball_url = normalize_download_url(release.tarball_url, args.releases_api) + if chosen_remote_source and chosen_source_type == "remote": + tarball_url = normalize_download_url(release.tarball_url, chosen_remote_source) + elif chosen_source_type == "local": + tarball_url = release.tarball_url + else: + tarball_url = normalize_download_url(release.tarball_url, args.releases_api) try: - _download(tarball_url, archive_path, force=args.force) + if isinstance(tarball_url, str) and Path(tarball_url).exists(): + archive_path = Path(tarball_url) + print_info(f"Using local archive: {archive_path}") + else: + _download(tarball_url, archive_path, force=args.force) except urllib.error.URLError as e: raise RuntimeError(f"Download failed: {e}") _extract(archive_path, extract_dir, force=args.force) - # Gitea archives usually create a single top-level folder. Prefer that as the search root. children = [p for p in extract_dir.iterdir() if p.is_dir()] search_root = children[0] if len(children) == 1 else extract_dir build_dir = find_build_dir(search_root, args.build_path) - # Connect and update print_info(f"Connecting to {args.user}@{args.host} ...") client = ssh_connect(args.host, args.user, args.password, timeout=args.ssh_timeout) try: @@ -619,7 +894,6 @@ def main() -> int: sftp.close() if not args.dry_run: - # Update local state if isinstance(state, dict): state[args.host] = release.tag_name save_state(args.state_file, state) diff --git a/jibo_updater.sh b/jibo_updater.sh index 11f49a0..8893bb4 100755 --- a/jibo_updater.sh +++ b/jibo_updater.sh @@ -16,3 +16,7 @@ if command -v python3 >/dev/null 2>&1; then fi exec python "$SCRIPT_DIR/jibo_updater.py" "$@" + +# NOTE: The updater uses only the standard library and `paramiko`. +# If you don't have `paramiko` installed in your environment, install it: +# python3 -m pip install paramiko diff --git a/jibo_updater_tui.py b/jibo_updater_tui.py new file mode 100644 index 0000000..a1fc91a --- /dev/null +++ b/jibo_updater_tui.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python3 +"""Simple curses TUI for selecting a distribution host and release. + +This TUI reuses helper functions from `jibo_updater.py` to probe hosts and +list local archives. It prints a JSON object to stdout with the chosen +selection so other tools or a GUI wrapper can invoke it. + +Usage: + python3 jibo_updater_tui.py [--distributors Distributors.json] + +Output (on success): JSON to stdout, e.g. + {"host": "https://...","source":"remote","tag":"v3.3.0","tarball_url":"https://..."} + +Keyboard: +- Up/Down: navigate +- Enter: select +- b: back +- v: view release notes/name +- q: quit +""" + +from __future__ import annotations + +import curses +import json +import sys +from pathlib import Path +from typing import List + +try: + from jibo_updater import ( + load_distributors_file, + measure_host_latency, + get_releases_from_host, + list_local_archives, + _version_tuple, + check_updater_version, + __version__ as UPDATER_VERSION, + DEFAULT_UPDATER_RELEASES_API, + ) +except Exception as e: + print(f"Failed to import helpers from jibo_updater: {e}", file=sys.stderr) + raise + + +def gather_host_infos(distributors_path: Path): + d = load_distributors_file(distributors_path) + hosts = d.get("UpdateHosts") or d.get("OfficialHosts") or [] + hosts = [h for h in hosts if isinstance(h, str)] + + infos = [] + for h in hosts: + lat = measure_host_latency(h) + rels = get_releases_from_host(h) + infos.append({"host": h, "lat": lat, "rels": rels}) + + local = list_local_archives() + if local: + infos.append({"host": "local", "lat": 0.0, "rels": local}) + + infos.sort(key=lambda x: x["lat"] if isinstance(x["lat"], float) else float("inf")) + return infos + + +class TUI: + def __init__(self, stdscr, distributors: Path): + self.stdscr = stdscr + self.distributors = distributors + curses.curs_set(0) + try: + latest, is_newer = check_updater_version(DEFAULT_UPDATER_RELEASES_API, UPDATER_VERSION) + if latest and is_newer: + self.updater_status = f"Updater update available: {latest} (current {UPDATER_VERSION})" + elif latest: + self.updater_status = f"Updater up-to-date ({UPDATER_VERSION})" + else: + self.updater_status = "Updater version check failed" + except Exception: + self.updater_status = "Updater version check failed" + + self.hosts = gather_host_infos(distributors) + + def draw_list(self, items: List[str], title: str, idx: int, offset: int = 0): + self.stdscr.clear() + h, w = self.stdscr.getmaxyx() + header = title[: w - 1] + status = (" - " + self.updater_status) if hasattr(self, "updater_status") else "" + if len(header) + len(status) < w - 1: + header = header + status + self.stdscr.addstr(0, 0, header[: w - 1]) + for i, line in enumerate(items[offset : offset + h - 3]): + y = i + 2 + style = curses.A_REVERSE if (i + offset) == idx else curses.A_NORMAL + try: + self.stdscr.addstr(y, 0, line[: w - 1], style) + except curses.error: + pass + self.stdscr.addstr(h - 1, 0, "Enter=select v=view b=back q=quit") + self.stdscr.refresh() + + def run(self): + if not self.hosts: + self.stdscr.addstr(0, 0, "No hosts found in distributors file or no local archives.") + self.stdscr.addstr(2, 0, "Press any key to exit.") + self.stdscr.getch() + return 1 + + idx = 0 + offset = 0 + while True: + items = [f"{h['host']} ({'local' if h['host']=='local' else f'{h['lat']:.2f}s'}) - {len(h['rels'])} releases" for h in self.hosts] + self.draw_list(items, "Select host:", idx, offset) + c = self.stdscr.getch() + if c in (curses.KEY_DOWN, ord('j')): + if idx < len(items) - 1: + idx += 1 + elif c in (curses.KEY_UP, ord('k')): + if idx > 0: + idx -= 1 + elif c in (ord('\n'), ord('\r')): + choice = self.hosts[idx] + res = self.show_releases(choice) + if res: + print(json.dumps(res)) + return 0 + elif c in (ord('q'), 27): + return 1 + + def show_releases(self, host_info): + rels = host_info["rels"] + if not rels: + return None + rels.sort(key=lambda r: _version_tuple(r.tag_name), reverse=True) + idx = 0 + while True: + items = [f"{r.tag_name}{' [prerelease]' if r.prerelease else ''} - {r.name}" for r in rels] + self.draw_list(items, f"Host: {host_info['host']}", idx) + c = self.stdscr.getch() + if c in (curses.KEY_DOWN, ord('j')): + if idx < len(items) - 1: + idx += 1 + elif c in (curses.KEY_UP, ord('k')): + if idx > 0: + idx -= 1 + elif c in (ord('b'), 8): + return None + elif c == ord('v'): + self.show_text(rels[idx].name or "(no notes)") + elif c in (ord('\n'), ord('\r')): + chosen = rels[idx] + res = { + "host": host_info["host"], + "source": "local" if host_info["host"] == "local" else "remote", + "tag": chosen.tag_name, + "tarball_url": chosen.tarball_url, + } + return res + elif c in (ord('q'), 27): + return None + + def show_text(self, text: str): + self.stdscr.clear() + h, w = self.stdscr.getmaxyx() + lines = [] + for ln in text.splitlines(): + while ln: + lines.append(ln[: w - 1]) + ln = ln[w - 1 :] + for i, ln in enumerate(lines[: h - 2]): + try: + self.stdscr.addstr(i, 0, ln) + except curses.error: + pass + self.stdscr.addstr(h - 1, 0, "Press any key to return") + self.stdscr.refresh() + self.stdscr.getch() + + +def main(argv): + import argparse + + parser = argparse.ArgumentParser(description="Curses TUI for jibo_updater selection") + parser.add_argument("--distributors", type=Path, default=Path("Distributors.json"), help="Path to Distributors.json") + args = parser.parse_args(argv) + + curses.wrapper(lambda stdscr: TUI(stdscr, args.distributors).run()) + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:]) or 0)