diff --git a/V3.1/build/JiboAppToolkit.pdf b/V3.1/build/JiboAppToolkit.pdf new file mode 100644 index 00000000..babd9ebe Binary files /dev/null and b/V3.1/build/JiboAppToolkit.pdf differ diff --git a/V3.1/build/ai_bridge_server/__pycache__/server.cpython-313.pyc b/V3.1/build/ai_bridge_server/__pycache__/server.cpython-313.pyc index 20f2acd0..1ef72367 100644 Binary files a/V3.1/build/ai_bridge_server/__pycache__/server.cpython-313.pyc and b/V3.1/build/ai_bridge_server/__pycache__/server.cpython-313.pyc differ diff --git a/V3.1/build/ai_bridge_server/server.py b/V3.1/build/ai_bridge_server/server.py index 0086d65a..47bab20c 100644 --- a/V3.1/build/ai_bridge_server/server.py +++ b/V3.1/build/ai_bridge_server/server.py @@ -1,15 +1,23 @@ #!/usr/bin/env python3 -"""Minimal AI Bridge server for Jibo. +"""Streaming AI Bridge server for Jibo. + +Replicates the original Jibo server pattern: responds sentence-by-sentence +so the robot can start speaking immediately while the LLM is still generating. Endpoints: -- POST /v1/chat/text {"text": "..."} -> {"reply": "..."} -- POST /v1/chat/audio {"wav_base64": "...", "sample_rate": 16000} -> {"reply": "...", "text": ""} +- POST /v1/chat/text {"text": "..."} -> chunked NDJSON, one line per sentence: + {"sentence": "First sentence.", "done": false} + {"sentence": "Second sentence!", "done": false} + {"sentence": "", "done": true, "reply": "First sentence. Second sentence!"} + Legacy (non-streaming) clients still work: the final line has the full "reply". + +- POST /v1/chat/audio {"wav_base64": "...", "sample_rate": 16000} -> same streaming format LLM: -- Uses Ollama Chat API by default: http://localhost:11434/api/chat +- Uses Ollama Chat API with streaming: http://localhost:11434/api/chat Env: OLLAMA_URL (default: http://127.0.0.1:11434/api/chat) - OLLAMA_MODEL (default: phi3.5) + OLLAMA_MODEL (default: smollm2:135m) STT (optional, for /audio): - If `faster-whisper` is installed, it will be used. @@ -63,8 +71,9 @@ def _read_json(handler: BaseHTTPRequestHandler) -> dict: def _ollama_chat(user_text: str) -> str: + """Non-streaming fallback (used by /audio endpoint).""" ollama_url = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") - model = os.environ.get("OLLAMA_MODEL", "phi3.5") + model = os.environ.get("OLLAMA_MODEL", "smollm2:135m") req_body = { "model": model, @@ -86,7 +95,6 @@ def _ollama_chat(user_text: str) -> str: with urlopen(req, timeout=60) as resp: data = json.loads(resp.read().decode("utf-8")) except Exception as e: - # Let caller decide how to respond; include a useful hint in logs. _log(f"Ollama request failed url={ollama_url!r} err={e!r}") raise @@ -97,6 +105,95 @@ def _ollama_chat(user_text: str) -> str: return content.strip() +import re + +# Sentence boundary: split on .!? followed by space or end, but not on +# abbreviations like "Dr." or "Mr." or decimals like "3.5". +_SENTENCE_END_RE = re.compile( + r'(?<=[.!?])\s+(?=[A-Z"\'])|(?<=[.!?])$' +) + + +def _split_sentences(text: str) -> list[str]: + """Split text into sentences. Returns list of sentence strings.""" + parts = _SENTENCE_END_RE.split(text.strip()) + return [p.strip() for p in parts if p.strip()] + + +def _ollama_chat_streaming(user_text: str, on_sentence): + """Stream Ollama response token-by-token, calling on_sentence(sentence_str) + each time a complete sentence is detected. Returns the full reply string. + + This is the key to sub-5-second perceived latency: the robot starts speaking + the first sentence while the LLM is still generating the rest. + """ + ollama_url = os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") + model = os.environ.get("OLLAMA_MODEL", "smollm2:135m") + + req_body = { + "model": model, + "stream": True, + "messages": [ + {"role": "system", "content": "You are Jibo, a friendly home robot. Keep replies short and spoken."}, + {"role": "user", "content": user_text}, + ], + } + + req = Request( + ollama_url, + data=json.dumps(req_body).encode("utf-8"), + headers={"Content-Type": "application/json"}, + method="POST", + ) + + full_reply = "" + buffer = "" + sentences_sent = 0 + t0 = time.monotonic() + + with urlopen(req, timeout=120) as resp: + # Ollama streams one JSON object per line + for raw_line in resp: + line = raw_line.decode("utf-8", errors="replace").strip() + if not line: + continue + try: + chunk = json.loads(line) + except json.JSONDecodeError: + continue + + # Each chunk: {"message": {"content": "token"}, "done": false} + msg = chunk.get("message") or {} + token = msg.get("content") or "" + if token: + buffer += token + full_reply += token + + # Check if buffer contains a complete sentence to flush + # Look for sentence-ending punctuation followed by a space or more tokens + sentences = _split_sentences(buffer) + if len(sentences) > 1: + # All but the last are complete sentences; last is still building + for s in sentences[:-1]: + elapsed = time.monotonic() - t0 + _log(f" sentence #{sentences_sent} at {elapsed:.2f}s: {s[:120]!r}") + on_sentence(s) + sentences_sent += 1 + buffer = sentences[-1] + + if chunk.get("done"): + break + + # Flush any remaining text as the final sentence + leftover = buffer.strip() + if leftover: + elapsed = time.monotonic() - t0 + _log(f" sentence #{sentences_sent} (final) at {elapsed:.2f}s: {leftover[:120]!r}") + on_sentence(leftover) + + return full_reply.strip() + + def _short_err(e: BaseException) -> str: s = str(e) or e.__class__.__name__ s = " ".join(s.split()) @@ -265,15 +362,55 @@ class Handler(BaseHTTPRequestHandler): return _log(f"{client} /text prompt_chars={len(text)} prompt={text[:200]!r}") try: - reply = _ollama_chat(text) - _log(f"{client} /text ok reply_chars={len(reply)}") - _json_response(self, 200, {"reply": reply}) - except URLError as e: + # Stream response: chunked transfer encoding with NDJSON + # Each line is a JSON object the robot can parse immediately. + # This replicates the original Jibo hub pattern where the robot + # starts acting on partial results while the server is still working. + self.send_response(200) + self.send_header("Content-Type", "application/x-ndjson") + self.send_header("Transfer-Encoding", "chunked") + self.end_headers() + + t0 = time.monotonic() + + def _send_chunk(obj: dict): + line = json.dumps(obj) + "\n" + data = line.encode("utf-8") + # HTTP chunked encoding: hex size + CRLF + data + CRLF + self.wfile.write(f"{len(data):x}\r\n".encode()) + self.wfile.write(data) + self.wfile.write(b"\r\n") + self.wfile.flush() + + def _on_sentence(sentence: str): + _send_chunk({"sentence": sentence, "done": False}) + + reply = _ollama_chat_streaming(text, _on_sentence) + elapsed = time.monotonic() - t0 + _log(f"{client} /text ok reply_chars={len(reply)} elapsed={elapsed:.2f}s") + + # Final chunk with full reply (for logging / legacy compat) + _send_chunk({"sentence": "", "done": True, "reply": reply}) + # Terminate chunked encoding + self.wfile.write(b"0\r\n\r\n") + self.wfile.flush() + + except (URLError, ConnectionRefusedError) as e: _log(f"{client} /text ollama_unreachable err={_short_err(e)!r}") - _json_response(self, 200, {"reply": _ollama_down_reply(), "ollama_ok": False, "ollama_error": _short_err(e)}) - except ConnectionRefusedError as e: - _log(f"{client} /text ollama_refused err={_short_err(e)!r}") - _json_response(self, 200, {"reply": _ollama_down_reply(), "ollama_ok": False, "ollama_error": _short_err(e)}) + # Fallback: try to send a non-streaming error response. + # If headers already sent, write it as a chunk. + try: + err_reply = _ollama_down_reply() + err_obj = {"sentence": err_reply, "done": True, "reply": err_reply, + "ollama_ok": False, "ollama_error": _short_err(e)} + line = json.dumps(err_obj) + "\n" + data = line.encode("utf-8") + self.wfile.write(f"{len(data):x}\r\n".encode()) + self.wfile.write(data) + self.wfile.write(b"\r\n0\r\n\r\n") + self.wfile.flush() + except Exception: + pass # headers may not have been sent yet return if self.path == "/v1/chat/audio": @@ -371,7 +508,7 @@ def main(): "Ollama: " + os.environ.get("OLLAMA_URL", "http://127.0.0.1:11434/api/chat") + " model=" - + os.environ.get("OLLAMA_MODEL", "phi3.5") + + os.environ.get("OLLAMA_MODEL", "smollm2:135m") ) _log("Ollama health check: curl -s http://127.0.0.1:11434/api/tags | head") if not _whisper.available(): diff --git a/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/ai-bridge.js b/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/ai-bridge.js index 1b53abfa..77c69581 100644 --- a/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/ai-bridge.js +++ b/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/ai-bridge.js @@ -179,6 +179,117 @@ function httpJsonPost(urlString, payload, timeoutMs) { }); } +// Streaming NDJSON POST — reads the server response line-by-line and calls +// onSentence(sentenceString) for each complete sentence as it arrives. +// This replicates the original Jibo hub pattern: the robot starts speaking +// while the server is still generating the rest of the response. +// Returns a Promise that resolves to the full reply string. +function httpStreamingPost(urlString, payload, onSentence, timeoutMs) { + timeoutMs = typeof timeoutMs === "number" ? timeoutMs : 120000; + + var parsed = urlLib.parse(urlString); + var isHttps = parsed.protocol === "https:"; + var bodyStr = JSON.stringify(payload || {}); + var body = new Buffer(bodyStr, "utf8"); + + var requestOptions = { + protocol: parsed.protocol, + hostname: parsed.hostname, + port: parsed.port || (isHttps ? 443 : 80), + path: parsed.path || "/", + method: "POST", + headers: { + "Content-Type": "application/json", + "Content-Length": body.length, + }, + timeout: timeoutMs, + }; + + return new Promise(function (resolve, reject) { + var req = (isHttps ? https : http).request(requestOptions, function (res) { + if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) { + var errChunks = []; + res.on("data", function (d) { errChunks.push(d); }); + res.on("end", function () { + reject(new Error("AI Bridge streaming: HTTP " + res.statusCode + ": " + Buffer.concat(errChunks).toString("utf8"))); + }); + return; + } + + var fullReply = ""; + var lineBuf = ""; + + res.on("data", function (chunk) { + lineBuf += chunk.toString("utf8"); + + // Process complete lines (NDJSON = one JSON object per line) + var lines = lineBuf.split("\n"); + // Keep the last (possibly incomplete) line in the buffer + lineBuf = lines.pop() || ""; + + for (var i = 0; i < lines.length; i++) { + var line = lines[i].replace(/^\s+|\s+$/g, ""); + if (!line) continue; + try { + var obj = JSON.parse(line); + } catch (e) { + continue; // skip malformed lines + } + + if (obj.done && obj.reply) { + fullReply = obj.reply; + } + if (obj.sentence && !obj.done) { + try { + onSentence(obj.sentence); + } catch (e) { + // don't let TTS errors kill the stream + } + } + // Handle error/fallback in the done+sentence case (ollama down) + if (obj.done && obj.sentence) { + try { + onSentence(obj.sentence); + } catch (e) { + // ignore + } + if (!fullReply) fullReply = obj.sentence; + } + } + }); + + res.on("end", function () { + // Process any remaining data in the buffer + var remaining = (lineBuf || "").replace(/^\s+|\s+$/g, ""); + if (remaining) { + try { + var obj = JSON.parse(remaining); + if (obj.reply) fullReply = obj.reply; + if (obj.sentence && !obj.done) { + try { onSentence(obj.sentence); } catch (e) { /* ignore */ } + } + if (obj.done && obj.sentence) { + try { onSentence(obj.sentence); } catch (e) { /* ignore */ } + if (!fullReply) fullReply = obj.sentence; + } + } catch (e) { + // ignore + } + } + resolve(fullReply || ""); + }); + }); + + req.on("error", reject); + req.on("timeout", function () { + req.destroy(new Error("AI Bridge: streaming request timeout")); + }); + + req.write(body); + req.end(); + }); +} + function httpJsonPostRaw(urlString, payload, timeoutMs) { timeoutMs = typeof timeoutMs === "number" ? timeoutMs : 6000; @@ -2481,7 +2592,7 @@ AIBridge.prototype._sendText = function (text, source) { var t0 = Date.now(); var url = self.serverBaseUrl.replace(/\/+$/, "") + "/v1/chat/text"; if (rlog) { - rlog.info("ai-bridge", "sending text", { + rlog.info("ai-bridge", "sending text (streaming)", { source: source || "text", chars: String(text).length, url: url, @@ -2489,15 +2600,71 @@ AIBridge.prototype._sendText = function (text, source) { }); } - return httpJsonPost(url, { text: text }) - .then(function (resp) { + // TTS sentence queue: speak sentences in order, overlapping network + TTS. + // This is how the original Jibo system achieved sub-5-second perceived latency: + // start speaking the first sentence while the LLM is still generating the rest. + var ttsQueue = []; + var ttsRunning = false; + var firstSentenceAt = 0; + + function drainTtsQueue() { + if (ttsRunning) return; + if (ttsQueue.length === 0) return; + ttsRunning = true; + var sentence = ttsQueue.shift(); + if (!firstSentenceAt) { + firstSentenceAt = Date.now(); if (rlog) { - rlog.info("ai-bridge", "text request complete", { ms: Date.now() - t0, ok: !!(resp && resp.reply) }); + rlog.info("ai-bridge", "first sentence TTS start", { + ms: firstSentenceAt - t0, + chars: String(sentence).length, + sentence: String(sentence).slice(0, 120), + }); } - var reply = resp && resp.reply ? String(resp.reply) : ""; - if (!reply) return { reply: "" }; - return self._speak(reply).then(function () { - return { reply: reply }; + } + self._speak(sentence).then(function () { + ttsRunning = false; + drainTtsQueue(); + }).catch(function () { + ttsRunning = false; + drainTtsQueue(); + }); + } + + function onSentence(sentence) { + if (!sentence || !String(sentence).trim()) return; + if (rlog) { + rlog.info("ai-bridge", "stream sentence received", { + ms: Date.now() - t0, + chars: String(sentence).length, + sentence: String(sentence).slice(0, 120), + queueLen: ttsQueue.length, + }); + } + ttsQueue.push(String(sentence)); + drainTtsQueue(); + } + + return httpStreamingPost(url, { text: text }, onSentence) + .then(function (fullReply) { + if (rlog) { + rlog.info("ai-bridge", "streaming request complete", { + ms: Date.now() - t0, + firstSentenceMs: firstSentenceAt ? firstSentenceAt - t0 : null, + replyChars: String(fullReply).length, + }); + } + + // Wait for remaining TTS queue to finish before releasing in-flight. + return new Promise(function (resolve) { + function waitForTts() { + if (ttsQueue.length === 0 && !ttsRunning) { + resolve({ reply: fullReply || "" }); + } else { + setTimeout(waitForTts, 100); + } + } + waitForTts(); }); }) .catch(function (e) { diff --git a/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/rosbridge.js b/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/rosbridge.js index dc2c9d55..6863fd31 100644 --- a/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/rosbridge.js +++ b/V3.1/build/opt/jibo/Jibo/Skills/@be/be/be/rosbridge.js @@ -13,6 +13,7 @@ var state = { ws: null, subId: null, reconnectTimer: null, + lastProcessed: {}, }; // Robot logger (available on BE runtime) @@ -43,6 +44,51 @@ function parseWsUrl(s) { try { return String(s || '').trim(); } catch (e) { return DEFAULT_WS; } } +function getCandidateWsUrls(beRuntime) { + var list = []; + try { + var envUrl = process.env.ROSBRIDGE_WS; + if (envUrl) list.push(parseWsUrl(envUrl)); + } catch (e) {} + try { + var cfg = beRuntime && beRuntime.config && beRuntime.config.rosbridge && beRuntime.config.rosbridge.ws; + if (cfg) list.push(parseWsUrl(cfg)); + } catch (e) {} + + // Common fallbacks + list.push('ws://127.0.0.1:9090'); + list.push(DEFAULT_WS); + + // Attempt gateway-derived host if available + try { + var os = require('os'); + var ifaces = os.networkInterfaces(); + Object.keys(ifaces || {}).forEach(function (k) { + (ifaces[k] || []).forEach(function (info) { + if (!info || info.internal || info.family !== 'IPv4') return; + var parts = String(info.address).split('.'); + if (parts.length === 4) { + // guess the gateway as .1 + parts[3] = '1'; + list.push('ws://' + parts.join('.') + ':9090'); + } + }); + }); + } catch (e) {} + + // Deduplicate while keeping order + var seen = {}; + var out = []; + for (var i = 0; i < list.length; i++) { + try { + var v = String(list[i] || '').trim(); + if (!v) continue; + if (!seen[v]) { seen[v] = true; out.push(v); } + } catch (e) {} + } + return out; +} + function sendWs(obj) { try { if (!state.ws || state.ws.readyState !== 1) { @@ -103,8 +149,29 @@ function connect(wsUrl, onMessage) { try { data = JSON.parse(evt.data); } catch (e) { rlogWarn('rosbridge', 'json parse failed', { err: String(e), raw: String(evt && evt.data) }); return; } // rosbridge wraps messages with { op: 'publish', topic: '...', msg: {...} } if (data && data.op === 'publish') { - rlogInfo('rosbridge', 'publish received', { topic: data.topic, msg: data.msg }); - if (data.msg) onMessage && onMessage(data.msg, data.topic); + try { + var topic = data.topic || 'unknown'; + // Throttle frequent messages per-topic to avoid blocking the BE event loop. + var minMs = parseInt(process.env.ROSBRIDGE_MIN_INTERVAL_MS || '200', 10) || 200; + var now = Date.now(); + var last = state.lastProcessed[topic] || 0; + if (now - last < minMs) { + rlogInfo('rosbridge', 'throttled publish', { topic: topic, droppedMs: now - last, minMs: minMs }); + return; + } + state.lastProcessed[topic] = now; + + // Defer handling so heavy work doesn't block the socket message parser. + var handler = function () { + try { + rlogInfo('rosbridge', 'publish received', { topic: data.topic, msg: data.msg }); + if (data.msg) onMessage && onMessage(data.msg, data.topic); + } catch (e) { rlogWarn('rosbridge', 'publish handler error', { err: String(e) }); } + }; + if (typeof setImmediate === 'function') setImmediate(handler); else setTimeout(handler, 0); + } catch (e) { + rlogWarn('rosbridge', 'error handling publish', { err: String(e) }); + } return; } rlogInfo('rosbridge', 'ws message', { op: data && data.op, data: data }); @@ -124,6 +191,82 @@ function scheduleReconnect(wsUrl, onMessage) { }, 5000); } +// Try a list of candidate URLs sequentially until one connects. +function connectToCandidates(beRuntime, onMessage) { + var candidates = getCandidateWsUrls(beRuntime); + var idx = 0; + + function tryNext() { + if (state.ws && state.ws.readyState === 1) return; // already connected + if (idx >= candidates.length) { + rlogWarn('rosbridge', 'no candidates left, will schedule reconnect'); + scheduleReconnect(candidates[0], onMessage); + return; + } + var url = candidates[idx++]; + rlogInfo('rosbridge', 'trying candidate', { url: url }); + + // attempt connect and use a short timeout to move to next candidate + var tried = false; + var timeout = setTimeout(function () { + if (tried) return; + tried = true; + try { if (state.ws) state.ws.close(); } catch (e) {} + rlogWarn('rosbridge', 'candidate timeout, trying next', { url: url }); + // small delay before next + setTimeout(tryNext, 250); + }, 3500); + + try { + // reuse existing connect path but attach temporary handlers + var prevOnOpen = state.ws && state.ws.onopen; + connect(url, function (msg, topic) { + clearTimeout(timeout); + onMessage && onMessage(msg, topic); + }); + // when open, cancel other attempts + (function (u) { + var wsInst = state.ws; + if (!wsInst) return; + var origOnOpen = wsInst.onopen; + wsInst.onopen = function (ev) { + clearTimeout(timeout); + rlogInfo('rosbridge', 'connected candidate', { url: u }); + try { if (typeof origOnOpen === 'function') origOnOpen.call(wsInst, ev); } catch (e) {} + }; + // if it closes or errors before open, try next + var origOnClose = wsInst.onclose; + wsInst.onclose = function (ev) { + clearTimeout(timeout); + if (!tried) { + tried = true; + rlogWarn('rosbridge', 'candidate closed before ready, next', { url: u }); + setTimeout(tryNext, 250); + } + try { if (typeof origOnClose === 'function') origOnClose.call(wsInst, ev); } catch (e) {} + }; + var origOnError = wsInst.onerror; + wsInst.onerror = function (err) { + clearTimeout(timeout); + if (!tried) { + tried = true; + rlogWarn('rosbridge', 'candidate error, next', { url: u, err: String(err) }); + try { if (wsInst) wsInst.close(); } catch (e) {} + setTimeout(tryNext, 250); + } + try { if (typeof origOnError === 'function') origOnError.call(wsInst, err); } catch (e) {} + }; + })(url); + } catch (e) { + clearTimeout(timeout); + rlogWarn('rosbridge', 'connect threw, trying next', { url: url, err: String(e) }); + setTimeout(tryNext, 250); + } + } + + tryNext(); +} + function close() { rlogInfo('rosbridge', 'close requested'); try { unsubscribe(); } catch (e) { rlogWarn('rosbridge', 'unsubscribe failed', { err: String(e) }); } @@ -255,8 +398,7 @@ exports.init = function (beRuntime, jibo) { jibo.tts.speak(payload, { mode: jibo.tts.TTSMode ? jibo.tts.TTSMode.SSML : undefined }); } else if (jibo && jibo.tts && typeof jibo.tts.speak === 'function') { jibo.tts.speak(String(t), { mode: jibo.tts.TTSMode ? jibo.tts.TTSMode.TEXT : undefined }); - } else if (beRuntime && beRuntime.api && typeof beRuntime.api.speak === 'function') { - beRuntime.api.speak({ text: String(t), mode: useEsml ? 'ssml' : 'text' }); + } else if (beRuntime && beRuntime.api && typeof beRuntime.api.speak === 'function') { beRuntime.api.speak({ text: String(t), mode: useEsml ? 'ssml' : 'text' }); } else if (jibo && jibo.api && typeof jibo.api.speak === 'function') { jibo.api.speak({ text: String(t), mode: useEsml ? 'ssml' : 'text' }); } else {