diff --git a/OpenJibo/docs/development-plan.md b/OpenJibo/docs/development-plan.md index c84caf0..c9beb51 100644 --- a/OpenJibo/docs/development-plan.md +++ b/OpenJibo/docs/development-plan.md @@ -88,6 +88,11 @@ Current websocket scope: - active local prompt preservation so `shared/yes_no`, clock, gallery, and settings prompts can still consume transcript-bearing short replies even when the stock skill reports a local context - binary audio ignored for an existing transID until a fresh `LISTEN` has been seen, preventing context-only or post-speech tails from reopening an endless buffered turn - blank-audio hotphrase turns clear pending listen state and install a short late-audio ignore window +- first GLSM-aligned listener telemetry and recovery slice is now in source: + - derived phase labels (`HJ_LISTENING`, `LISTENING`, `WAIT_LISTEN_FINISHED`, `DISPATCH_DIALOG`, `PROCESS_LISTENER_QUEUE`) + - `glsm_phase_transition` turn diagnostics + - websocket turn events with `glsmPhase` snapshots + - stale pending-listen recovery for long-open no-context/no-audio listens before processing a new hotphrase listen - unknown inbound websocket types dropped silently instead of echoing stock-OS-unknown OpenJibo events - file telemetry and fixture export for HTTP, websocket, and turn captures @@ -145,6 +150,7 @@ Use these sources as evidence, not as code to copy blindly: - User-provided original source snapshot: `..\jibo` when extracted locally - Original Pegasus cloud source inside that snapshot: `pegasus` - Original SDK and skill source inside that snapshot: `sdk` +- Legacy listener flow reference diagram: `..\jibo\sdk\packages\skills-service-manager\resources\state-diagrams\glsm.png` - JiboOS reference tree: `..\JiboOS` - JiboOS skill snapshot: `..\JiboOS\opt\jibo\Jibo\Skills\@be` diff --git a/OpenJibo/docs/feature-backlog.md b/OpenJibo/docs/feature-backlog.md index bb0192d..663f537 100644 --- a/OpenJibo/docs/feature-backlog.md +++ b/OpenJibo/docs/feature-backlog.md @@ -301,6 +301,20 @@ Current release theme: - Follow-up: - live smoke should confirm `cloud version` speaks `1.0.18`, carries `match.skipSurprises = true`, does not stop itself on the word `Jibo`, and settles without a generic `I heard...` reply or a local surprise handoff +### GLSM Listener Flow Capture And Recovery + +- Status: `implemented` +- Tags: `protocol`, `docs` +- Result: + - the legacy listener state machine source (`sdk ... glsm.png`) is now captured in current planning docs + - runtime now emits GLSM-aligned phase snapshots (`HJ_LISTENING`, `LISTENING`, `WAIT_LISTEN_FINISHED`, `DISPATCH_DIALOG`, `PROCESS_LISTENER_QUEUE`) + - turn diagnostics now include `glsm_phase_transition` for phase changes + - websocket telemetry now records `glsmPhase` on binary/context/turn events + - stale pending-listen recovery is now in source so a long-open no-context/no-audio listen can be cleared when the next hotphrase listen arrives +- Follow-up: + - live-capture proof is still required against the recurring blue-ring/stuck-listening sequence + - deeper GLSM parity (`Interrupt Listeners`, launch/global parse branches) should be tackled after this first capture slice is validated on-device + ### End-Of-Skill Surprise Suppression - Status: `implemented` diff --git a/OpenJibo/docs/release-1.0.19-plan.md b/OpenJibo/docs/release-1.0.19-plan.md index 3d4af6d..7263019 100644 --- a/OpenJibo/docs/release-1.0.19-plan.md +++ b/OpenJibo/docs/release-1.0.19-plan.md @@ -119,7 +119,7 @@ Reference: ## Next Queued Task (`2026-05-06`) -Queued next `1.0.19` implementation task: +Queued next `1.0.19` implementation task (now started): - dialog parsing expansion and ambiguity guardrails @@ -129,6 +129,12 @@ Execution focus: - reduce trigger-only captures that drop the rest of the utterance - preserve command-vs-question personality split and local skill payload compatibility - add focused tests for new phrase families and ambiguity boundaries +- keep listener-state observability aligned with the legacy GLSM flow while phrase guardrails are added + +First completed guardrail slice under this queue: + +- GLSM listener flow capture + telemetry mapping +- stale pending-listen recovery path for long-open no-context/no-audio listens ## Next Slices diff --git a/OpenJibo/docs/system-diagram-alignment.md b/OpenJibo/docs/system-diagram-alignment.md index 64f1221..643c46c 100644 --- a/OpenJibo/docs/system-diagram-alignment.md +++ b/OpenJibo/docs/system-diagram-alignment.md @@ -16,6 +16,7 @@ As-of date: `2026-05-06` - Legacy system architecture: `C:\Projects\jibo\pegasus\resources\system_diagram.png` - Legacy generic skill scaffold: `C:\Projects\jibo\pegasus\packages\template-skill\docs\TemplateSkill.png` +- Legacy listener state machine: `C:\Projects\jibo\sdk\packages\skills-service-manager\resources\state-diagrams\glsm.png` ## Template Skill Verdict @@ -45,6 +46,30 @@ Conclusion: do not treat template-skill flow as a port target. Treat it as a sha | `Proactivity Catalog` | in-code candidate lists/weights | explicit catalog service with tuned weights and operator controls | | `Audio Logs` | file telemetry sinks in infrastructure telemetry | hosted indexed capture/retention for multi-operator analysis | +## GLSM Listener Flow Alignment (`2026-05-06`) + +Captured source: + +- `C:\Projects\jibo\sdk\packages\skills-service-manager\resources\state-diagrams\glsm.png` + +First OpenJibo support slice (implemented): + +- explicit derived listener phases are now emitted in cloud diagnostics: + - `HJ_LISTENING` + - `LISTENING` + - `WAIT_LISTEN_FINISHED` + - `DISPATCH_DIALOG` + - `PROCESS_LISTENER_QUEUE` +- turn telemetry now records `glsm_phase_transition` with previous/next state and trigger +- websocket telemetry now includes `glsmPhase` on binary, context, and turn-processed events +- stale pending-listen recovery is now implemented: + - when a pending `LISTEN` stays open long enough with no context/audio, a new hotphrase listen can recover the stuck state before continuing + +Current parity boundary: + +- this slice focuses on listener lifecycle observability plus stuck-listen recovery +- deeper explicit parity states from GLSM (`Interrupt Listeners`, `Handle Launch Parse`, `Handle Global Parse`, `Dispatch Dialog` sub-branches) are next candidates once this capture-driven slice is validated live + ## Where We Were Legacy cloud design was service-oriented around: diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs index 9c71791..6a953a6 100644 --- a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs @@ -25,7 +25,8 @@ public sealed class JiboWebSocketService( var replies = await turnFinalizationService.HandleBinaryAudioAsync(session, envelope, cancellationToken); await telemetrySink.RecordTurnEventAsync(envelope, session, "binary_audio_received", new Dictionary { - ["bytes"] = envelope.Binary?.Length ?? 0 + ["bytes"] = envelope.Binary?.Length ?? 0, + ["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session) }, cancellationToken); return replies; } @@ -33,6 +34,8 @@ public sealed class JiboWebSocketService( var parsedType = ReadMessageType(envelope.Text); session.LastMessageType = parsedType; var containsInlineTurnPayload = parsedType == "LISTEN" && ContainsInlineTurnPayload(envelope.Text); + var staleListenRecovered = false; + var staleListenAgeMs = 0; if (parsedType == "LISTEN" && !containsInlineTurnPayload && WebSocketTurnFinalizationService.ShouldIgnoreLateListenSetup(session, envelope.Text)) @@ -57,6 +60,19 @@ public sealed class JiboWebSocketService( return replies; } + if (parsedType == "LISTEN" && + !containsInlineTurnPayload && + WebSocketTurnFinalizationService.TryRecoverStalePendingListen(session, out staleListenAgeMs)) + { + staleListenRecovered = true; + await telemetrySink.RecordTurnEventAsync(envelope, session, "glsm_stale_listen_recovered", new Dictionary + { + ["staleAgeMs"] = staleListenAgeMs, + ["transID"] = session.TurnState.TransId, + ["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session) + }, cancellationToken); + } + WebSocketTurnFinalizationService.ObserveIncomingMessage(session, envelope.Text); switch (parsedType) @@ -66,7 +82,8 @@ public sealed class JiboWebSocketService( var replies = await turnFinalizationService.HandleContextAsync(session, envelope, cancellationToken); await telemetrySink.RecordTurnEventAsync(envelope, session, "context_received", new Dictionary { - ["transID"] = session.TurnState.TransId + ["transID"] = session.TurnState.TransId, + ["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session) }, cancellationToken); return replies; } @@ -80,7 +97,10 @@ public sealed class JiboWebSocketService( ["messageType"] = parsedType, ["replyCount"] = replies.Count, ["transcript"] = session.LastTranscript, - ["intent"] = session.LastIntent + ["intent"] = session.LastIntent, + ["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session), + ["staleListenRecovered"] = staleListenRecovered, + ["staleListenAgeMs"] = staleListenAgeMs }, cancellationToken); return replies; } @@ -92,7 +112,8 @@ public sealed class JiboWebSocketService( ["messageType"] = parsedType, ["replyCount"] = replies.Count, ["transcript"] = session.LastTranscript, - ["intent"] = session.LastIntent + ["intent"] = session.LastIntent, + ["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session) }, cancellationToken); return replies; } diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/WebSocketTurnFinalizationService.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/WebSocketTurnFinalizationService.cs index 872be39..ff20b57 100644 --- a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/WebSocketTurnFinalizationService.cs +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/WebSocketTurnFinalizationService.cs @@ -14,9 +14,11 @@ public sealed partial class WebSocketTurnFinalizationService( { private const int AutoFinalizeMinBufferedAudioBytes = 15000; private const int AutoFinalizeMinBufferedAudioChunks = 5; + private const string GlsmPhaseMetadataKey = "glsmPhase"; private static readonly TimeSpan AutoFinalizeMinTurnAge = TimeSpan.FromMilliseconds(1800); private static readonly TimeSpan AutoFinalizeMissingTranscriptFallbackAge = TimeSpan.FromMilliseconds(4200); private static readonly TimeSpan AutoFinalizeContinuationDeferralMaxAge = TimeSpan.FromMilliseconds(3600); + private static readonly TimeSpan StaleListenSetupRecoveryAge = TimeSpan.FromSeconds(9); private const int AutoFinalizeContinuationDeferralMaxAttempts = 2; private static readonly HashSet PegasusAffinityContinuationStems = new(StringComparer.Ordinal) { @@ -61,54 +63,61 @@ public sealed partial class WebSocketTurnFinalizationService( WebSocketMessageEnvelope envelope, CancellationToken cancellationToken = default) { - var turnState = session.TurnState; - var ignoreLateAudio = ShouldIgnoreLateAudio(session); - var ignoreAudioWithoutListen = ShouldIgnoreAudioWithoutListen(turnState); - if (ignoreLateAudio || ignoreAudioWithoutListen) + try { - await sink.RecordTurnDiagnosticAsync("binary_audio_ignored", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary + var turnState = session.TurnState; + var ignoreLateAudio = ShouldIgnoreLateAudio(session); + var ignoreAudioWithoutListen = ShouldIgnoreAudioWithoutListen(turnState); + if (ignoreLateAudio || ignoreAudioWithoutListen) + { + await sink.RecordTurnDiagnosticAsync("binary_audio_ignored", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary + { + ["ignored"] = true, + ["ignoreLateAudio"] = ignoreLateAudio, + ["ignoreAudioWithoutListen"] = ignoreAudioWithoutListen, + ["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion, + ["bufferedAudioBytes"] = turnState.BufferedAudioBytes, + ["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount, + ["sawListen"] = turnState.SawListen, + ["sawContext"] = turnState.SawContext + }), cancellationToken); + return []; + } + + session.LastMessageType = "BINARY_AUDIO"; + turnState.FirstAudioReceivedUtc ??= DateTimeOffset.UtcNow; + turnState.BufferedAudioChunkCount += 1; + turnState.BufferedAudioBytes += envelope.Binary?.Length ?? 0; + if (envelope.Binary is { Length: > 0 }) + { + turnState.BufferedAudioFrames.Add([.. envelope.Binary]); + } + turnState.LastAudioReceivedUtc = DateTimeOffset.UtcNow; + turnState.AwaitingTurnCompletion = true; + session.Metadata["lastAudioBytes"] = envelope.Binary?.Length ?? 0; + await sink.RecordTurnDiagnosticAsync("binary_audio_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary { - ["ignored"] = true, - ["ignoreLateAudio"] = ignoreLateAudio, - ["ignoreAudioWithoutListen"] = ignoreAudioWithoutListen, - ["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion, ["bufferedAudioBytes"] = turnState.BufferedAudioBytes, ["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount, + ["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion, ["sawListen"] = turnState.SawListen, - ["sawContext"] = turnState.SawContext + ["sawContext"] = turnState.SawContext, + ["listenRules"] = turnState.ListenRules, + ["listenAsrHints"] = turnState.ListenAsrHints, + ["yesNoRule"] = turnState.ListenRules.FirstOrDefault(IsConstrainedYesNoRule) }), cancellationToken); + + if (ShouldAutoFinalize(session)) + { + return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken); + } + return []; } - - session.LastMessageType = "BINARY_AUDIO"; - turnState.FirstAudioReceivedUtc ??= DateTimeOffset.UtcNow; - turnState.BufferedAudioChunkCount += 1; - turnState.BufferedAudioBytes += envelope.Binary?.Length ?? 0; - if (envelope.Binary is { Length: > 0 }) + finally { - turnState.BufferedAudioFrames.Add([.. envelope.Binary]); + await TrackGlsmPhaseAsync(session, envelope, "binary_audio", cancellationToken); } - turnState.LastAudioReceivedUtc = DateTimeOffset.UtcNow; - turnState.AwaitingTurnCompletion = true; - session.Metadata["lastAudioBytes"] = envelope.Binary?.Length ?? 0; - await sink.RecordTurnDiagnosticAsync("binary_audio_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary - { - ["bufferedAudioBytes"] = turnState.BufferedAudioBytes, - ["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount, - ["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion, - ["sawListen"] = turnState.SawListen, - ["sawContext"] = turnState.SawContext, - ["listenRules"] = turnState.ListenRules, - ["listenAsrHints"] = turnState.ListenAsrHints, - ["yesNoRule"] = turnState.ListenRules.FirstOrDefault(IsConstrainedYesNoRule) - }), cancellationToken); - - if (ShouldAutoFinalize(session)) - { - return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken); - } - - return []; } public async Task> HandleContextAsync( @@ -116,34 +125,40 @@ public sealed partial class WebSocketTurnFinalizationService( WebSocketMessageEnvelope envelope, CancellationToken cancellationToken = default) { - var turnState = session.TurnState; - turnState.SawContext = true; - turnState.ContextPayload = ExtractDataPayload(envelope.Text); - session.Metadata["context"] = turnState.ContextPayload; - - if (TryReadContextProperty(envelope.Text, "audioTranscriptHint", out var transcriptHint) && - !string.IsNullOrWhiteSpace(transcriptHint)) + try { - turnState.AudioTranscriptHint = transcriptHint; - session.Metadata["audioTranscriptHint"] = transcriptHint; - } + var turnState = session.TurnState; + turnState.SawContext = true; + turnState.ContextPayload = ExtractDataPayload(envelope.Text); + session.Metadata["context"] = turnState.ContextPayload; + + if (TryReadContextProperty(envelope.Text, "audioTranscriptHint", out var transcriptHint) && + !string.IsNullOrWhiteSpace(transcriptHint)) + { + turnState.AudioTranscriptHint = transcriptHint; + session.Metadata["audioTranscriptHint"] = transcriptHint; + } + + if (ShouldIgnorePassiveLocalSkillContext(session, envelope.Text)) + { + turnState.AwaitingTurnCompletion = false; + turnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow); + ResetBufferedAudio(session); + ClearListenTracking(turnState); + return []; + } + + if (ShouldAutoFinalize(session)) + { + return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken); + } - if (ShouldIgnorePassiveLocalSkillContext(session, envelope.Text)) - { - turnState.AwaitingTurnCompletion = false; - turnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow); - ResetBufferedAudio(session); - turnState.SawListen = false; - turnState.SawContext = false; return []; } - - if (ShouldAutoFinalize(session)) + finally { - return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken); + await TrackGlsmPhaseAsync(session, envelope, "context", cancellationToken); } - - return []; } public async Task> HandleTurnAsync( @@ -167,8 +182,8 @@ public sealed partial class WebSocketTurnFinalizationService( session.TurnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow); session.FollowUpExpiresUtc = null; ResetBufferedAudio(session); - session.TurnState.SawListen = false; - session.TurnState.SawContext = false; + ClearListenTracking(session.TurnState); + UpdateGlsmPhaseMarker(session); return [.. ResponsePlanToSocketMessagesMapper.MapNoInputAndRedirectToSkill( session.TurnState.TransId ?? session.LastTransId ?? string.Empty, session.TurnState.ListenRules, @@ -181,6 +196,8 @@ public sealed partial class WebSocketTurnFinalizationService( } session.TurnState.AwaitingTurnCompletion = true; + session.TurnState.ListenOpenedUtc ??= DateTimeOffset.UtcNow; + UpdateGlsmPhaseMarker(session); return []; } @@ -275,6 +292,7 @@ public sealed partial class WebSocketTurnFinalizationService( string.Equals(type.GetString(), "LISTEN", StringComparison.OrdinalIgnoreCase)) { turnState.SawListen = true; + turnState.ListenOpenedUtc ??= DateTimeOffset.UtcNow; } if (root.TryGetProperty("transID", out var transId) && transId.ValueKind == JsonValueKind.String) @@ -351,6 +369,7 @@ public sealed partial class WebSocketTurnFinalizationService( turnState.TransId = transId; turnState.ContextPayload = null; turnState.AudioTranscriptHint = null; + turnState.ListenOpenedUtc = null; turnState.LastSttError = null; turnState.LastSttErrorUtc = null; turnState.FirstAudioReceivedUtc = null; @@ -376,36 +395,37 @@ public sealed partial class WebSocketTurnFinalizationService( bool allowFallbackOnMissingTranscript, CancellationToken cancellationToken) { - var turn = ProtocolToTurnContextMapper.MapListenMessage(envelope, session, messageType); - var turnState = session.TurnState; - if (IsYesNoTurn(turn) || ReadPrimaryYesNoRule(turn) is not null) + try { - await sink.RecordTurnDiagnosticAsync("yes_no_turn_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary + var turn = ProtocolToTurnContextMapper.MapListenMessage(envelope, session, messageType); + var turnState = session.TurnState; + if (IsYesNoTurn(turn) || ReadPrimaryYesNoRule(turn) is not null) { - ["messageType"] = messageType, - ["listenRules"] = ReadRules(turn, "listenRules").ToArray(), - ["clientRules"] = ReadRules(turn, "clientRules").ToArray(), - ["listenAsrHints"] = ReadRules(turn, "listenAsrHints").ToArray(), - ["yesNoRule"] = ReadPrimaryYesNoRule(turn), - ["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion, - ["bufferedAudioBytes"] = turnState.BufferedAudioBytes, - ["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount, - ["sawListen"] = turnState.SawListen, - ["sawContext"] = turnState.SawContext, - ["followUpOpen"] = session.FollowUpOpen, - ["followUpExpiresUtc"] = session.FollowUpExpiresUtc - }), cancellationToken); - } - if (ShouldIgnoreBlankAudioHotphraseTurn(turn)) - { - session.TurnState.AwaitingTurnCompletion = false; - session.TurnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow); - session.FollowUpExpiresUtc = null; - ResetBufferedAudio(session); - session.TurnState.SawListen = false; - session.TurnState.SawContext = false; - return []; - } + await sink.RecordTurnDiagnosticAsync("yes_no_turn_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary + { + ["messageType"] = messageType, + ["listenRules"] = ReadRules(turn, "listenRules").ToArray(), + ["clientRules"] = ReadRules(turn, "clientRules").ToArray(), + ["listenAsrHints"] = ReadRules(turn, "listenAsrHints").ToArray(), + ["yesNoRule"] = ReadPrimaryYesNoRule(turn), + ["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion, + ["bufferedAudioBytes"] = turnState.BufferedAudioBytes, + ["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount, + ["sawListen"] = turnState.SawListen, + ["sawContext"] = turnState.SawContext, + ["followUpOpen"] = session.FollowUpOpen, + ["followUpExpiresUtc"] = session.FollowUpExpiresUtc + }), cancellationToken); + } + if (ShouldIgnoreBlankAudioHotphraseTurn(turn)) + { + session.TurnState.AwaitingTurnCompletion = false; + session.TurnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow); + session.FollowUpExpiresUtc = null; + ResetBufferedAudio(session); + ClearListenTracking(session.TurnState); + return []; + } var finalizedTurn = await ResolveTranscriptAsync(turn, session, cancellationToken); if (!IsTranscriptUsable(finalizedTurn)) @@ -445,8 +465,7 @@ public sealed partial class WebSocketTurnFinalizationService( turnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow); session.FollowUpExpiresUtc = null; ResetBufferedAudio(session); - turnState.SawListen = false; - turnState.SawContext = false; + ClearListenTracking(turnState); return [.. ResponsePlanToSocketMessagesMapper.MapNoInputAndRedirectToSkill( turnState.TransId ?? session.LastTransId ?? string.Empty, turnState.ListenRules, @@ -483,8 +502,7 @@ public sealed partial class WebSocketTurnFinalizationService( var localRule = ReadPrimaryNoInputRule(finalizedTurn); var noInputReplies = BuildLocalNoInputReplies(session, turnState, localRule); ResetBufferedAudio(session); - turnState.SawListen = false; - turnState.SawContext = false; + ClearListenTracking(turnState); return noInputReplies; } @@ -545,8 +563,7 @@ public sealed partial class WebSocketTurnFinalizationService( .Select(map => new WebSocketReply { Text = map.Text, DelayMs = map.DelayMs }) .ToArray(); ResetBufferedAudio(session); - turnState.SawListen = false; - turnState.SawContext = false; + ClearListenTracking(turnState); return fallbackReplies; } case true when @@ -678,10 +695,14 @@ public sealed partial class WebSocketTurnFinalizationService( }), cancellationToken); } - ResetBufferedAudio(session); - turnState.SawListen = false; - turnState.SawContext = false; - return replies; + ResetBufferedAudio(session); + ClearListenTracking(turnState); + return replies; + } + finally + { + await TrackGlsmPhaseAsync(session, envelope, $"finalize:{messageType}", cancellationToken); + } } private static bool ShouldAutoFinalize(CloudSession session) @@ -708,6 +729,58 @@ public sealed partial class WebSocketTurnFinalizationService( return ShouldIgnoreLateAudio(session) && IsHotphraseLaunchListenSetup(text); } + public static bool TryRecoverStalePendingListen(CloudSession session, out int staleAgeMs) + { + staleAgeMs = 0; + var turnState = session.TurnState; + if (!turnState.AwaitingTurnCompletion || + !turnState.SawListen || + turnState.SawContext || + turnState.BufferedAudioBytes > 0 || + !turnState.ListenOpenedUtc.HasValue) + { + return false; + } + + var age = DateTimeOffset.UtcNow - turnState.ListenOpenedUtc.Value; + if (age < StaleListenSetupRecoveryAge) + { + return false; + } + + staleAgeMs = (int)age.TotalMilliseconds; + turnState.AwaitingTurnCompletion = false; + ResetBufferedAudio(session); + ClearListenTracking(turnState); + turnState.ListenHotphrase = false; + turnState.HotphraseEmptyTurnCount = 0; + UpdateGlsmPhaseMarker(session); + return true; + } + + public static string ResolveGlsmPhase(CloudSession session) + { + var turnState = session.TurnState; + if (!turnState.AwaitingTurnCompletion) + { + return session.FollowUpOpen ? "DISPATCH_DIALOG" : "PROCESS_LISTENER_QUEUE"; + } + + if (turnState.SawListen && !turnState.SawContext && turnState.BufferedAudioBytes == 0) + { + return "HJ_LISTENING"; + } + + if (turnState.SawListen && turnState.SawContext && turnState.BufferedAudioBytes == 0) + { + return "LISTENING"; + } + + return turnState.BufferedAudioBytes > 0 + ? "WAIT_LISTEN_FINISHED" + : "LISTENING"; + } + private static TimeSpan ResolveLateAudioIgnoreWindow(ResponsePlan plan) { return string.Equals(plan.IntentName, "cloud_version", StringComparison.OrdinalIgnoreCase) @@ -1518,6 +1591,53 @@ public sealed partial class WebSocketTurnFinalizationService( return PegasusAffinityContinuationStems.Contains(normalized); } + private static void ClearListenTracking(WebSocketTurnState turnState) + { + turnState.SawListen = false; + turnState.SawContext = false; + turnState.ListenOpenedUtc = null; + } + + private static void UpdateGlsmPhaseMarker(CloudSession session) + { + session.Metadata[GlsmPhaseMetadataKey] = ResolveGlsmPhase(session); + } + + private async Task TrackGlsmPhaseAsync( + CloudSession session, + WebSocketMessageEnvelope envelope, + string trigger, + CancellationToken cancellationToken) + { + var nextPhase = ResolveGlsmPhase(session); + var previousPhase = session.Metadata.TryGetValue(GlsmPhaseMetadataKey, out var rawPhase) + ? rawPhase?.ToString() + : null; + session.Metadata[GlsmPhaseMetadataKey] = nextPhase; + + if (string.Equals(previousPhase, nextPhase, StringComparison.OrdinalIgnoreCase)) + { + return; + } + + try + { + await sink.RecordTurnDiagnosticAsync("glsm_phase_transition", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary + { + ["trigger"] = trigger, + ["previousState"] = previousPhase, + ["state"] = nextPhase, + ["listenOpenedUtc"] = session.TurnState.ListenOpenedUtc, + ["followUpOpen"] = session.FollowUpOpen, + ["listenRules"] = session.TurnState.ListenRules + }), cancellationToken); + } + catch + { + // Diagnostics should not interrupt turn handling. + } + } + private static Dictionary BuildTurnDiagnosticSnapshot( CloudSession session, WebSocketMessageEnvelope envelope, @@ -1534,6 +1654,7 @@ public sealed partial class WebSocketTurnFinalizationService( details["bufferedAudioChunks"] = session.TurnState.BufferedAudioChunkCount; details["sawListen"] = session.TurnState.SawListen; details["sawContext"] = session.TurnState.SawContext; + details["glsmState"] = ResolveGlsmPhase(session); return details; } diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/WebSocketTurnState.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/WebSocketTurnState.cs index 4585a35..5110de7 100644 --- a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/WebSocketTurnState.cs +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/WebSocketTurnState.cs @@ -7,6 +7,7 @@ public sealed class WebSocketTurnState public string? TransId { get; set; } public string? ContextPayload { get; set; } + public DateTimeOffset? ListenOpenedUtc { get; set; } public bool ListenHotphrase { get; set; } public int HotphraseEmptyTurnCount { get; set; } public DateTimeOffset? IgnoreAdditionalAudioUntilUtc { get; set; } diff --git a/OpenJibo/tests/Jibo.Cloud.Tests/Turn/FileTurnTelemetrySinkTests.cs b/OpenJibo/tests/Jibo.Cloud.Tests/Turn/FileTurnTelemetrySinkTests.cs index 3f17cd9..9dfe7fe 100644 --- a/OpenJibo/tests/Jibo.Cloud.Tests/Turn/FileTurnTelemetrySinkTests.cs +++ b/OpenJibo/tests/Jibo.Cloud.Tests/Turn/FileTurnTelemetrySinkTests.cs @@ -101,4 +101,49 @@ public sealed class FileTurnTelemetrySinkTests s => s.RecordTranscriptError(It.IsAny(), It.IsAny(), It.IsAny()), Times.Once()); } + + [Fact] + public async Task HandleContext_EmitsGlsmPhaseTransitionDiagnostic() + { + var sink = new Mock(); + sink.Setup(s => s.RecordTurnDiagnosticAsync(It.IsAny(), It.IsAny>(), It.IsAny())) + .Returns(Task.CompletedTask); + var turnService = new WebSocketTurnFinalizationService( + Mock.Of(), + Mock.Of(), + sink.Object); + + var session = new CloudSession + { + Token = "glsm-phase-token", + TurnState = + { + TransId = "trans-glsm", + AwaitingTurnCompletion = true, + SawListen = true, + ListenOpenedUtc = DateTimeOffset.UtcNow - TimeSpan.FromSeconds(1) + } + }; + session.Metadata["glsmPhase"] = "HJ_LISTENING"; + + await turnService.HandleContextAsync( + session, + new WebSocketMessageEnvelope + { + HostName = "neo-hub.jibo.com", + Path = "/listen", + Kind = "neo-hub-listen", + Text = """{"type":"CONTEXT","transID":"trans-glsm","data":{"topic":"conversation"}}""" + }, + CancellationToken.None); + + sink.Verify( + s => s.RecordTurnDiagnosticAsync( + "glsm_phase_transition", + It.Is>(details => + details.ContainsKey("state") && + string.Equals(details["state"] == null ? null : details["state"]!.ToString(), "LISTENING", StringComparison.OrdinalIgnoreCase)), + It.IsAny()), + Times.AtLeastOnce()); + } } diff --git a/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs b/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs index 8edd903..5f30c6b 100644 --- a/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs +++ b/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs @@ -2523,6 +2523,47 @@ public sealed class JiboWebSocketServiceTests Assert.Null(session.LastIntent); } + [Fact] + public async Task StaleListenSetup_IsRecoveredWhenNextHotphraseListenArrives() + { + await _service.HandleMessageAsync(new WebSocketMessageEnvelope + { + HostName = "neo-hub.jibo.com", + Path = "/listen", + Kind = "neo-hub-listen", + Token = "hub-stale-listen-token", + Text = """{"type":"LISTEN","transID":"trans-stale-listen","data":{"hotphrase":true,"rules":["launch","globals/global_commands_launch"]}}""" + }); + + var session = _store.FindSessionByToken("hub-stale-listen-token"); + Assert.NotNull(session); + session.TurnState.ListenOpenedUtc = DateTimeOffset.UtcNow - TimeSpan.FromSeconds(12); + session.TurnState.AwaitingTurnCompletion = true; + session.TurnState.SawListen = true; + session.TurnState.SawContext = false; + session.TurnState.BufferedAudioBytes = 0; + session.TurnState.BufferedAudioChunkCount = 0; + session.TurnState.HotphraseEmptyTurnCount = 2; + + var replies = await _service.HandleMessageAsync(new WebSocketMessageEnvelope + { + HostName = "neo-hub.jibo.com", + Path = "/listen", + Kind = "neo-hub-listen", + Token = "hub-stale-listen-token", + Text = """{"type":"LISTEN","transID":"trans-stale-listen","data":{"hotphrase":true,"rules":["launch","globals/global_commands_launch"]}}""" + }); + + Assert.Empty(replies); + Assert.True(session.TurnState.AwaitingTurnCompletion); + Assert.True(session.TurnState.SawListen); + Assert.False(session.TurnState.SawContext); + Assert.Equal(0, session.TurnState.BufferedAudioBytes); + Assert.Equal(0, session.TurnState.BufferedAudioChunkCount); + Assert.Equal(0, session.TurnState.HotphraseEmptyTurnCount); + Assert.True(session.TurnState.ListenOpenedUtc > DateTimeOffset.UtcNow - TimeSpan.FromSeconds(3)); + } + [Fact] public async Task BinaryAudio_AfterWordOfDayRightWordListen_IsIgnoredDuringCleanupWindow() {