Files
JiboOs/V3.1/build/ai_bridge_server/server.py

384 lines
14 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Minimal AI Bridge server for Jibo.
Endpoints:
- POST /v1/chat/text {"text": "..."} -> {"reply": "..."}
- POST /v1/chat/audio {"wav_base64": "...", "sample_rate": 16000} -> {"reply": "...", "text": "<transcript>"}
LLM:
- Uses the Ollama Chat API; the default endpoint is http://127.0.0.1:11434/api/chat
Env:
OLLAMA_URL (default: http://127.0.0.1:11434/api/chat)
OLLAMA_MODEL (default: phi3.5)
STT (optional, for /audio):
- If `faster-whisper` is installed, it will be used.
Env:
WHISPER_MODEL (default: base)
Run:
python3 server.py --host 0.0.0.0 --port 8020
"""
from __future__ import annotations
import argparse
import array
import base64
import io
import json
import os
import tempfile
import time
import traceback
import wave
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
2026-03-19 14:11:26 +02:00
from urllib.error import URLError
from urllib.request import Request, urlopen
def _ts() -> str:
    """Return the current local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    # Local wall-clock time is enough here: the stamp only prefixes debug logs.
    now = time.localtime()
    return time.strftime("%Y-%m-%d %H:%M:%S", now)
def _log(msg: str) -> None:
    """Print ``msg`` to stdout prefixed with ``[<timestamp>]`` and flush at once."""
    # Logging is intentionally not muted: this output is the debugging aid.
    stamp = _ts()
    line = f"[{stamp}] {msg}"
    print(line, flush=True)
def _json_response(handler: BaseHTTPRequestHandler, status: int, payload: dict):
    """Send ``payload`` to the client as JSON with HTTP status ``status``.

    Emits the status line, the ``Content-Type`` and ``Content-Length`` headers,
    and then the UTF-8 encoded JSON body through ``handler``.
    """
    encoded = json.dumps(payload).encode("utf-8")
    handler.send_response(status)
    for header_name, header_value in (
        ("Content-Type", "application/json"),
        ("Content-Length", str(len(encoded))),
    ):
        handler.send_header(header_name, header_value)
    handler.end_headers()
    handler.wfile.write(encoded)
def _read_json(handler: BaseHTTPRequestHandler) -> dict:
    """Read the request body from ``handler`` and decode it as a JSON object.

    A missing, empty or zero ``Content-Length`` is treated as an empty body and
    yields ``{}``.

    Raises:
        ValueError: if ``Content-Length`` is not an integer, the body is not
            valid UTF-8 encoded JSON, or the decoded document is not a JSON
            object (callers index the result with ``.get``).
    """
    # ``or "0"`` covers a header that is present but empty, the same way
    # ``do_POST`` parses this header.
    length = int(handler.headers.get("Content-Length", "0") or "0")
    # Never pass a negative size to ``read``: that means "read until EOF" and
    # would block on an open connection.
    raw = handler.rfile.read(length) if length > 0 else b"{}"
    payload = json.loads(raw.decode("utf-8"))
    if not isinstance(payload, dict):
        raise ValueError("JSON body must be an object")
    return payload
def _ollama_chat(user_text: str, timeout: float = 60.0) -> str:
    """Send ``user_text`` to the Ollama chat endpoint and return the reply text.

    The endpoint and model are read from the ``OLLAMA_URL`` and
    ``OLLAMA_MODEL`` environment variables (defaults:
    ``http://127.0.0.1:11434/api/chat`` and ``phi3.5``). The request is sent
    with ``"stream": False``.

    Args:
        user_text: The user's message.
        timeout: Seconds passed to ``urlopen`` as its timeout.

    Returns:
        The assistant message content with surrounding whitespace stripped.

    Raises:
        RuntimeError: if the decoded response does not contain a non-empty
            string at ``message.content``.
        Exception: any failure of the HTTP request or of JSON decoding is
            logged and re-raised unchanged so the caller decides how to reply.
    """
    ollama_url = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat")
    model = os.environ.get("OLLAMA_MODEL", "phi3.5")
    req_body = {
        "model": model,
        "stream": False,
        "messages": [
            {"role": "system", "content": "You are Jibo, a friendly home robot. Keep replies short and spoken."},
            {"role": "user", "content": user_text},
        ],
    }
    req = Request(
        ollama_url,
        data=json.dumps(req_body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        with urlopen(req, timeout=timeout) as resp:
            data = json.loads(resp.read().decode("utf-8"))
    except Exception as e:
        # Let the caller decide how to respond; include a useful hint in logs.
        _log(f"Ollama request failed url={ollama_url!r} err={e!r}")
        raise
    # Validate the shape step by step so that any malformed response surfaces
    # as the RuntimeError below rather than as an AttributeError.
    message = data.get("message") if isinstance(data, dict) else None
    content = message.get("content") if isinstance(message, dict) else None
    if not isinstance(content, str) or not content:
        raise RuntimeError(f"Unexpected Ollama response: {data}")
    return content.strip()
2026-03-19 14:11:26 +02:00
def _short_err(e: BaseException) -> str:
    """Return a compact one-line description of ``e``.

    Uses ``str(e)``, or the exception class name when that is empty; collapses
    every run of whitespace to a single space; and cuts the result to 240
    characters followed by ``...`` when it is longer than that.
    """
    text = str(e)
    if not text:
        text = e.__class__.__name__
    collapsed = " ".join(text.split())
    limit = 240
    if len(collapsed) <= limit:
        return collapsed
    return collapsed[:limit] + "..."
def _ollama_down_reply() -> str:
    """Return the short, speakable fallback reply used when Ollama cannot be reached."""
    reply = (
        "My AI server isn't reachable right now. "
        "Please start Ollama on the computer, then try again."
    )
    return reply
def _wav_diagnostics(wav_bytes: bytes) -> dict:
    """Collect best-effort WAV header fields and signal statistics for debugging.

    The result always contains ``bytes``. When the header parses it also has
    ``channels``, ``sample_width``, ``frame_rate`` and ``frames``; otherwise
    ``parse_error`` is set. For 16-bit audio, ``min``, ``max``, ``rms``,
    ``zero_frac`` and ``channel_rms`` (``None`` for mono) are added, computed
    over at most the first ~3 seconds. A failure while computing them is
    reported as ``stats_error`` instead of being raised.
    """
    report: dict = {"bytes": len(wav_bytes)}

    def _rms(values) -> float:
        # Root-mean-square of a non-empty sequence of samples.
        return (sum(float(v) * float(v) for v in values) / float(len(values))) ** 0.5

    try:
        with wave.open(io.BytesIO(wav_bytes), "rb") as reader:
            channels = reader.getnchannels()
            width = reader.getsampwidth()
            rate = reader.getframerate()
            total = reader.getnframes()
            report.update({"channels": channels, "sample_width": width, "frame_rate": rate, "frames": total})
            # Cap the analysed audio at ~3 seconds to keep CPU cost bounded.
            pcm = reader.readframes(min(total, rate * 3))
    except Exception as e:
        report["parse_error"] = str(e)
        return report

    # Statistics are only computed for 16-bit PCM (the most common case).
    if width != 2:
        return report

    try:
        samples = array.array("h")
        samples.frombytes(pcm)
        if not samples:
            return report
        lowest = min(samples)
        highest = max(samples)
        zero_count = samples.count(0)
        count = float(len(samples))
        # RMS over the interleaved samples: enough to tell whether signal is present.
        overall = _rms(samples)
        # Per-channel RMS helps when debugging microphone arrays.
        nch = int(channels or 1)
        per_channel = None
        if nch > 1:
            per_channel = [
                _rms(part) if part else 0.0
                for part in (samples[c::nch] for c in range(nch))
            ]
        report.update(
            {
                "min": int(lowest),
                "max": int(highest),
                "rms": float(overall),
                "zero_frac": float(zero_count) / count,
                "channel_rms": per_channel,
            }
        )
    except Exception as e:
        report["stats_error"] = str(e)
    return report
def _to_loudest_channel_mono_wav(wav_bytes: bytes) -> tuple[bytes, dict]:
    """Reduce a multi-channel 16-bit PCM WAV to its loudest channel.

    Args:
        wav_bytes: Complete WAV file contents.

    Returns:
        ``(wav, meta)``. When a conversion happened, ``wav`` is a new mono
        16-bit WAV at the input frame rate and ``meta`` holds
        ``converted=True``, ``picked_channel``, ``channel_rms`` and
        ``frame_rate``. Otherwise ``wav`` is the input unchanged and ``meta``
        says why: ``convert_error`` (unparseable), ``converted=False`` with
        ``channels``/``sample_width`` (already mono or not 16-bit), or
        ``converted=False`` with ``reason="empty_samples"``.
    """
    try:
        with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
            nch = wf.getnchannels()
            sw = wf.getsampwidth()
            fr = wf.getframerate()
            frames = wf.readframes(wf.getnframes())
    except Exception as e:
        return wav_bytes, {"convert_error": str(e)}
    if nch <= 1 or sw != 2:
        return wav_bytes, {"converted": False, "channels": nch, "sample_width": sw}

    # A truncated upload can end in the middle of a frame. Drop the partial
    # tail so ``frombytes`` cannot raise on an odd byte count and every
    # channel ends up with the same number of samples.
    frame_size = nch * sw
    usable = len(frames) - (len(frames) % frame_size)
    samples = array.array("h")
    samples.frombytes(frames[:usable])
    if not samples:
        return wav_bytes, {"converted": False, "reason": "empty_samples"}

    # Choose the loudest channel by RMS. Whole frames guarantee each slice is
    # non-empty; ``max`` keeps the lowest index on ties.
    rms_list: list[float] = []
    for c in range(nch):
        chan = samples[c::nch]
        rms_list.append((sum(float(s) * float(s) for s in chan) / float(len(chan))) ** 0.5)
    best = max(range(nch), key=rms_list.__getitem__)
    mono = samples[best::nch]

    out = io.BytesIO()
    with wave.open(out, "wb") as ow:
        ow.setnchannels(1)
        ow.setsampwidth(2)
        ow.setframerate(fr)
        ow.writeframes(mono.tobytes())
    return out.getvalue(), {"converted": True, "picked_channel": best, "channel_rms": rms_list, "frame_rate": fr}
class _Whisper:
    """Lazy speech-to-text wrapper around an optional ``faster-whisper`` model."""

    def __init__(self):
        # Created on the first transcription, then reused for later calls.
        self._model = None

    def available(self) -> bool:
        """Return True when the ``faster_whisper`` package can be imported."""
        try:
            import faster_whisper  # noqa: F401
        except Exception:
            return False
        return True

    def _load_model(self):
        """Return the cached model, creating it on first use.

        The model name comes from the ``WHISPER_MODEL`` environment variable
        (default ``base``) and is only read when the model is created.

        Raises:
            RuntimeError: if ``faster_whisper`` cannot be imported.
        """
        if self._model is None:
            try:
                from faster_whisper import WhisperModel
            except Exception as e:
                raise RuntimeError(
                    "Audio mode requires `faster-whisper` (pip install faster-whisper) and ffmpeg on your PC"
                ) from e
            model_name = os.environ.get("WHISPER_MODEL", "base")
            # CPU-friendly default; user can override via WHISPER_MODEL.
            self._model = WhisperModel(model_name, device="cpu", compute_type="int8")
        return self._model

    def transcribe_wav_bytes(self, wav_bytes: bytes) -> str:
        """Transcribe WAV file contents and return the stripped text.

        Raises:
            RuntimeError: if ``faster_whisper`` is not installed.
        """
        model = self._load_model()
        # The model opens the audio by file name. A NamedTemporaryFile that is
        # still open cannot be reopened by name on Windows, so write and close
        # the file first and remove it ourselves afterwards.
        tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        try:
            with tmp:
                tmp.write(wav_bytes)
            segments, _info = model.transcribe(tmp.name)
            # Consume ``segments`` before the file is removed: it may be
            # produced lazily while the audio is still being read.
            return "".join(seg.text for seg in segments).strip()
        finally:
            try:
                os.unlink(tmp.name)
            except OSError:
                # Best-effort cleanup; a leftover temp file must not fail the request.
                pass


_whisper = _Whisper()
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler for the AI bridge.

    Routes:
        POST /v1/chat/text   {"text": "..."}        -> {"reply": "..."}
        POST /v1/chat/audio  {"wav_base64": "..."}  -> {"reply": "...", "text": "<transcript>"}

    Malformed requests get 400, unknown paths 404, missing STT 503 and
    unexpected errors 500. When Ollama cannot be used, the reply is still
    status 200 with a spoken fallback plus ``ollama_ok: False`` and
    ``ollama_error``.
    """

    server_version = "JiboAIBridge/1.0"

    def do_POST(self):
        """Log the request, dispatch on the path and turn stray errors into a 500."""
        try:
            client = f"{self.client_address[0]}:{self.client_address[1]}"
            length = int(self.headers.get("Content-Length", "0") or "0")
            _log(f"{client} POST {self.path} len={length}")
            if self.path == "/v1/chat/text":
                self._handle_text(client)
            elif self.path == "/v1/chat/audio":
                self._handle_audio(client)
            else:
                _json_response(self, 404, {"error": "Not found"})
        except Exception as e:
            trace = traceback.format_exc()
            _log(f"ERROR {self.path}: {e}\n{trace}")
            # NOTE(review): the traceback is returned to the client as a
            # debugging aid; reconsider before exposing this server widely.
            _json_response(self, 500, {"error": str(e), "trace": trace})

    def _read_payload(self, client):
        """Return the JSON object body, or None after answering 400 for a bad one."""
        try:
            payload = _read_json(self)
        except ValueError as e:
            # Covers invalid JSON and invalid UTF-8 (both are ValueError subclasses).
            _log(f"{client} bad request body err={_short_err(e)!r}")
            _json_response(self, 400, {"error": f"Invalid request body: {_short_err(e)}"})
            return None
        if not isinstance(payload, dict):
            _log(f"{client} bad request body (not a JSON object)")
            _json_response(self, 400, {"error": "Invalid request body: expected a JSON object"})
            return None
        return payload

    def _ask_ollama(self, client, tag, prompt, extra):
        """Return the Ollama reply, or None after sending the spoken fallback.

        ``extra`` is merged into the fallback response after ``reply``.
        """
        # Keep the try body to the network call only, so an error while
        # writing our own response is never mistaken for an Ollama failure.
        try:
            return _ollama_chat(prompt)
        except OSError as e:
            # URLError, ConnectionRefusedError and socket timeouts are all
            # OSError subclasses; a read timeout during a slow generation is
            # raised bare (not wrapped in URLError), so catch the base class.
            kind = "ollama_refused" if isinstance(e, ConnectionRefusedError) else "ollama_unreachable"
            _log(f"{client} {tag} {kind} err={_short_err(e)!r}")
            _json_response(
                self,
                200,
                {"reply": _ollama_down_reply(), **extra, "ollama_ok": False, "ollama_error": _short_err(e)},
            )
            return None

    def _handle_text(self, client):
        """Serve POST /v1/chat/text."""
        payload = self._read_payload(client)
        if payload is None:
            return
        raw_text = payload.get("text")
        # A non-string ``text`` is treated like a missing one instead of crashing.
        text = raw_text.strip() if isinstance(raw_text, str) else ""
        if not text:
            _json_response(self, 400, {"error": "Missing 'text'"})
            return
        _log(f"{client} /text prompt_chars={len(text)} prompt={text[:200]!r}")
        reply = self._ask_ollama(client, "/text", text, {})
        if reply is None:
            return
        _log(f"{client} /text ok reply_chars={len(reply)}")
        _json_response(self, 200, {"reply": reply})

    def _handle_audio(self, client):
        """Serve POST /v1/chat/audio: decode, transcribe, then chat."""
        payload = self._read_payload(client)
        if payload is None:
            return
        b64 = payload.get("wav_base64")
        if not b64:
            _json_response(self, 400, {"error": "Missing 'wav_base64'"})
            return
        if not _whisper.available():
            _log(f"{client} /audio STT unavailable (faster-whisper not installed)")
            _json_response(
                self,
                503,
                {
                    "error": "STT unavailable: install faster-whisper and ffmpeg on this PC",
                    "hint": "pip install faster-whisper (and install ffmpeg)",
                },
            )
            return
        try:
            wav_bytes = base64.b64decode(b64)
        except (TypeError, ValueError) as e:
            # binascii.Error is a ValueError; TypeError covers non-string values.
            _log(f"{client} /audio invalid base64 err={_short_err(e)!r}")
            _json_response(self, 400, {"error": "Invalid 'wav_base64'"})
            return

        diag = _wav_diagnostics(wav_bytes)
        wav_for_stt, conv = _to_loudest_channel_mono_wav(wav_bytes)
        if conv.get("converted"):
            diag_mono = _wav_diagnostics(wav_for_stt)
            _log(
                f"{client} /audio wav_diag={json.dumps(diag, sort_keys=True)} "
                f"mono={json.dumps(diag_mono, sort_keys=True)} conv={json.dumps(conv, sort_keys=True)}"
            )
        else:
            _log(f"{client} /audio wav_diag={json.dumps(diag, sort_keys=True)} conv={json.dumps(conv, sort_keys=True)}")

        try:
            # Save for debugging (overwrite each time)
            out_path = os.environ.get("AI_BRIDGE_LAST_WAV", "jibo_last.wav")
            with open(out_path, "wb") as f:
                f.write(wav_bytes)
            _log(f"{client} /audio decoded bytes={len(wav_bytes)} saved={out_path}")
        except Exception as e:
            # Saving is only a debugging aid; never fail the request over it.
            _log(f"{client} /audio failed saving wav: {e}")

        transcript = _whisper.transcribe_wav_bytes(wav_for_stt)
        if not transcript:
            _log(f"{client} /audio empty transcript")
            _json_response(self, 200, {"reply": "I didn't catch that. Could you say it again?", "text": ""})
            return
        _log(f"{client} /audio transcript_chars={len(transcript)} transcript={transcript[:200]!r}")

        reply = self._ask_ollama(client, "/audio", transcript, {"text": transcript})
        if reply is None:
            return
        _log(f"{client} /audio ok text_chars={len(transcript)} reply_chars={len(reply)}")
        _json_response(self, 200, {"reply": reply, "text": transcript})

    def log_message(self, format, *args):
        # Keep BaseHTTPRequestHandler from double-logging; we do our own.
        return
def main(argv=None):
    """Parse command-line options and serve requests until interrupted.

    Args:
        argv: Optional list of arguments; ``None`` lets argparse read
            ``sys.argv`` as before.
    """
    # Local import: only needed to build the startup hint below.
    from urllib.parse import urlsplit

    ap = argparse.ArgumentParser()
    ap.add_argument("--host", default="0.0.0.0")
    ap.add_argument("--port", type=int, default=8020)
    args = ap.parse_args(argv)

    ollama_url = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat")
    ollama_model = os.environ.get("OLLAMA_MODEL", "phi3.5")

    server = ThreadingHTTPServer((args.host, args.port), Handler)
    _log(f"AI Bridge server listening on http://{args.host}:{args.port}")
    _log("Ollama: " + ollama_url + " model=" + ollama_model)

    # Point the health-check hint at the configured Ollama host rather than
    # always at 127.0.0.1, so it stays correct when OLLAMA_URL is overridden.
    parts = urlsplit(ollama_url)
    if parts.scheme and parts.netloc:
        ollama_base = f"{parts.scheme}://{parts.netloc}"
    else:
        ollama_base = "http://127.0.0.1:11434"
    _log(f"Ollama health check: curl -s {ollama_base}/api/tags | head")

    if not _whisper.available():
        _log("STT: faster-whisper not installed; /v1/chat/audio will return 503")

    try:
        server.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the server; exit without a traceback.
        _log("Shutting down")
    finally:
        # Release the listening socket so the port can be reused right away.
        server.server_close()


if __name__ == "__main__":
    main()