#!/usr/bin/env python3 """Streaming AI Bridge server for Jibo. Replicates the original Jibo server pattern: responds sentence-by-sentence so the robot can start speaking immediately while the LLM is still generating. Endpoints: - POST /v1/chat/text {"text": "..."} -> chunked NDJSON, one line per sentence: {"sentence": "First sentence.", "done": false} {"sentence": "Second sentence!", "done": false} {"sentence": "", "done": true, "reply": "First sentence. Second sentence!"} Legacy (non-streaming) clients still work: the final line has the full "reply". - POST /v1/chat/audio {"wav_base64": "...", "sample_rate": 16000} -> same streaming format LLM: - Uses Ollama Chat API with streaming: http://localhost:11434/api/chat Env: OLLAMA_URL (default: http://127.0.0.1:11434/api/chat) OLLAMA_MODEL (default: smollm2:135m) STT (optional, for /audio): - If `faster-whisper` is installed, it will be used. Env: WHISPER_MODEL (default: base) Run: python3 server.py --host 0.0.0.0 --port 8020 """ from __future__ import annotations import argparse import array import base64 import io import json import os import tempfile import time import traceback import wave from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from urllib.error import URLError from urllib.request import Request, urlopen def _ts() -> str: # ISO-ish timestamp in local time; good enough for debugging return time.strftime("%Y-%m-%d %H:%M:%S") def _log(msg: str): # Server previously muted logs; we want visibility while debugging. print(f"[{_ts()}] {msg}", flush=True) def _json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict): body = json.dumps(payload).encode("utf-8") handler.send_response(status) handler.send_header("Content-Type", "application/json") handler.send_header("Content-Length", str(len(body))) handler.end_headers() handler.wfile.write(body) def _read_json(handler: BaseHTTPRequestHandler) -> dict: length = int(handler.headers.get("Content-Length", "0")) raw = handler.rfile.read(length) if length else b"{}" return json.loads(raw.decode("utf-8")) def _ollama_chat(user_text: str) -> str: """Non-streaming fallback (used by /audio endpoint).""" ollama_url = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") model = os.environ.get("OLLAMA_MODEL", "smollm2:135m") req_body = { "model": model, "stream": False, "messages": [ {"role": "system", "content": "You are Jibo, a friendly home robot. Keep replies short and spoken."}, {"role": "user", "content": user_text}, ], } req = Request( ollama_url, data=json.dumps(req_body).encode("utf-8"), headers={"Content-Type": "application/json"}, method="POST", ) try: with urlopen(req, timeout=60) as resp: data = json.loads(resp.read().decode("utf-8")) except Exception as e: _log(f"Ollama request failed url={ollama_url!r} err={e!r}") raise msg = data.get("message") or {} content = msg.get("content") if not content: raise RuntimeError(f"Unexpected Ollama response: {data}") return content.strip() import re # Sentence boundary: split on .!? followed by space or end, but not on # abbreviations like "Dr." or "Mr." or decimals like "3.5". _SENTENCE_END_RE = re.compile( r'(?<=[.!?])\s+(?=[A-Z"\'])|(?<=[.!?])$' ) def _split_sentences(text: str) -> list[str]: """Split text into sentences. Returns list of sentence strings.""" parts = _SENTENCE_END_RE.split(text.strip()) return [p.strip() for p in parts if p.strip()] def _ollama_chat_streaming(user_text: str, on_sentence): """Stream Ollama response token-by-token, calling on_sentence(sentence_str) each time a complete sentence is detected. Returns the full reply string. This is the key to sub-5-second perceived latency: the robot starts speaking the first sentence while the LLM is still generating the rest. """ ollama_url = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") model = os.environ.get("OLLAMA_MODEL", "smollm2:135m") req_body = { "model": model, "stream": True, "messages": [ {"role": "system", "content": "You are Jibo, a friendly home robot. Keep replies short and spoken."}, {"role": "user", "content": user_text}, ], } req = Request( ollama_url, data=json.dumps(req_body).encode("utf-8"), headers={"Content-Type": "application/json"}, method="POST", ) full_reply = "" buffer = "" sentences_sent = 0 t0 = time.monotonic() with urlopen(req, timeout=120) as resp: # Ollama streams one JSON object per line for raw_line in resp: line = raw_line.decode("utf-8", errors="replace").strip() if not line: continue try: chunk = json.loads(line) except json.JSONDecodeError: continue # Each chunk: {"message": {"content": "token"}, "done": false} msg = chunk.get("message") or {} token = msg.get("content") or "" if token: buffer += token full_reply += token # Check if buffer contains a complete sentence to flush # Look for sentence-ending punctuation followed by a space or more tokens sentences = _split_sentences(buffer) if len(sentences) > 1: # All but the last are complete sentences; last is still building for s in sentences[:-1]: elapsed = time.monotonic() - t0 _log(f" sentence #{sentences_sent} at {elapsed:.2f}s: {s[:120]!r}") on_sentence(s) sentences_sent += 1 buffer = sentences[-1] if chunk.get("done"): break # Flush any remaining text as the final sentence leftover = buffer.strip() if leftover: elapsed = time.monotonic() - t0 _log(f" sentence #{sentences_sent} (final) at {elapsed:.2f}s: {leftover[:120]!r}") on_sentence(leftover) return full_reply.strip() def _short_err(e: BaseException) -> str: s = str(e) or e.__class__.__name__ s = " ".join(s.split()) if len(s) > 240: s = s[:240] + "..." return s def _ollama_down_reply() -> str: # Keep it short and speakable. return "My AI server isn't reachable right now. Please start Ollama on the computer, then try again." def _wav_diagnostics(wav_bytes: bytes) -> dict: """Best-effort WAV parsing + signal stats for debugging mic capture.""" info: dict = {"bytes": len(wav_bytes)} try: with wave.open(io.BytesIO(wav_bytes), "rb") as wf: # type: ignore[name-defined] nch = wf.getnchannels() sw = wf.getsampwidth() fr = wf.getframerate() nframes = wf.getnframes() info.update({"channels": nch, "sample_width": sw, "frame_rate": fr, "frames": nframes}) # Read up to ~3 seconds of audio for stats (avoid huge CPU) max_frames = min(nframes, fr * 3) frames = wf.readframes(max_frames) except Exception as e: info["parse_error"] = str(e) return info # Only compute stats for 16-bit PCM (most common). if info.get("sample_width") != 2: return info try: samples = array.array("h") samples.frombytes(frames) if not samples: return info mn = min(samples) mx = max(samples) zeros = sum(1 for s in samples if s == 0) # RMS over interleaved samples (good enough for quick signal presence) n = float(len(samples)) rms = (sum(float(s) * float(s) for s in samples) / n) ** 0.5 # Per-channel RMS (helps debug mic arrays) ch = int(info.get("channels") or 1) channel_rms = None if ch > 1: channel_rms = [] for c in range(ch): chan = samples[c::ch] if not chan: channel_rms.append(0.0) else: nn = float(len(chan)) channel_rms.append((sum(float(s) * float(s) for s in chan) / nn) ** 0.5) info.update( { "min": int(mn), "max": int(mx), "rms": float(rms), "zero_frac": float(zeros) / n, "channel_rms": channel_rms, } ) except Exception as e: info["stats_error"] = str(e) return info def _to_loudest_channel_mono_wav(wav_bytes: bytes) -> tuple[bytes, dict]: """If WAV is multi-channel 16-bit PCM, pick loudest channel and return mono WAV bytes.""" try: with wave.open(io.BytesIO(wav_bytes), "rb") as wf: nch = wf.getnchannels() sw = wf.getsampwidth() fr = wf.getframerate() nframes = wf.getnframes() frames = wf.readframes(nframes) except Exception as e: return wav_bytes, {"convert_error": str(e)} if nch <= 1 or sw != 2: return wav_bytes, {"converted": False, "channels": nch, "sample_width": sw} samples = array.array("h") samples.frombytes(frames) if not samples: return wav_bytes, {"converted": False, "reason": "empty_samples"} # Choose loudest channel by RMS rms_list: list[float] = [] for c in range(nch): chan = samples[c::nch] if not chan: rms_list.append(0.0) else: nn = float(len(chan)) rms_list.append((sum(float(s) * float(s) for s in chan) / nn) ** 0.5) best = max(range(nch), key=lambda i: rms_list[i]) mono = samples[best::nch] out = io.BytesIO() with wave.open(out, "wb") as ow: ow.setnchannels(1) ow.setsampwidth(2) ow.setframerate(fr) ow.writeframes(mono.tobytes()) return out.getvalue(), {"converted": True, "picked_channel": best, "channel_rms": rms_list, "frame_rate": fr} class _Whisper: def __init__(self): self._model = None def available(self) -> bool: try: import faster_whisper # noqa: F401 return True except Exception: return False def transcribe_wav_bytes(self, wav_bytes: bytes) -> str: try: from faster_whisper import WhisperModel except Exception as e: raise RuntimeError( "Audio mode requires `faster-whisper` (pip install faster-whisper) and ffmpeg on your PC" ) from e model_name = os.environ.get("WHISPER_MODEL", "base") if self._model is None: # CPU-friendly default; user can override via WHISPER_MODEL and faster-whisper params if needed. self._model = WhisperModel(model_name, device="cpu", compute_type="int8") with tempfile.NamedTemporaryFile(suffix=".wav", delete=True) as f: f.write(wav_bytes) f.flush() segments, info = self._model.transcribe(f.name) text = "".join(seg.text for seg in segments).strip() return text _whisper = _Whisper() class Handler(BaseHTTPRequestHandler): server_version = "JiboAIBridge/1.0" def do_POST(self): try: client = f"{self.client_address[0]}:{self.client_address[1]}" length = int(self.headers.get("Content-Length", "0") or "0") _log(f"{client} POST {self.path} len={length}") if self.path == "/v1/chat/text": payload = _read_json(self) text = (payload.get("text") or "").strip() if not text: _json_response(self, 400, {"error": "Missing 'text'"}) return _log(f"{client} /text prompt_chars={len(text)} prompt={text[:200]!r}") try: # Stream response: chunked transfer encoding with NDJSON # Each line is a JSON object the robot can parse immediately. # This replicates the original Jibo hub pattern where the robot # starts acting on partial results while the server is still working. self.send_response(200) self.send_header("Content-Type", "application/x-ndjson") self.send_header("Transfer-Encoding", "chunked") self.end_headers() t0 = time.monotonic() def _send_chunk(obj: dict): line = json.dumps(obj) + "\n" data = line.encode("utf-8") # HTTP chunked encoding: hex size + CRLF + data + CRLF self.wfile.write(f"{len(data):x}\r\n".encode()) self.wfile.write(data) self.wfile.write(b"\r\n") self.wfile.flush() def _on_sentence(sentence: str): _send_chunk({"sentence": sentence, "done": False}) reply = _ollama_chat_streaming(text, _on_sentence) elapsed = time.monotonic() - t0 _log(f"{client} /text ok reply_chars={len(reply)} elapsed={elapsed:.2f}s") # Final chunk with full reply (for logging / legacy compat) _send_chunk({"sentence": "", "done": True, "reply": reply}) # Terminate chunked encoding self.wfile.write(b"0\r\n\r\n") self.wfile.flush() except (URLError, ConnectionRefusedError) as e: _log(f"{client} /text ollama_unreachable err={_short_err(e)!r}") # Fallback: try to send a non-streaming error response. # If headers already sent, write it as a chunk. try: err_reply = _ollama_down_reply() err_obj = {"sentence": err_reply, "done": True, "reply": err_reply, "ollama_ok": False, "ollama_error": _short_err(e)} line = json.dumps(err_obj) + "\n" data = line.encode("utf-8") self.wfile.write(f"{len(data):x}\r\n".encode()) self.wfile.write(data) self.wfile.write(b"\r\n0\r\n\r\n") self.wfile.flush() except Exception: pass # headers may not have been sent yet return if self.path == "/v1/chat/audio": payload = _read_json(self) b64 = payload.get("wav_base64") if not b64: _json_response(self, 400, {"error": "Missing 'wav_base64'"}) return if not _whisper.available(): _log(f"{client} /audio STT unavailable (faster-whisper not installed)") _json_response( self, 503, { "error": "STT unavailable: install faster-whisper and ffmpeg on this PC", "hint": "pip install faster-whisper (and install ffmpeg)", }, ) return wav_bytes = base64.b64decode(b64) diag = _wav_diagnostics(wav_bytes) wav_for_stt, conv = _to_loudest_channel_mono_wav(wav_bytes) if conv.get("converted"): diag_mono = _wav_diagnostics(wav_for_stt) _log( f"{client} /audio wav_diag={json.dumps(diag, sort_keys=True)} " f"mono={json.dumps(diag_mono, sort_keys=True)} conv={json.dumps(conv, sort_keys=True)}" ) else: _log(f"{client} /audio wav_diag={json.dumps(diag, sort_keys=True)} conv={json.dumps(conv, sort_keys=True)}") try: # Save for debugging (overwrite each time) out_path = os.environ.get("AI_BRIDGE_LAST_WAV", "jibo_last.wav") with open(out_path, "wb") as f: f.write(wav_bytes) _log(f"{client} /audio decoded bytes={len(wav_bytes)} saved={out_path}") except Exception as e: _log(f"{client} /audio failed saving wav: {e}") transcript = _whisper.transcribe_wav_bytes(wav_for_stt) if not transcript: _log(f"{client} /audio empty transcript") _json_response(self, 200, {"reply": "I didn't catch that. Could you say it again?", "text": ""}) return _log(f"{client} /audio transcript_chars={len(transcript)} transcript={transcript[:200]!r}") try: reply = _ollama_chat(transcript) _log(f"{client} /audio ok text_chars={len(transcript)} reply_chars={len(reply)}") _json_response(self, 200, {"reply": reply, "text": transcript}) except URLError as e: _log(f"{client} /audio ollama_unreachable err={_short_err(e)!r}") _json_response( self, 200, {"reply": _ollama_down_reply(), "text": transcript, "ollama_ok": False, "ollama_error": _short_err(e)}, ) except ConnectionRefusedError as e: _log(f"{client} /audio ollama_refused err={_short_err(e)!r}") _json_response( self, 200, {"reply": _ollama_down_reply(), "text": transcript, "ollama_ok": False, "ollama_error": _short_err(e)}, ) return _json_response(self, 404, {"error": "Not found"}) except Exception as e: _log(f"ERROR {self.path}: {e}\n{traceback.format_exc()}") _json_response( self, 500, { "error": str(e), "trace": traceback.format_exc(), }, ) def log_message(self, format, *args): # Keep BaseHTTPRequestHandler from double-logging; we do our own. return def main(): ap = argparse.ArgumentParser() ap.add_argument("--host", default="0.0.0.0") ap.add_argument("--port", type=int, default=8020) args = ap.parse_args() server = ThreadingHTTPServer((args.host, args.port), Handler) _log(f"AI Bridge server listening on http://{args.host}:{args.port}") _log( "Ollama: " + os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") + " model=" + os.environ.get("OLLAMA_MODEL", "smollm2:135m") ) _log("Ollama health check: curl -s http://127.0.0.1:11434/api/tags | head") if not _whisper.available(): _log("STT: faster-whisper not installed; /v1/chat/audio will return 503") server.serve_forever() if __name__ == "__main__": main()