Files
JiboOs/V3.1/build/hub-shim/index.js

783 lines
25 KiB
JavaScript

/* eslint-disable no-console */
const fs = require('fs');
const http = require('http');
const https = require('https');
const crypto = require('crypto');
let WebSocket;
function requireWs() {
try {
// Prefer a normal node resolution (server install, or local node_modules).
return require('ws');
} catch (_) {
// Fallback for on-robot usage where `ws` already exists under the skills runtime.
return require('/opt/jibo/Jibo/Skills/@be/be/node_modules/ws');
}
}
WebSocket = requireWs();
function nowMs() {
return Date.now();
}
function uuid() {
// Node 14+: crypto.randomUUID exists; otherwise fallback.
if (typeof crypto.randomUUID === 'function') return crypto.randomUUID();
return [4, 2, 2, 2, 6].map((len) => crypto.randomBytes(len).toString('hex')).join('-');
}
function loadJson(filePath) {
const raw = fs.readFileSync(filePath, 'utf8');
return JSON.parse(raw);
}
function sleep(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function logFactory(level) {
const order = { error: 0, warn: 1, info: 2, debug: 3 };
const threshold = order[level] ?? 2;
return {
error: (...args) => threshold >= 0 && console.error('[hub-shim]', ...args),
warn: (...args) => threshold >= 1 && console.warn('[hub-shim]', ...args),
info: (...args) => threshold >= 2 && console.log('[hub-shim]', ...args),
debug: (...args) => threshold >= 3 && console.log('[hub-shim]', ...args),
};
}
function normalizeText(text) {
return String(text || '').trim();
}
function classifyYesNo(text) {
const t = normalizeText(text).toLowerCase();
// Negative first to avoid matching "no" in "not"?? Keep it simple.
if (/\b(no|nope|nah|not really|don't|do not|didn't|did not)\b/.test(t)) return 'no';
if (/\b(yes|yep|yeah|ya|sure|ok|okay|i did|i do|i liked it|liked it|that was good|that was great|great|good|awesome|amazing)\b/.test(t)) return 'yes';
return '';
}
function buildAsrResult(text) {
return {
text,
confidence: 1,
// Keep these optional fields minimal.
alternates: [],
};
}
function buildNluResult(intent, rules = [], entities = {}, slotActionsOverride) {
const normalizedIntent = intent || '';
const outEntities = entities && typeof entities === 'object' ? { ...entities } : {};
// Many flows either use `intent` directly or compute firstGrammarTag from
// entities[slotActions[0]]. Keep defaults backwards compatible, but allow
// callers to provide explicit slotActions for launch-style grammars.
let slotActions = [];
if (Array.isArray(slotActionsOverride) && slotActionsOverride.length) {
slotActions = slotActionsOverride.map((s) => String(s)).filter(Boolean);
} else if (normalizedIntent) {
slotActions = [normalizedIntent];
if (outEntities[normalizedIntent] == null) outEntities[normalizedIntent] = normalizedIntent;
}
return {
rules: Array.isArray(rules) ? rules : [],
intent: normalizedIntent,
entities: outEntities,
slotActions,
source: 'hub-shim',
confidence: normalizedIntent ? 1 : 0,
};
}
function hasRule(rules, want) {
if (!Array.isArray(rules) || !want) return false;
const w = String(want).toLowerCase();
for (const r of rules) {
const s = String(r || '').toLowerCase();
if (!s) continue;
if (s === w) return true;
if (s.endsWith('/' + w)) return true;
// Common aliasing patterns.
if (w === 'launch' && (s.includes('global_commands_launch') || s.includes('commands_launch'))) return true;
if (w === 'dance' && s.includes('tutorial/dance')) return true;
if ((w === 'take_photo' || w === 'photo') && (s.includes('tutorial/take_photo') || s.includes('take_photo'))) return true;
}
return false;
}
function inferNluFromText(text, rules) {
// Minimal, rule-ish: only try to satisfy common skills.
const yn = classifyYesNo(text);
if (yn) {
// Many flows use the intent string as grammar tag.
return buildNluResult(yn, rules, {});
}
// If Jetstream provided tutorial/global rules, prefer returning the rule itself as intent.
// This matches flows that transition on a specific grammar tag.
if (Array.isArray(rules)) {
const t = normalizeText(text).toLowerCase();
// Many builds pass short rule names (e.g., 'launch', 'dance') instead of full paths.
if (hasRule(rules, 'dance') && /\bdance\b/.test(t)) return buildNluResult('dance', rules, {});
if (hasRule(rules, 'take_photo') && /\b(photo|picture|take a photo|take a picture)\b/.test(t)) return buildNluResult('take_photo', rules, {});
// Launch/global command grammar: some builds only provide 'launch' + globals/global_commands_launch
// for many commands. When the utterance clearly matches a known command, emit that intent so flows
// do not treat it as a generic launch request.
if (hasRule(rules, 'launch')) {
// Many global command handlers expect a launch intent with a target skill in entities.skill.
if (/\bdance\b/.test(t)) return buildNluResult('launch', rules, { skill: 'dance', query: normalizeText(text) }, ['skill']);
if (/\b(photo|picture|take a photo|take a picture|selfie)\b/.test(t)) return buildNluResult('launch', rules, { skill: 'photobooth', query: normalizeText(text) }, ['skill']);
// Otherwise: route to chitchat so the robot actually speaks an answer.
// We keep intent='launch' but provide a concrete skill target.
return buildNluResult('launch', rules, { skill: '@be/chitchat', domain: 'chitchat', query: normalizeText(text) }, ['skill']);
}
}
// Simple tutorial-ish intents.
const t = normalizeText(text).toLowerCase();
if (/\b(dance|do a dance)\b/.test(t)) return buildNluResult('dance', rules, {});
if (/\b(photo|picture|take a photo|take a picture)\b/.test(t)) return buildNluResult('take_photo', rules, {});
// If rules indicate yes/no, but we couldn't classify, mark noMatch.
if (Array.isArray(rules) && rules.some((r) => /yes[_-]?no/i.test(r))) {
return buildNluResult('', rules, {});
}
return buildNluResult('', rules, {});
}
function httpJsonPostRaw(urlString, payload, timeoutMs) {
timeoutMs = typeof timeoutMs === 'number' ? timeoutMs : 6000;
const u = new URL(urlString);
const bodyStr = JSON.stringify(payload || {});
const body = Buffer.from(bodyStr, 'utf8');
const isHttps = u.protocol === 'https:';
const requestOptions = {
protocol: u.protocol,
hostname: u.hostname,
port: u.port || (isHttps ? 443 : 80),
path: u.pathname + (u.search || ''),
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': body.length,
},
timeout: timeoutMs,
};
return new Promise((resolve, reject) => {
const req = (isHttps ? https : http).request(requestOptions, (res) => {
const chunks = [];
res.on('data', (d) => chunks.push(d));
res.on('end', () => {
resolve({
statusCode: res.statusCode,
headers: res.headers || {},
body: Buffer.concat(chunks).toString('utf8'),
});
});
});
req.on('error', reject);
req.on('timeout', () => {
req.destroy(new Error('POST timeout'));
});
req.write(body);
req.end();
});
}
function readJsonBody(req, maxBytes = 256 * 1024) {
return new Promise((resolve, reject) => {
let total = 0;
const chunks = [];
req.on('data', (d) => {
total += d.length || 0;
if (total > maxBytes) {
reject(new Error('request body too large'));
try { req.destroy(); } catch (_) { /* ignore */ }
return;
}
chunks.push(d);
});
req.on('end', () => {
const raw = Buffer.concat(chunks).toString('utf8');
if (!raw) return resolve({});
try {
resolve(JSON.parse(raw));
} catch (e) {
reject(new Error('invalid json body'));
}
});
req.on('error', reject);
});
}
async function ollamaAnswer({ baseUrl, model, system, prompt, timeoutMs }) {
baseUrl = String(baseUrl || 'http://127.0.0.1:11434').replace(/\/+$/, '');
model = String(model || 'llama3');
system = typeof system === 'string' ? system : 'Answer conversationally and concisely.';
prompt = String(prompt || '').trim();
if (!prompt) return '';
// Prefer /api/generate; fallback to /api/chat.
try {
const r = await httpJsonPostRaw(`${baseUrl}/api/generate`, {
model,
system,
prompt,
stream: false,
}, timeoutMs);
if (r.statusCode >= 200 && r.statusCode < 300) {
const obj = JSON.parse(r.body || '{}');
return String(obj.response || '').trim();
}
} catch (_) {
// fall through
}
const r2 = await httpJsonPostRaw(`${baseUrl}/api/chat`, {
model,
stream: false,
messages: [
{ role: 'system', content: system },
{ role: 'user', content: prompt },
],
}, timeoutMs);
if (!(r2.statusCode >= 200 && r2.statusCode < 300)) {
throw new Error(`ollama http ${r2.statusCode}`);
}
const obj2 = JSON.parse(r2.body || '{}');
return String(obj2.message && obj2.message.content ? obj2.message.content : '').trim();
}
function createGqaShim(config, logger) {
const gqa = (config && config.gqaShim) || {};
const enabled = !!gqa.enabled || String(process.env.JIBO_GQA_SHIM_ENABLE || '').toLowerCase() === '1' || String(process.env.JIBO_GQA_SHIM_ENABLE || '').toLowerCase() === 'true';
if (!enabled) return null;
const bindHost = gqa.bindHost || process.env.JIBO_GQA_SHIM_BIND_HOST || '0.0.0.0';
const port = Number(gqa.port || process.env.JIBO_GQA_SHIM_PORT || 8080);
const timeoutMs = Number(gqa.timeoutMs || process.env.JIBO_GQA_SHIM_TIMEOUT_MS || 20000);
const ollama = (gqa.ollama || {});
const ollamaBaseUrl = ollama.baseUrl || process.env.OLLAMA_BASE_URL || 'http://127.0.0.1:11434';
const ollamaModel = ollama.model || process.env.OLLAMA_MODEL || 'llama3';
const systemPrompt = ollama.systemPrompt || process.env.OLLAMA_SYSTEM_PROMPT || 'You are Jibo, a friendly social robot. Reply in 1-2 short spoken sentences.';
const server = http.createServer(async (req, res) => {
try {
if (req.method === 'GET') {
res.writeHead(200, { 'content-type': 'text/plain' });
res.end('jibo-gqa-shim\n');
return;
}
if (req.method !== 'POST') {
res.writeHead(405, { 'content-type': 'application/json' });
res.end(JSON.stringify({ message: 'method not allowed' }));
return;
}
const target = String((req.headers && (req.headers['x-amz-target'] || req.headers['X-Amz-Target'])) || '').trim();
const op = target.includes('.') ? target.split('.').pop() : '';
const body = await readJsonBody(req);
if (op === 'Question') {
const input = (body && (body.Input || body.input)) ? String(body.Input || body.input) : '';
logger.info('gqa Question', { input: input.slice(0, 120) });
let answer = '';
try {
answer = await ollamaAnswer({
baseUrl: ollamaBaseUrl,
model: ollamaModel,
system: systemPrompt,
prompt: input,
timeoutMs,
});
} catch (e) {
logger.warn('ollama failed', { err: String(e && (e.stack || e.message || e)) });
answer = '';
}
const ok = !!answer;
const resp = {
success: ok,
source: ok ? 'MOCK' : 'No Match',
message: ok ? undefined : 'No response',
type: ok ? 'answer' : 'error',
timestamps: {},
response: {
type: 'string',
payload: answer,
},
version: '0.0.1',
};
res.writeHead(200, { 'content-type': 'application/x-amz-json-1.0' });
res.end(JSON.stringify(resp));
return;
}
if (op === 'ListAttribution') {
const resp = { success: true, attributions: [], timestamps: {}, version: '0.0.1' };
res.writeHead(200, { 'content-type': 'application/x-amz-json-1.0' });
res.end(JSON.stringify(resp));
return;
}
res.writeHead(400, { 'content-type': 'application/x-amz-json-1.0' });
res.end(JSON.stringify({ message: 'unknown operation', target }));
} catch (e) {
res.writeHead(500, { 'content-type': 'application/x-amz-json-1.0' });
res.end(JSON.stringify({ message: String(e && (e.message || e)) }));
}
});
logger.info('gqa http', { bindHost, port, ollamaBaseUrl, ollamaModel });
return {
start: () => new Promise((resolve, reject) => server.listen(port, bindHost, (err) => (err ? reject(err) : resolve()))),
stop: () => new Promise((resolve) => server.close(() => resolve())),
};
}
function pickBestAsrUtterance(utterances) {
try {
if (!utterances || !utterances.length) return '';
let bestText = '';
let bestScore = -1e99;
for (const u of utterances) {
let text = u && (u.utterance || u.Utterance || u.text) ? String(u.utterance || u.Utterance || u.text) : '';
text = text.trim();
if (!text) continue;
if (text.length >= 2 && text[0].toLowerCase && text[1].toLowerCase && text[0].toLowerCase() === text[1].toLowerCase()) {
text = text.slice(1);
}
const score = typeof u.score === 'number' ? u.score : (typeof u.Score === 'number' ? u.Score : 0);
if (!bestText || score > bestScore) {
bestText = text;
bestScore = score;
}
}
return bestText;
} catch (_) {
return '';
}
}
function extractFinalTextFromAsrEvent(evt) {
// jibo-asr-service has had multiple JSON shapes across builds. Prefer utterances,
// but accept common fallback fields.
try {
const utterances = evt.utterances || evt.Utterances || (evt.payload && (evt.payload.utterances || evt.payload.Utterances));
const best = pickBestAsrUtterance(utterances);
if (best && String(best).trim()) return String(best).trim();
const candidates = [
evt.text,
evt.transcript,
evt.utterance,
evt.Utterance,
evt.payload && (evt.payload.text || evt.payload.transcript || evt.payload.utterance || evt.payload.Utterance),
];
for (const c of candidates) {
if (typeof c === 'string' && c.trim()) return c.trim();
}
return '';
} catch (_) {
return '';
}
}
async function asrServiceSttOnce(asrBaseUrl, wsPath, timeoutMs, audioSourceId, logger) {
// jibo-asr-service protocol:
// 1) Connect to WS /simple_port (event stream)
// 2) HTTP POST /asr_simple_interface {command:'start', task_id, audio_source_id, ...}
// 3) Wait for event_type=='speech_to_text_final' and extract utterance
// 4) HTTP POST /asr_simple_interface {command:'stop', task_id}
const url = new URL(asrBaseUrl);
const wsUrl = `${url.protocol === 'https:' ? 'wss' : 'ws'}://${url.host}${wsPath}`;
const baseHttp = `${url.protocol}//${url.host}`;
const taskId = `hub-shim-${Date.now()}-${Math.floor(Math.random() * 1e9)}`;
const requestId = `stt_start_${Date.now()}_${Math.floor(Math.random() * 1e9)}`;
logger.debug('asr connect', { wsUrl, baseHttp, taskId });
return await new Promise((resolve, reject) => {
const ws = new WebSocket(wsUrl);
let done = false;
const t0 = nowMs();
let timer;
function finish(err, value) {
if (done) return;
done = true;
if (timer) clearTimeout(timer);
try {
ws.close();
} catch (_) {
// ignore
}
if (err) reject(err);
else resolve(value);
}
async function stopAlways() {
const stopPayload = {
command: 'stop',
task_id: taskId,
request_id: `stt_stop_${Date.now()}_${Math.floor(Math.random() * 1e9)}`,
};
try {
await httpJsonPostRaw(`${baseHttp}/asr_simple_interface`, stopPayload, 6000);
} catch (_) {
// ignore
}
}
timer = setTimeout(() => {
stopAlways().finally(() => {
finish(new Error(`asr timeout after ${timeoutMs}ms`));
});
}, timeoutMs);
ws.on('open', async () => {
const startPayload = {
command: 'start',
task_id: taskId,
audio_source_id: String(audioSourceId || 'alsa1'),
hotphrase: 'none',
speech_to_text: true,
request_id: requestId,
};
try {
const resp = await httpJsonPostRaw(`${baseHttp}/asr_simple_interface`, startPayload, 8000);
if (!resp || !resp.statusCode || resp.statusCode < 200 || resp.statusCode >= 300) {
await stopAlways();
finish(new Error(`asr start failed: HTTP ${resp && resp.statusCode} ${resp && resp.body ? resp.body.slice(0, 200) : ''}`));
return;
}
logger.info('asr start', { ms: nowMs() - t0, status: resp.statusCode });
} catch (e) {
await stopAlways();
finish(e);
}
});
ws.on('message', (data) => {
let evt;
try {
evt = JSON.parse(data.toString('utf8'));
} catch (_) {
return;
}
if (!evt) return;
// Legacy/simple protocols: {type:'final', text:'...'}
const isFinal = evt.type === 'final' || evt.final === true || evt.isFinal === true;
const simpleText = evt.text || evt.transcript;
if (isFinal && typeof simpleText === 'string') {
stopAlways().finally(() => {
logger.info('asr final(simple)', { ms: nowMs() - t0, text: String(simpleText).slice(0, 80) });
finish(null, normalizeText(simpleText));
});
return;
}
const eventType = evt.event_type || evt.eventType || evt.event || evt.type;
if (eventType !== 'speech_to_text_final') return;
// Correlate ids when possible, but do not hard-require a match.
const evTaskId = evt.task_id || evt.taskId || (evt.payload && (evt.payload.task_id || evt.payload.taskId));
const evRequestId = evt.request_id || evt.requestId || (evt.payload && (evt.payload.request_id || evt.payload.requestId));
const idMatches = (!evTaskId && !evRequestId) || (evTaskId && String(evTaskId) === String(taskId)) || (evRequestId && String(evRequestId) === String(requestId));
if (!idMatches) {
logger.debug('asr final id mismatch (accepting)', {
taskId,
requestId,
evTaskId,
evRequestId,
});
}
const finalText = extractFinalTextFromAsrEvent(evt);
if (!finalText) {
logger.debug('asr final but no text extracted', {
keys: Object.keys(evt || {}).slice(0, 30),
payloadKeys: evt && evt.payload ? Object.keys(evt.payload).slice(0, 30) : undefined,
});
return;
}
stopAlways().finally(() => {
logger.info('asr final', { ms: nowMs() - t0, text: String(finalText).slice(0, 80) });
finish(null, normalizeText(finalText));
});
});
ws.on('error', async (e) => {
await stopAlways();
finish(e);
});
ws.on('close', () => {
if (!done) {
finish(new Error('asr ws closed before final'));
}
});
});
}
function send(ws, obj) {
ws.send(JSON.stringify(obj));
}
function extractTransIdFromHeaders(req) {
// Jetstream sets an x-jibo-transid header (see @jibo/interfaces Headers.transID).
const h = req.headers || {};
return h['x-jibo-transid'] || h['X-Jibo-TransId'] || h['x-jibo-transId'] || h['x-jibo-transID'] || '';
}
function createHubShim(configPath) {
const config = loadJson(configPath);
const logger = logFactory(config?.logging?.level || 'info');
const listen = config.listen || {};
const port = Number(listen.port || 9000);
const bindHost = listen.bindHost || '0.0.0.0';
const path = listen.path || '/v1/listen';
const asr = config.asrService || {};
const asrBaseUrl = asr.baseUrl || 'http://127.0.0.1:8088';
const wsPath = asr.wsPath || '/simple_port';
const timeoutMs = Number(asr.timeoutMs || 15000);
const audioSourceId = asr.audioSourceId || 'alsa1';
// The robot ASR service is effectively a shared hardware resource.
// Jetstream may open multiple WS connections/listens concurrently; serialize STT
// to avoid crosstalk and reduce timeouts.
let asrQueue = Promise.resolve();
function runAsrQueued(fn) {
const queuedAt = nowMs();
const run = async () => {
const waited = nowMs() - queuedAt;
if (waited > 50) logger.debug('asr queue wait', { waitedMs: waited });
return await fn();
};
const p = asrQueue.then(run, run);
asrQueue = p.catch(() => undefined);
return p;
}
const server = http.createServer((req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' });
res.end('jibo-hub-shim\n');
});
const wss = new WebSocket.Server({ server, path });
logger.info('listen ws', { bindHost, port, path, asrBaseUrl, audioSourceId });
const gqaShim = createGqaShim(config, logger);
wss.on('connection', (ws, req) => {
const connId = uuid().slice(0, 8);
const transID = extractTransIdFromHeaders(req);
logger.info('ws connected', { connId, transID: transID || undefined });
let binaryFrames = 0;
let binaryBytes = 0;
let lastContext = null;
let pendingListen = null;
async function maybeHandleListen() {
if (!lastContext || !pendingListen) return;
const listenMsg = pendingListen;
const mode = listenMsg?.data?.mode;
// If the client is responsible for supplying ASR or NLU, wait until we receive it.
if (mode === 'CLIENT_ASR' && !listenMsg._clientAsrText) return;
if (mode === 'CLIENT_NLU' && !listenMsg._clientNlu) return;
pendingListen = null;
const t0 = nowMs();
const base = {
msgID: uuid(),
ts: nowMs(),
};
// Emit SOS immediately so the robot transitions into listening.
send(ws, { ...base, type: 'SOS', data: null, final: false });
let text = '';
try {
if (mode === 'CLIENT_ASR') {
text = normalizeText(listenMsg._clientAsrText);
} else {
text = await runAsrQueued(() => asrServiceSttOnce(asrBaseUrl, wsPath, timeoutMs, audioSourceId, logger));
}
} catch (e) {
logger.warn('asr failed', { connId, err: String(e && (e.stack || e.message || e)) });
// Still emit EOS and an empty listen result.
}
const rules = Array.isArray(listenMsg?.data?.rules) ? listenMsg.data.rules : [];
const nluRes =
(mode === 'CLIENT_NLU' && listenMsg._clientNlu)
? listenMsg._clientNlu
: ((config?.nlu?.enabled === false) ? buildNluResult('', rules, {}) : inferNluFromText(text, rules));
const asrRes = buildAsrResult(text);
logger.info('listen result', {
connId,
text: String(text || '').slice(0, 120),
intent: nluRes && nluRes.intent,
slot0: nluRes && Array.isArray(nluRes.slotActions) ? nluRes.slotActions[0] : undefined,
skill: nluRes && nluRes.entities ? nluRes.entities.skill : undefined,
rule0: Array.isArray(rules) ? rules[0] : undefined,
rulesCount: Array.isArray(rules) ? rules.length : 0,
rules: Array.isArray(rules) ? rules.slice(0, 6) : [],
});
// Optionally provide incremental ASR/NLU; Jetstream consumers often listen for these.
send(ws, { ...base, type: 'ASR', data: asrRes, final: false });
send(ws, { ...base, type: 'NLU', data: nluRes, final: false });
// Final listen response.
const listenResp = {
type: 'LISTEN',
msgID: uuid(),
ts: nowMs(),
data: {
asr: asrRes,
nlu: nluRes,
},
final: true,
timings: { total: nowMs() - t0 },
};
send(ws, listenResp);
// Emit EOS to complete the listen lifecycle.
send(ws, { ...base, type: 'EOS', data: null, final: true });
}
ws.on('message', async (data, isBinary) => {
if (isBinary) {
binaryFrames += 1;
try {
binaryBytes += (data && (data.length || data.byteLength)) || 0;
} catch (_) {
// ignore
}
if (binaryFrames <= 3 || (binaryFrames % 50) === 0) {
logger.debug('rx binary', { connId, frames: binaryFrames, bytes: binaryBytes });
}
// Ignore audio/binary frames; this shim does not decode audio.
return;
}
let msg;
try {
msg = JSON.parse(data.toString('utf8'));
} catch (e) {
logger.warn('bad json', { connId, sample: String(data || '').slice(0, 120) });
return;
}
if (!msg || typeof msg.type !== 'string') return;
logger.debug('rx', { connId, type: msg.type });
switch (msg.type) {
case 'CONTEXT':
lastContext = msg.data;
logger.debug('context', {
connId,
hasSkill: !!(msg.data && msg.data.skill),
hasRuntime: !!(msg.data && msg.data.runtime),
});
break;
case 'LISTEN':
pendingListen = msg;
logger.debug('listen req', {
connId,
hotphrase: !!(msg.data && msg.data.hotphrase),
mode: msg.data && msg.data.mode,
rules: Array.isArray(msg.data && msg.data.rules) ? msg.data.rules : [],
});
break;
case 'CLIENT_ASR':
// Accept client-provided text (requires a LISTEN message too).
pendingListen = pendingListen || { type: 'LISTEN', msgID: uuid(), ts: nowMs(), data: { rules: [], mode: 'CLIENT_ASR' } };
pendingListen._clientAsrText = msg.data?.text;
break;
case 'CLIENT_NLU':
pendingListen = pendingListen || { type: 'LISTEN', msgID: uuid(), ts: nowMs(), data: { rules: [], mode: 'CLIENT_NLU' } };
pendingListen._clientNlu = msg.data;
break;
default:
// TRIGGER/proactive not implemented yet.
break;
}
// Normal path: wait for both CONTEXT and LISTEN then handle.
try {
await maybeHandleListen();
} catch (e) {
logger.warn('listen handler error', { connId, err: String(e && (e.stack || e.message || e)) });
}
});
ws.on('close', () => {
logger.info('ws closed', { connId });
});
ws.on('error', (e) => {
logger.warn('ws error', { connId, err: String(e && (e.stack || e.message || e)) });
});
});
return {
start: () => new Promise((resolve, reject) => {
server.listen(port, bindHost, async (err) => {
if (err) return reject(err);
try {
if (gqaShim) await gqaShim.start();
resolve();
} catch (e) {
reject(e);
}
});
}),
stop: () => new Promise((resolve) => {
const done = async () => {
try {
if (gqaShim) await gqaShim.stop();
} finally {
resolve();
}
};
server.close(() => { done().catch(() => resolve()); });
}),
config,
};
}
async function main() {
const configPath = process.argv[2] || process.env.JIBO_HUB_SHIM_CONFIG || './config.json';
if (!fs.existsSync(configPath)) {
console.error('Config file not found:', configPath);
process.exit(2);
}
const shim = createHubShim(configPath);
await shim.start();
// Keep process alive.
for (;;) await sleep(60_000);
}
if (require.main === module) {
main().catch((e) => {
console.error('[hub-shim] fatal:', e && (e.stack || e.message || e));
process.exit(1);
});
}