Add GLSM listener telemetry and stale-listen recovery

This commit is contained in:
Jacob Dubin
2026-05-07 06:24:30 -05:00
parent 69707f32a7
commit 3e50fb9a49
9 changed files with 385 additions and 105 deletions

View File

@@ -25,7 +25,8 @@ public sealed class JiboWebSocketService(
var replies = await turnFinalizationService.HandleBinaryAudioAsync(session, envelope, cancellationToken);
await telemetrySink.RecordTurnEventAsync(envelope, session, "binary_audio_received", new Dictionary<string, object?>
{
["bytes"] = envelope.Binary?.Length ?? 0
["bytes"] = envelope.Binary?.Length ?? 0,
["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session)
}, cancellationToken);
return replies;
}
@@ -33,6 +34,8 @@ public sealed class JiboWebSocketService(
var parsedType = ReadMessageType(envelope.Text);
session.LastMessageType = parsedType;
var containsInlineTurnPayload = parsedType == "LISTEN" && ContainsInlineTurnPayload(envelope.Text);
var staleListenRecovered = false;
var staleListenAgeMs = 0;
if (parsedType == "LISTEN" &&
!containsInlineTurnPayload &&
WebSocketTurnFinalizationService.ShouldIgnoreLateListenSetup(session, envelope.Text))
@@ -57,6 +60,19 @@ public sealed class JiboWebSocketService(
return replies;
}
if (parsedType == "LISTEN" &&
!containsInlineTurnPayload &&
WebSocketTurnFinalizationService.TryRecoverStalePendingListen(session, out staleListenAgeMs))
{
staleListenRecovered = true;
await telemetrySink.RecordTurnEventAsync(envelope, session, "glsm_stale_listen_recovered", new Dictionary<string, object?>
{
["staleAgeMs"] = staleListenAgeMs,
["transID"] = session.TurnState.TransId,
["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session)
}, cancellationToken);
}
WebSocketTurnFinalizationService.ObserveIncomingMessage(session, envelope.Text);
switch (parsedType)
@@ -66,7 +82,8 @@ public sealed class JiboWebSocketService(
var replies = await turnFinalizationService.HandleContextAsync(session, envelope, cancellationToken);
await telemetrySink.RecordTurnEventAsync(envelope, session, "context_received", new Dictionary<string, object?>
{
["transID"] = session.TurnState.TransId
["transID"] = session.TurnState.TransId,
["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session)
}, cancellationToken);
return replies;
}
@@ -80,7 +97,10 @@ public sealed class JiboWebSocketService(
["messageType"] = parsedType,
["replyCount"] = replies.Count,
["transcript"] = session.LastTranscript,
["intent"] = session.LastIntent
["intent"] = session.LastIntent,
["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session),
["staleListenRecovered"] = staleListenRecovered,
["staleListenAgeMs"] = staleListenAgeMs
}, cancellationToken);
return replies;
}
@@ -92,7 +112,8 @@ public sealed class JiboWebSocketService(
["messageType"] = parsedType,
["replyCount"] = replies.Count,
["transcript"] = session.LastTranscript,
["intent"] = session.LastIntent
["intent"] = session.LastIntent,
["glsmPhase"] = WebSocketTurnFinalizationService.ResolveGlsmPhase(session)
}, cancellationToken);
return replies;
}

View File

@@ -14,9 +14,11 @@ public sealed partial class WebSocketTurnFinalizationService(
{
private const int AutoFinalizeMinBufferedAudioBytes = 15000;
private const int AutoFinalizeMinBufferedAudioChunks = 5;
private const string GlsmPhaseMetadataKey = "glsmPhase";
private static readonly TimeSpan AutoFinalizeMinTurnAge = TimeSpan.FromMilliseconds(1800);
private static readonly TimeSpan AutoFinalizeMissingTranscriptFallbackAge = TimeSpan.FromMilliseconds(4200);
private static readonly TimeSpan AutoFinalizeContinuationDeferralMaxAge = TimeSpan.FromMilliseconds(3600);
private static readonly TimeSpan StaleListenSetupRecoveryAge = TimeSpan.FromSeconds(9);
private const int AutoFinalizeContinuationDeferralMaxAttempts = 2;
private static readonly HashSet<string> PegasusAffinityContinuationStems = new(StringComparer.Ordinal)
{
@@ -61,54 +63,61 @@ public sealed partial class WebSocketTurnFinalizationService(
WebSocketMessageEnvelope envelope,
CancellationToken cancellationToken = default)
{
var turnState = session.TurnState;
var ignoreLateAudio = ShouldIgnoreLateAudio(session);
var ignoreAudioWithoutListen = ShouldIgnoreAudioWithoutListen(turnState);
if (ignoreLateAudio || ignoreAudioWithoutListen)
try
{
await sink.RecordTurnDiagnosticAsync("binary_audio_ignored", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary<string, object?>
var turnState = session.TurnState;
var ignoreLateAudio = ShouldIgnoreLateAudio(session);
var ignoreAudioWithoutListen = ShouldIgnoreAudioWithoutListen(turnState);
if (ignoreLateAudio || ignoreAudioWithoutListen)
{
await sink.RecordTurnDiagnosticAsync("binary_audio_ignored", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary<string, object?>
{
["ignored"] = true,
["ignoreLateAudio"] = ignoreLateAudio,
["ignoreAudioWithoutListen"] = ignoreAudioWithoutListen,
["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion,
["bufferedAudioBytes"] = turnState.BufferedAudioBytes,
["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount,
["sawListen"] = turnState.SawListen,
["sawContext"] = turnState.SawContext
}), cancellationToken);
return [];
}
session.LastMessageType = "BINARY_AUDIO";
turnState.FirstAudioReceivedUtc ??= DateTimeOffset.UtcNow;
turnState.BufferedAudioChunkCount += 1;
turnState.BufferedAudioBytes += envelope.Binary?.Length ?? 0;
if (envelope.Binary is { Length: > 0 })
{
turnState.BufferedAudioFrames.Add([.. envelope.Binary]);
}
turnState.LastAudioReceivedUtc = DateTimeOffset.UtcNow;
turnState.AwaitingTurnCompletion = true;
session.Metadata["lastAudioBytes"] = envelope.Binary?.Length ?? 0;
await sink.RecordTurnDiagnosticAsync("binary_audio_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary<string, object?>
{
["ignored"] = true,
["ignoreLateAudio"] = ignoreLateAudio,
["ignoreAudioWithoutListen"] = ignoreAudioWithoutListen,
["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion,
["bufferedAudioBytes"] = turnState.BufferedAudioBytes,
["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount,
["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion,
["sawListen"] = turnState.SawListen,
["sawContext"] = turnState.SawContext
["sawContext"] = turnState.SawContext,
["listenRules"] = turnState.ListenRules,
["listenAsrHints"] = turnState.ListenAsrHints,
["yesNoRule"] = turnState.ListenRules.FirstOrDefault(IsConstrainedYesNoRule)
}), cancellationToken);
if (ShouldAutoFinalize(session))
{
return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken);
}
return [];
}
session.LastMessageType = "BINARY_AUDIO";
turnState.FirstAudioReceivedUtc ??= DateTimeOffset.UtcNow;
turnState.BufferedAudioChunkCount += 1;
turnState.BufferedAudioBytes += envelope.Binary?.Length ?? 0;
if (envelope.Binary is { Length: > 0 })
finally
{
turnState.BufferedAudioFrames.Add([.. envelope.Binary]);
await TrackGlsmPhaseAsync(session, envelope, "binary_audio", cancellationToken);
}
turnState.LastAudioReceivedUtc = DateTimeOffset.UtcNow;
turnState.AwaitingTurnCompletion = true;
session.Metadata["lastAudioBytes"] = envelope.Binary?.Length ?? 0;
await sink.RecordTurnDiagnosticAsync("binary_audio_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary<string, object?>
{
["bufferedAudioBytes"] = turnState.BufferedAudioBytes,
["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount,
["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion,
["sawListen"] = turnState.SawListen,
["sawContext"] = turnState.SawContext,
["listenRules"] = turnState.ListenRules,
["listenAsrHints"] = turnState.ListenAsrHints,
["yesNoRule"] = turnState.ListenRules.FirstOrDefault(IsConstrainedYesNoRule)
}), cancellationToken);
if (ShouldAutoFinalize(session))
{
return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken);
}
return [];
}
public async Task<IReadOnlyList<WebSocketReply>> HandleContextAsync(
@@ -116,34 +125,40 @@ public sealed partial class WebSocketTurnFinalizationService(
WebSocketMessageEnvelope envelope,
CancellationToken cancellationToken = default)
{
var turnState = session.TurnState;
turnState.SawContext = true;
turnState.ContextPayload = ExtractDataPayload(envelope.Text);
session.Metadata["context"] = turnState.ContextPayload;
if (TryReadContextProperty(envelope.Text, "audioTranscriptHint", out var transcriptHint) &&
!string.IsNullOrWhiteSpace(transcriptHint))
try
{
turnState.AudioTranscriptHint = transcriptHint;
session.Metadata["audioTranscriptHint"] = transcriptHint;
}
var turnState = session.TurnState;
turnState.SawContext = true;
turnState.ContextPayload = ExtractDataPayload(envelope.Text);
session.Metadata["context"] = turnState.ContextPayload;
if (TryReadContextProperty(envelope.Text, "audioTranscriptHint", out var transcriptHint) &&
!string.IsNullOrWhiteSpace(transcriptHint))
{
turnState.AudioTranscriptHint = transcriptHint;
session.Metadata["audioTranscriptHint"] = transcriptHint;
}
if (ShouldIgnorePassiveLocalSkillContext(session, envelope.Text))
{
turnState.AwaitingTurnCompletion = false;
turnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow);
ResetBufferedAudio(session);
ClearListenTracking(turnState);
return [];
}
if (ShouldAutoFinalize(session))
{
return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken);
}
if (ShouldIgnorePassiveLocalSkillContext(session, envelope.Text))
{
turnState.AwaitingTurnCompletion = false;
turnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow);
ResetBufferedAudio(session);
turnState.SawListen = false;
turnState.SawContext = false;
return [];
}
if (ShouldAutoFinalize(session))
finally
{
return await FinalizeTurnAsync(session, envelope, "AUTO_FINALIZE", allowFallbackOnMissingTranscript: true, cancellationToken);
await TrackGlsmPhaseAsync(session, envelope, "context", cancellationToken);
}
return [];
}
public async Task<IReadOnlyList<WebSocketReply>> HandleTurnAsync(
@@ -167,8 +182,8 @@ public sealed partial class WebSocketTurnFinalizationService(
session.TurnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow);
session.FollowUpExpiresUtc = null;
ResetBufferedAudio(session);
session.TurnState.SawListen = false;
session.TurnState.SawContext = false;
ClearListenTracking(session.TurnState);
UpdateGlsmPhaseMarker(session);
return [.. ResponsePlanToSocketMessagesMapper.MapNoInputAndRedirectToSkill(
session.TurnState.TransId ?? session.LastTransId ?? string.Empty,
session.TurnState.ListenRules,
@@ -181,6 +196,8 @@ public sealed partial class WebSocketTurnFinalizationService(
}
session.TurnState.AwaitingTurnCompletion = true;
session.TurnState.ListenOpenedUtc ??= DateTimeOffset.UtcNow;
UpdateGlsmPhaseMarker(session);
return [];
}
@@ -275,6 +292,7 @@ public sealed partial class WebSocketTurnFinalizationService(
string.Equals(type.GetString(), "LISTEN", StringComparison.OrdinalIgnoreCase))
{
turnState.SawListen = true;
turnState.ListenOpenedUtc ??= DateTimeOffset.UtcNow;
}
if (root.TryGetProperty("transID", out var transId) && transId.ValueKind == JsonValueKind.String)
@@ -351,6 +369,7 @@ public sealed partial class WebSocketTurnFinalizationService(
turnState.TransId = transId;
turnState.ContextPayload = null;
turnState.AudioTranscriptHint = null;
turnState.ListenOpenedUtc = null;
turnState.LastSttError = null;
turnState.LastSttErrorUtc = null;
turnState.FirstAudioReceivedUtc = null;
@@ -376,36 +395,37 @@ public sealed partial class WebSocketTurnFinalizationService(
bool allowFallbackOnMissingTranscript,
CancellationToken cancellationToken)
{
var turn = ProtocolToTurnContextMapper.MapListenMessage(envelope, session, messageType);
var turnState = session.TurnState;
if (IsYesNoTurn(turn) || ReadPrimaryYesNoRule(turn) is not null)
try
{
await sink.RecordTurnDiagnosticAsync("yes_no_turn_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary<string, object?>
var turn = ProtocolToTurnContextMapper.MapListenMessage(envelope, session, messageType);
var turnState = session.TurnState;
if (IsYesNoTurn(turn) || ReadPrimaryYesNoRule(turn) is not null)
{
["messageType"] = messageType,
["listenRules"] = ReadRules(turn, "listenRules").ToArray(),
["clientRules"] = ReadRules(turn, "clientRules").ToArray(),
["listenAsrHints"] = ReadRules(turn, "listenAsrHints").ToArray(),
["yesNoRule"] = ReadPrimaryYesNoRule(turn),
["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion,
["bufferedAudioBytes"] = turnState.BufferedAudioBytes,
["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount,
["sawListen"] = turnState.SawListen,
["sawContext"] = turnState.SawContext,
["followUpOpen"] = session.FollowUpOpen,
["followUpExpiresUtc"] = session.FollowUpExpiresUtc
}), cancellationToken);
}
if (ShouldIgnoreBlankAudioHotphraseTurn(turn))
{
session.TurnState.AwaitingTurnCompletion = false;
session.TurnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow);
session.FollowUpExpiresUtc = null;
ResetBufferedAudio(session);
session.TurnState.SawListen = false;
session.TurnState.SawContext = false;
return [];
}
await sink.RecordTurnDiagnosticAsync("yes_no_turn_received", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary<string, object?>
{
["messageType"] = messageType,
["listenRules"] = ReadRules(turn, "listenRules").ToArray(),
["clientRules"] = ReadRules(turn, "clientRules").ToArray(),
["listenAsrHints"] = ReadRules(turn, "listenAsrHints").ToArray(),
["yesNoRule"] = ReadPrimaryYesNoRule(turn),
["awaitingTurnCompletion"] = turnState.AwaitingTurnCompletion,
["bufferedAudioBytes"] = turnState.BufferedAudioBytes,
["bufferedAudioChunks"] = turnState.BufferedAudioChunkCount,
["sawListen"] = turnState.SawListen,
["sawContext"] = turnState.SawContext,
["followUpOpen"] = session.FollowUpOpen,
["followUpExpiresUtc"] = session.FollowUpExpiresUtc
}), cancellationToken);
}
if (ShouldIgnoreBlankAudioHotphraseTurn(turn))
{
session.TurnState.AwaitingTurnCompletion = false;
session.TurnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow);
session.FollowUpExpiresUtc = null;
ResetBufferedAudio(session);
ClearListenTracking(session.TurnState);
return [];
}
var finalizedTurn = await ResolveTranscriptAsync(turn, session, cancellationToken);
if (!IsTranscriptUsable(finalizedTurn))
@@ -445,8 +465,7 @@ public sealed partial class WebSocketTurnFinalizationService(
turnState.IgnoreAdditionalAudioUntilUtc = DateTimeOffset.UtcNow.Add(WebSocketTurnState.DefaultLateAudioIgnoreWindow);
session.FollowUpExpiresUtc = null;
ResetBufferedAudio(session);
turnState.SawListen = false;
turnState.SawContext = false;
ClearListenTracking(turnState);
return [.. ResponsePlanToSocketMessagesMapper.MapNoInputAndRedirectToSkill(
turnState.TransId ?? session.LastTransId ?? string.Empty,
turnState.ListenRules,
@@ -483,8 +502,7 @@ public sealed partial class WebSocketTurnFinalizationService(
var localRule = ReadPrimaryNoInputRule(finalizedTurn);
var noInputReplies = BuildLocalNoInputReplies(session, turnState, localRule);
ResetBufferedAudio(session);
turnState.SawListen = false;
turnState.SawContext = false;
ClearListenTracking(turnState);
return noInputReplies;
}
@@ -545,8 +563,7 @@ public sealed partial class WebSocketTurnFinalizationService(
.Select(map => new WebSocketReply { Text = map.Text, DelayMs = map.DelayMs })
.ToArray();
ResetBufferedAudio(session);
turnState.SawListen = false;
turnState.SawContext = false;
ClearListenTracking(turnState);
return fallbackReplies;
}
case true when
@@ -678,10 +695,14 @@ public sealed partial class WebSocketTurnFinalizationService(
}), cancellationToken);
}
ResetBufferedAudio(session);
turnState.SawListen = false;
turnState.SawContext = false;
return replies;
ResetBufferedAudio(session);
ClearListenTracking(turnState);
return replies;
}
finally
{
await TrackGlsmPhaseAsync(session, envelope, $"finalize:{messageType}", cancellationToken);
}
}
private static bool ShouldAutoFinalize(CloudSession session)
@@ -708,6 +729,58 @@ public sealed partial class WebSocketTurnFinalizationService(
return ShouldIgnoreLateAudio(session) && IsHotphraseLaunchListenSetup(text);
}
public static bool TryRecoverStalePendingListen(CloudSession session, out int staleAgeMs)
{
staleAgeMs = 0;
var turnState = session.TurnState;
if (!turnState.AwaitingTurnCompletion ||
!turnState.SawListen ||
turnState.SawContext ||
turnState.BufferedAudioBytes > 0 ||
!turnState.ListenOpenedUtc.HasValue)
{
return false;
}
var age = DateTimeOffset.UtcNow - turnState.ListenOpenedUtc.Value;
if (age < StaleListenSetupRecoveryAge)
{
return false;
}
staleAgeMs = (int)age.TotalMilliseconds;
turnState.AwaitingTurnCompletion = false;
ResetBufferedAudio(session);
ClearListenTracking(turnState);
turnState.ListenHotphrase = false;
turnState.HotphraseEmptyTurnCount = 0;
UpdateGlsmPhaseMarker(session);
return true;
}
public static string ResolveGlsmPhase(CloudSession session)
{
var turnState = session.TurnState;
if (!turnState.AwaitingTurnCompletion)
{
return session.FollowUpOpen ? "DISPATCH_DIALOG" : "PROCESS_LISTENER_QUEUE";
}
if (turnState.SawListen && !turnState.SawContext && turnState.BufferedAudioBytes == 0)
{
return "HJ_LISTENING";
}
if (turnState.SawListen && turnState.SawContext && turnState.BufferedAudioBytes == 0)
{
return "LISTENING";
}
return turnState.BufferedAudioBytes > 0
? "WAIT_LISTEN_FINISHED"
: "LISTENING";
}
private static TimeSpan ResolveLateAudioIgnoreWindow(ResponsePlan plan)
{
return string.Equals(plan.IntentName, "cloud_version", StringComparison.OrdinalIgnoreCase)
@@ -1518,6 +1591,53 @@ public sealed partial class WebSocketTurnFinalizationService(
return PegasusAffinityContinuationStems.Contains(normalized);
}
private static void ClearListenTracking(WebSocketTurnState turnState)
{
turnState.SawListen = false;
turnState.SawContext = false;
turnState.ListenOpenedUtc = null;
}
private static void UpdateGlsmPhaseMarker(CloudSession session)
{
session.Metadata[GlsmPhaseMetadataKey] = ResolveGlsmPhase(session);
}
private async Task TrackGlsmPhaseAsync(
CloudSession session,
WebSocketMessageEnvelope envelope,
string trigger,
CancellationToken cancellationToken)
{
var nextPhase = ResolveGlsmPhase(session);
var previousPhase = session.Metadata.TryGetValue(GlsmPhaseMetadataKey, out var rawPhase)
? rawPhase?.ToString()
: null;
session.Metadata[GlsmPhaseMetadataKey] = nextPhase;
if (string.Equals(previousPhase, nextPhase, StringComparison.OrdinalIgnoreCase))
{
return;
}
try
{
await sink.RecordTurnDiagnosticAsync("glsm_phase_transition", BuildTurnDiagnosticSnapshot(session, envelope, new Dictionary<string, object?>
{
["trigger"] = trigger,
["previousState"] = previousPhase,
["state"] = nextPhase,
["listenOpenedUtc"] = session.TurnState.ListenOpenedUtc,
["followUpOpen"] = session.FollowUpOpen,
["listenRules"] = session.TurnState.ListenRules
}), cancellationToken);
}
catch
{
// Diagnostics should not interrupt turn handling.
}
}
private static Dictionary<string, object?> BuildTurnDiagnosticSnapshot(
CloudSession session,
WebSocketMessageEnvelope envelope,
@@ -1534,6 +1654,7 @@ public sealed partial class WebSocketTurnFinalizationService(
details["bufferedAudioChunks"] = session.TurnState.BufferedAudioChunkCount;
details["sawListen"] = session.TurnState.SawListen;
details["sawContext"] = session.TurnState.SawContext;
details["glsmState"] = ResolveGlsmPhase(session);
return details;
}

View File

@@ -7,6 +7,7 @@ public sealed class WebSocketTurnState
public string? TransId { get; set; }
public string? ContextPayload { get; set; }
public DateTimeOffset? ListenOpenedUtc { get; set; }
public bool ListenHotphrase { get; set; }
public int HotphraseEmptyTurnCount { get; set; }
public DateTimeOffset? IgnoreAdditionalAudioUntilUtc { get; set; }