#!/usr/bin/env python3 """Minimal AI Bridge server for Jibo. Endpoints: - POST /v1/chat/text {"text": "..."} -> {"reply": "..."} - POST /v1/chat/audio {"wav_base64": "...", "sample_rate": 16000} -> {"reply": "...", "text": ""} LLM: - Uses Ollama Chat API by default: http://localhost:11434/api/chat Env: OLLAMA_URL (default: http://127.0.0.1:11434/api/chat) OLLAMA_MODEL (default: phi3.5) STT (optional, for /audio): - If `faster-whisper` is installed, it will be used. Env: WHISPER_MODEL (default: base) Run: python3 server.py --host 0.0.0.0 --port 8020 """ from __future__ import annotations import argparse import array import base64 import io import json import os import tempfile import time import traceback import wave from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.error import URLError from urllib.request import Request, urlopen def _ts() -> str: # ISO-ish timestamp in local time; good enough for debugging return time.strftime("%Y-%m-%d %H:%M:%S") def _log(msg: str): # Server previously muted logs; we want visibility while debugging. print(f"[{_ts()}] {msg}", flush=True) def _json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict): body = json.dumps(payload).encode("utf-8") handler.send_response(status) handler.send_header("Content-Type", "application/json") handler.send_header("Content-Length", str(len(body))) handler.end_headers() handler.wfile.write(body) def _read_json(handler: BaseHTTPRequestHandler) -> dict: length = int(handler.headers.get("Content-Length", "0")) raw = handler.rfile.read(length) if length else b"{}" return json.loads(raw.decode("utf-8")) def _ollama_chat(user_text: str) -> str: ollama_url = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") model = os.environ.get("OLLAMA_MODEL", "phi3.5") req_body = { "model": model, "stream": False, "messages": [ {"role": "system", "content": "You are Jibo, a friendly home robot. Keep replies short and spoken."}, {"role": "user", "content": user_text}, ], } req = Request( ollama_url, data=json.dumps(req_body).encode("utf-8"), headers={"Content-Type": "application/json"}, method="POST", ) try: with urlopen(req, timeout=60) as resp: data = json.loads(resp.read().decode("utf-8")) except Exception as e: # Let caller decide how to respond; include a useful hint in logs. _log(f"Ollama request failed url={ollama_url!r} err={e!r}") raise msg = data.get("message") or {} content = msg.get("content") if not content: raise RuntimeError(f"Unexpected Ollama response: {data}") return content.strip() def _short_err(e: BaseException) -> str: s = str(e) or e.__class__.__name__ s = " ".join(s.split()) if len(s) > 240: s = s[:240] + "..." return s def _ollama_down_reply() -> str: # Keep it short and speakable. return "My AI server isn't reachable right now. Please start Ollama on the computer, then try again." def _wav_diagnostics(wav_bytes: bytes) -> dict: """Best-effort WAV parsing + signal stats for debugging mic capture.""" info: dict = {"bytes": len(wav_bytes)} try: with wave.open(io.BytesIO(wav_bytes), "rb") as wf: # type: ignore[name-defined] nch = wf.getnchannels() sw = wf.getsampwidth() fr = wf.getframerate() nframes = wf.getnframes() info.update({"channels": nch, "sample_width": sw, "frame_rate": fr, "frames": nframes}) # Read up to ~3 seconds of audio for stats (avoid huge CPU) max_frames = min(nframes, fr * 3) frames = wf.readframes(max_frames) except Exception as e: info["parse_error"] = str(e) return info # Only compute stats for 16-bit PCM (most common). if info.get("sample_width") != 2: return info try: samples = array.array("h") samples.frombytes(frames) if not samples: return info mn = min(samples) mx = max(samples) zeros = sum(1 for s in samples if s == 0) # RMS over interleaved samples (good enough for quick signal presence) n = float(len(samples)) rms = (sum(float(s) * float(s) for s in samples) / n) ** 0.5 # Per-channel RMS (helps debug mic arrays) ch = int(info.get("channels") or 1) channel_rms = None if ch > 1: channel_rms = [] for c in range(ch): chan = samples[c::ch] if not chan: channel_rms.append(0.0) else: nn = float(len(chan)) channel_rms.append((sum(float(s) * float(s) for s in chan) / nn) ** 0.5) info.update( { "min": int(mn), "max": int(mx), "rms": float(rms), "zero_frac": float(zeros) / n, "channel_rms": channel_rms, } ) except Exception as e: info["stats_error"] = str(e) return info def _to_loudest_channel_mono_wav(wav_bytes: bytes) -> tuple[bytes, dict]: """If WAV is multi-channel 16-bit PCM, pick loudest channel and return mono WAV bytes.""" try: with wave.open(io.BytesIO(wav_bytes), "rb") as wf: nch = wf.getnchannels() sw = wf.getsampwidth() fr = wf.getframerate() nframes = wf.getnframes() frames = wf.readframes(nframes) except Exception as e: return wav_bytes, {"convert_error": str(e)} if nch <= 1 or sw != 2: return wav_bytes, {"converted": False, "channels": nch, "sample_width": sw} samples = array.array("h") samples.frombytes(frames) if not samples: return wav_bytes, {"converted": False, "reason": "empty_samples"} # Choose loudest channel by RMS rms_list: list[float] = [] for c in range(nch): chan = samples[c::nch] if not chan: rms_list.append(0.0) else: nn = float(len(chan)) rms_list.append((sum(float(s) * float(s) for s in chan) / nn) ** 0.5) best = max(range(nch), key=lambda i: rms_list[i]) mono = samples[best::nch] out = io.BytesIO() with wave.open(out, "wb") as ow: ow.setnchannels(1) ow.setsampwidth(2) ow.setframerate(fr) ow.writeframes(mono.tobytes()) return out.getvalue(), {"converted": True, "picked_channel": best, "channel_rms": rms_list, "frame_rate": fr} class _Whisper: def __init__(self): self._model = None def available(self) -> bool: try: import faster_whisper # noqa: F401 return True except Exception: return False def transcribe_wav_bytes(self, wav_bytes: bytes) -> str: try: from faster_whisper import WhisperModel except Exception as e: raise RuntimeError( "Audio mode requires `faster-whisper` (pip install faster-whisper) and ffmpeg on your PC" ) from e model_name = os.environ.get("WHISPER_MODEL", "base") if self._model is None: # CPU-friendly default; user can override via WHISPER_MODEL and faster-whisper params if needed. self._model = WhisperModel(model_name, device="cpu", compute_type="int8") with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as f: f.write(wav_bytes) f.flush() segments, info = self._model.transcribe(f.name) text = "".join(seg.text for seg in segments).strip() return text _whisper = _Whisper() class Handler(BaseHTTPRequestHandler): server_version = "JiboAIBridge/1.0" def do_POST(self): try: client = f"{self.client_address[0]}:{self.client_address[1]}" length = int(self.headers.get("Content-Length", "0") or "0") _log(f"{client} POST {self.path} len={length}") if self.path == "/v1/chat/text": payload = _read_json(self) text = (payload.get("text") or "").strip() if not text: _json_response(self, 400, {"error": "Missing 'text'"}) return _log(f"{client} /text prompt_chars={len(text)} prompt={text[:200]!r}") try: reply = _ollama_chat(text) _log(f"{client} /text ok reply_chars={len(reply)}") _json_response(self, 200, {"reply": reply}) except URLError as e: _log(f"{client} /text ollama_unreachable err={_short_err(e)!r}") _json_response(self, 200, {"reply": _ollama_down_reply(), "ollama_ok": False, "ollama_error": _short_err(e)}) except ConnectionRefusedError as e: _log(f"{client} /text ollama_refused err={_short_err(e)!r}") _json_response(self, 200, {"reply": _ollama_down_reply(), "ollama_ok": False, "ollama_error": _short_err(e)}) return if self.path == "/v1/chat/audio": payload = _read_json(self) b64 = payload.get("wav_base64") if not b64: _json_response(self, 400, {"error": "Missing 'wav_base64'"}) return if not _whisper.available(): _log(f"{client} /audio STT unavailable (faster-whisper not installed)") _json_response( self, 503, { "error": "STT unavailable: install faster-whisper and ffmpeg on this PC", "hint": "pip install faster-whisper (and install ffmpeg)", }, ) return wav_bytes = base64.b64decode(b64) diag = _wav_diagnostics(wav_bytes) wav_for_stt, conv = _to_loudest_channel_mono_wav(wav_bytes) if conv.get("converted"): diag_mono = _wav_diagnostics(wav_for_stt) _log( f"{client} /audio wav_diag={json.dumps(diag, sort_keys=True)} " f"mono={json.dumps(diag_mono, sort_keys=True)} conv={json.dumps(conv, sort_keys=True)}" ) else: _log(f"{client} /audio wav_diag={json.dumps(diag, sort_keys=True)} conv={json.dumps(conv, sort_keys=True)}") try: # Save for debugging (overwrite each time) out_path = os.environ.get("AI_BRIDGE_LAST_WAV", "jibo_last.wav") with open(out_path, "wb") as f: f.write(wav_bytes) _log(f"{client} /audio decoded bytes={len(wav_bytes)} saved={out_path}") except Exception as e: _log(f"{client} /audio failed saving wav: {e}") transcript = _whisper.transcribe_wav_bytes(wav_for_stt) if not transcript: _log(f"{client} /audio empty transcript") _json_response(self, 200, {"reply": "I didn't catch that. Could you say it again?", "text": ""}) return _log(f"{client} /audio transcript_chars={len(transcript)} transcript={transcript[:200]!r}") try: reply = _ollama_chat(transcript) _log(f"{client} /audio ok text_chars={len(transcript)} reply_chars={len(reply)}") _json_response(self, 200, {"reply": reply, "text": transcript}) except URLError as e: _log(f"{client} /audio ollama_unreachable err={_short_err(e)!r}") _json_response( self, 200, {"reply": _ollama_down_reply(), "text": transcript, "ollama_ok": False, "ollama_error": _short_err(e)}, ) except ConnectionRefusedError as e: _log(f"{client} /audio ollama_refused err={_short_err(e)!r}") _json_response( self, 200, {"reply": _ollama_down_reply(), "text": transcript, "ollama_ok": False, "ollama_error": _short_err(e)}, ) return _json_response(self, 404, {"error": "Not found"}) except Exception as e: _log(f"ERROR {self.path}: {e}\n{traceback.format_exc()}") _json_response( self, 500, { "error": str(e), "trace": traceback.format_exc(), }, ) def log_message(self, format, *args): # Keep BaseHTTPRequestHandler from double-logging; we do our own. return def main(): ap = argparse.ArgumentParser() ap.add_argument("--host", default="0.0.0.0") ap.add_argument("--port", type=int, default=8020) args = ap.parse_args() server = ThreadingHTTPServer((args.host, args.port), Handler) _log(f"AI Bridge server listening on http://{args.host}:{args.port}") _log( "Ollama: " + os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") + " model=" + os.environ.get("OLLAMA_MODEL", "phi3.5") ) _log("Ollama health check: curl -s http://127.0.0.1:11434/api/tags | head") if not _whisper.available(): _log("STT: faster-whisper not installed; /v1/chat/audio will return 503") server.serve_forever() if __name__ == "__main__": main()