new pass at websocket telemetry and prepare for capture
This commit is contained in:
61
OpenJibo/docs/live-jibo-capture.md
Normal file
61
OpenJibo/docs/live-jibo-capture.md
Normal file
@@ -0,0 +1,61 @@
|
||||
# Live Jibo Capture
|
||||
|
||||
## Recommendation
|
||||
|
||||
For the first real `.NET cloud -> physical Jibo` runs, use the existing controlled network and routing setup that already works with the Node server.
|
||||
|
||||
Recommended order:
|
||||
|
||||
1. Keep the robot on the known-good Ubuntu laptop based environment.
|
||||
2. Swap the `.NET` cloud into that same controlled path.
|
||||
3. Leave the Node oracle available as a fallback on separate ports or on a second machine.
|
||||
4. Capture real `.NET` websocket traffic and turn it into sanitized fixtures.
|
||||
5. Only after that, decide what belongs in permanent Azure hosting and IaC.
|
||||
|
||||
This is the lowest-risk path because it changes only one major variable at a time: the cloud implementation. It avoids mixing protocol-parity questions with new infrastructure variables.
|
||||
|
||||
## Why Not Azure First
|
||||
|
||||
Azure remains the target hosting direction, but it is not the best first environment for live robot discovery.
|
||||
|
||||
Reasons:
|
||||
|
||||
- the main unknowns are still protocol and turn-behavior details, not Azure primitives
|
||||
- keeping Node and `.NET` both available locally makes fallback and side-by-side comparison much easier
|
||||
- live robot capture is more valuable right now than early CI/CD polish
|
||||
- region injection and device routing work are easier to debug in a tightly controlled local network
|
||||
|
||||
## When To Move Beyond The Ubuntu Setup
|
||||
|
||||
Move to a second local/staging server or Azure after:
|
||||
|
||||
- startup flows are stable against the physical robot
|
||||
- websocket turn telemetry is being captured reliably
|
||||
- several real captured sessions have been sanitized into replay fixtures
|
||||
- the fallback path to Node is no longer needed for normal testing
|
||||
|
||||
## Telemetry Before Live Runs
|
||||
|
||||
The `.NET` cloud now supports structured websocket capture intended for first live runs:
|
||||
|
||||
- event stream written as NDJSON
|
||||
- per-session fixture export for replay
|
||||
- turn metadata including `transID`, buffered audio counts, finalize attempts, and reply types
|
||||
|
||||
Default capture location:
|
||||
|
||||
- `captures/websocket/`
|
||||
|
||||
Artifacts:
|
||||
|
||||
- `*.events.ndjson`
|
||||
- `fixtures/*.flow.json`
|
||||
|
||||
## Suggested First Hookup Plan
|
||||
|
||||
1. Start the `.NET` API on the Ubuntu-backed controlled network using the same robot routing settings currently used for Node.
|
||||
2. Confirm HTTP bootstrap and websocket acceptance with the existing smoke/routing helpers.
|
||||
3. Run one or two controlled listen turns with Jibo.
|
||||
4. Inspect the captured websocket events and exported fixtures.
|
||||
5. Convert the best captures into sanitized checked-in fixtures and tests.
|
||||
6. Keep Node available to compare any surprising turn behavior before changing infrastructure.
|
||||
33
OpenJibo/scripts/cloud/Get-WebSocketCaptureSummary.ps1
Normal file
33
OpenJibo/scripts/cloud/Get-WebSocketCaptureSummary.ps1
Normal file
@@ -0,0 +1,33 @@
|
||||
param(
|
||||
[string]$CaptureDirectory = "..\..\src\Jibo.Cloud\dotnet\src\Jibo.Cloud.Api\bin\Debug\net10.0\captures\websocket"
|
||||
)
|
||||
|
||||
$resolvedDirectory = Resolve-Path -LiteralPath $CaptureDirectory -ErrorAction Stop
|
||||
$eventFiles = Get-ChildItem -LiteralPath $resolvedDirectory -Filter *.events.ndjson -File | Sort-Object LastWriteTimeUtc
|
||||
|
||||
if (-not $eventFiles) {
|
||||
Write-Host "No websocket telemetry event files found in $resolvedDirectory"
|
||||
exit 0
|
||||
}
|
||||
|
||||
$records = foreach ($file in $eventFiles) {
|
||||
Get-Content -LiteralPath $file.FullName | Where-Object { $_.Trim().Length -gt 0 } | ForEach-Object {
|
||||
$_ | ConvertFrom-Json
|
||||
}
|
||||
}
|
||||
|
||||
$records |
|
||||
Group-Object EventType |
|
||||
Sort-Object Name |
|
||||
Select-Object Name, Count |
|
||||
Format-Table -AutoSize
|
||||
|
||||
$fixtureDirectory = Join-Path $resolvedDirectory "fixtures"
|
||||
if (Test-Path -LiteralPath $fixtureDirectory) {
|
||||
Write-Host ""
|
||||
Write-Host "Exported websocket fixtures:"
|
||||
Get-ChildItem -LiteralPath $fixtureDirectory -Filter *.flow.json -File |
|
||||
Sort-Object LastWriteTimeUtc |
|
||||
Select-Object LastWriteTimeUtc, Name |
|
||||
Format-Table -AutoSize
|
||||
}
|
||||
@@ -6,3 +6,5 @@ These scripts help exercise the new .NET hosted cloud locally.
|
||||
Runs a few quick HTTP checks against a local OpenJibo cloud instance.
|
||||
- `Invoke-ProtocolFixture.ps1`
|
||||
Replays a sanitized HTTP fixture against a running local instance.
|
||||
- `Get-WebSocketCaptureSummary.ps1`
|
||||
Summarizes captured websocket telemetry events and exported live-run fixtures from the .NET cloud.
|
||||
|
||||
@@ -72,6 +72,7 @@ Current websocket scope is still intentionally narrow:
|
||||
- synthetic `LISTEN` result shaping for `LISTEN`, `CLIENT_NLU`, and `CLIENT_ASR`
|
||||
- buffered audio state tracking behind a dedicated turn-finalization layer
|
||||
- synthetic STT strategy selection for fixture-driven audio turn completion
|
||||
- structured websocket telemetry and live-run fixture export
|
||||
- `CONTEXT` capture and follow-up turn state
|
||||
- `EOS` completion
|
||||
- first skill vertical for joke/chat `SKILL_ACTION` playback
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
using System.Net.WebSockets;
|
||||
using System.Text;
|
||||
using Jibo.Cloud.Application.Abstractions;
|
||||
using Jibo.Cloud.Application.Services;
|
||||
using Jibo.Cloud.Domain.Models;
|
||||
using Jibo.Cloud.Infrastructure.DependencyInjection;
|
||||
|
||||
var builder = WebApplication.CreateBuilder(args);
|
||||
|
||||
builder.Services.AddOpenJiboCloud();
|
||||
builder.Services.AddOpenJiboCloud(builder.Configuration);
|
||||
|
||||
var app = builder.Build();
|
||||
|
||||
@@ -21,10 +22,21 @@ app.Use(async (context, next) =>
|
||||
}
|
||||
|
||||
var webSocketService = context.RequestServices.GetRequiredService<JiboWebSocketService>();
|
||||
var telemetrySink = context.RequestServices.GetRequiredService<IWebSocketTelemetrySink>();
|
||||
using var socket = await context.WebSockets.AcceptWebSocketAsync();
|
||||
|
||||
var kind = ResolveSocketKind(context.Request.Host.Host, context.Request.Path);
|
||||
var token = ResolveToken(context.Request);
|
||||
var openEnvelope = new WebSocketMessageEnvelope
|
||||
{
|
||||
ConnectionId = Guid.NewGuid().ToString("N"),
|
||||
HostName = context.Request.Host.Host,
|
||||
Path = context.Request.Path.Value ?? "/",
|
||||
Kind = kind,
|
||||
Token = token
|
||||
};
|
||||
var openSession = ResolveSession(webSocketService, openEnvelope);
|
||||
await telemetrySink.RecordConnectionOpenedAsync(openEnvelope, openSession, context.RequestAborted);
|
||||
|
||||
while (socket.State == WebSocketState.Open)
|
||||
{
|
||||
@@ -47,6 +59,8 @@ app.Use(async (context, next) =>
|
||||
};
|
||||
|
||||
var replies = await webSocketService.HandleMessageAsync(envelope, context.RequestAborted);
|
||||
var session = ResolveSession(webSocketService, envelope);
|
||||
await telemetrySink.RecordInboundAsync(envelope, session, ReadMessageType(envelope.Text), context.RequestAborted);
|
||||
foreach (var reply in replies)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(reply.Text))
|
||||
@@ -57,7 +71,20 @@ app.Use(async (context, next) =>
|
||||
var payload = Encoding.UTF8.GetBytes(reply.Text);
|
||||
await socket.SendAsync(payload, WebSocketMessageType.Text, true, context.RequestAborted);
|
||||
}
|
||||
|
||||
await telemetrySink.RecordOutboundAsync(envelope, session, replies, context.RequestAborted);
|
||||
}
|
||||
|
||||
var closeEnvelope = new WebSocketMessageEnvelope
|
||||
{
|
||||
ConnectionId = Guid.NewGuid().ToString("N"),
|
||||
HostName = context.Request.Host.Host,
|
||||
Path = context.Request.Path.Value ?? "/",
|
||||
Kind = kind,
|
||||
Token = token
|
||||
};
|
||||
var closeSession = ResolveSession(webSocketService, closeEnvelope);
|
||||
await telemetrySink.RecordConnectionClosedAsync(closeEnvelope, closeSession, "socket-loop-ended", context.RequestAborted);
|
||||
});
|
||||
|
||||
app.MapGet("/health", () => Results.Json(new { ok = true, service = "OpenJibo Cloud Api" }));
|
||||
@@ -167,4 +194,29 @@ static string? ResolveToken(HttpRequest request)
|
||||
return null;
|
||||
}
|
||||
|
||||
static string ReadMessageType(string? text)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(text))
|
||||
{
|
||||
return "BINARY_OR_EMPTY";
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var document = System.Text.Json.JsonDocument.Parse(text);
|
||||
return document.RootElement.TryGetProperty("type", out var type) && type.ValueKind == System.Text.Json.JsonValueKind.String
|
||||
? type.GetString() ?? "UNKNOWN"
|
||||
: "UNKNOWN";
|
||||
}
|
||||
catch
|
||||
{
|
||||
return "TEXT";
|
||||
}
|
||||
}
|
||||
|
||||
static CloudSession ResolveSession(JiboWebSocketService webSocketService, WebSocketMessageEnvelope envelope)
|
||||
{
|
||||
return webSocketService.GetOrCreateSession(envelope);
|
||||
}
|
||||
|
||||
internal sealed record ReceivedSocketMessage(WebSocketMessageType MessageType, byte[] Buffer);
|
||||
|
||||
@@ -0,0 +1,12 @@
|
||||
using Jibo.Cloud.Domain.Models;
|
||||
|
||||
namespace Jibo.Cloud.Application.Abstractions;
|
||||
|
||||
public interface IWebSocketTelemetrySink
|
||||
{
|
||||
Task RecordConnectionOpenedAsync(WebSocketMessageEnvelope envelope, CloudSession session, CancellationToken cancellationToken = default);
|
||||
Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType, CancellationToken cancellationToken = default);
|
||||
Task RecordTurnEventAsync(WebSocketMessageEnvelope envelope, CloudSession session, string eventType, IReadOnlyDictionary<string, object?> details, CancellationToken cancellationToken = default);
|
||||
Task RecordOutboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, IReadOnlyList<WebSocketReply> replies, CancellationToken cancellationToken = default);
|
||||
Task RecordConnectionClosedAsync(WebSocketMessageEnvelope envelope, CloudSession session, string reason, CancellationToken cancellationToken = default);
|
||||
}
|
||||
@@ -7,17 +7,28 @@ namespace Jibo.Cloud.Application.Services;
|
||||
|
||||
public sealed class JiboWebSocketService(
|
||||
ICloudStateStore stateStore,
|
||||
IWebSocketTelemetrySink telemetrySink,
|
||||
WebSocketTurnFinalizationService turnFinalizationService)
|
||||
{
|
||||
public CloudSession GetOrCreateSession(WebSocketMessageEnvelope envelope)
|
||||
{
|
||||
return stateStore.FindSessionByToken(envelope.Token ?? string.Empty) ??
|
||||
stateStore.OpenSession(envelope.Kind, null, envelope.Token, envelope.HostName, envelope.Path);
|
||||
}
|
||||
|
||||
public async Task<IReadOnlyList<WebSocketReply>> HandleMessageAsync(WebSocketMessageEnvelope envelope, CancellationToken cancellationToken = default)
|
||||
{
|
||||
var session = stateStore.FindSessionByToken(envelope.Token ?? string.Empty) ??
|
||||
stateStore.OpenSession(envelope.Kind, null, envelope.Token, envelope.HostName, envelope.Path);
|
||||
var session = GetOrCreateSession(envelope);
|
||||
session.LastSeenUtc = DateTimeOffset.UtcNow;
|
||||
|
||||
if (envelope.IsBinary)
|
||||
{
|
||||
return turnFinalizationService.HandleBinaryAudio(session, envelope);
|
||||
var replies = turnFinalizationService.HandleBinaryAudio(session, envelope);
|
||||
await telemetrySink.RecordTurnEventAsync(envelope, session, "binary_audio_received", new Dictionary<string, object?>
|
||||
{
|
||||
["bytes"] = envelope.Binary?.Length ?? 0
|
||||
}, cancellationToken);
|
||||
return replies;
|
||||
}
|
||||
|
||||
var parsedType = ReadMessageType(envelope.Text);
|
||||
@@ -31,12 +42,25 @@ public sealed class JiboWebSocketService(
|
||||
|
||||
if (parsedType == "CONTEXT")
|
||||
{
|
||||
return turnFinalizationService.HandleContext(session, envelope.Text);
|
||||
var replies = turnFinalizationService.HandleContext(session, envelope.Text);
|
||||
await telemetrySink.RecordTurnEventAsync(envelope, session, "context_received", new Dictionary<string, object?>
|
||||
{
|
||||
["transID"] = session.TurnState.TransId
|
||||
}, cancellationToken);
|
||||
return replies;
|
||||
}
|
||||
|
||||
if (parsedType is "LISTEN" or "CLIENT_NLU" or "CLIENT_ASR")
|
||||
{
|
||||
return await turnFinalizationService.HandleTurnAsync(session, envelope, parsedType, cancellationToken);
|
||||
var replies = await turnFinalizationService.HandleTurnAsync(session, envelope, parsedType, cancellationToken);
|
||||
await telemetrySink.RecordTurnEventAsync(envelope, session, "turn_processed", new Dictionary<string, object?>
|
||||
{
|
||||
["messageType"] = parsedType,
|
||||
["replyCount"] = replies.Count,
|
||||
["transcript"] = session.LastTranscript,
|
||||
["intent"] = session.LastIntent
|
||||
}, cancellationToken);
|
||||
return replies;
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
using Jibo.Cloud.Application.Abstractions;
|
||||
using Jibo.Cloud.Domain.Models;
|
||||
|
||||
namespace Jibo.Cloud.Application.Services;
|
||||
|
||||
public sealed class NullWebSocketTelemetrySink : IWebSocketTelemetrySink
|
||||
{
|
||||
public Task RecordConnectionOpenedAsync(WebSocketMessageEnvelope envelope, CloudSession session, CancellationToken cancellationToken = default) => Task.CompletedTask;
|
||||
public Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType, CancellationToken cancellationToken = default) => Task.CompletedTask;
|
||||
public Task RecordTurnEventAsync(WebSocketMessageEnvelope envelope, CloudSession session, string eventType, IReadOnlyDictionary<string, object?> details, CancellationToken cancellationToken = default) => Task.CompletedTask;
|
||||
public Task RecordOutboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, IReadOnlyList<WebSocketReply> replies, CancellationToken cancellationToken = default) => Task.CompletedTask;
|
||||
public Task RecordConnectionClosedAsync(WebSocketMessageEnvelope envelope, CloudSession session, string reason, CancellationToken cancellationToken = default) => Task.CompletedTask;
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
using System.Text.Json;
|
||||
|
||||
namespace Jibo.Cloud.Domain.Models;
|
||||
|
||||
public sealed class CapturedWebSocketFixture
|
||||
{
|
||||
public string Name { get; init; } = string.Empty;
|
||||
public CapturedWebSocketFixtureSession Session { get; init; } = new();
|
||||
public IReadOnlyList<CapturedWebSocketFixtureStep> Steps { get; init; } = [];
|
||||
}
|
||||
|
||||
public sealed class CapturedWebSocketFixtureSession
|
||||
{
|
||||
public string HostName { get; init; } = string.Empty;
|
||||
public string Path { get; init; } = "/";
|
||||
public string Kind { get; init; } = "unknown";
|
||||
public string? Token { get; init; }
|
||||
}
|
||||
|
||||
public sealed class CapturedWebSocketFixtureStep
|
||||
{
|
||||
public JsonElement? Text { get; init; }
|
||||
public IReadOnlyList<int>? Binary { get; init; }
|
||||
public IReadOnlyList<string> ExpectedReplyTypes { get; init; } = [];
|
||||
}
|
||||
@@ -0,0 +1,24 @@
|
||||
namespace Jibo.Cloud.Domain.Models;
|
||||
|
||||
public sealed class WebSocketTelemetryRecord
|
||||
{
|
||||
public DateTimeOffset TimestampUtc { get; init; } = DateTimeOffset.UtcNow;
|
||||
public string EventType { get; init; } = string.Empty;
|
||||
public string SessionId { get; init; } = string.Empty;
|
||||
public string ConnectionId { get; init; } = string.Empty;
|
||||
public string? Token { get; init; }
|
||||
public string HostName { get; init; } = string.Empty;
|
||||
public string Path { get; init; } = "/";
|
||||
public string Kind { get; init; } = "unknown";
|
||||
public string? TransId { get; init; }
|
||||
public string? MessageType { get; init; }
|
||||
public string Direction { get; init; } = "internal";
|
||||
public string? Text { get; init; }
|
||||
public int? BinaryLength { get; init; }
|
||||
public IReadOnlyList<string> ReplyTypes { get; init; } = [];
|
||||
public int BufferedAudioBytes { get; init; }
|
||||
public int BufferedAudioChunks { get; init; }
|
||||
public int FinalizeAttempts { get; init; }
|
||||
public bool AwaitingTurnCompletion { get; init; }
|
||||
public IReadOnlyDictionary<string, object?> Details { get; init; } = new Dictionary<string, object?>(StringComparer.OrdinalIgnoreCase);
|
||||
}
|
||||
@@ -1,19 +1,27 @@
|
||||
using Jibo.Cloud.Application.Abstractions;
|
||||
using Jibo.Cloud.Application.Services;
|
||||
using Jibo.Cloud.Infrastructure.Persistence;
|
||||
using Jibo.Cloud.Infrastructure.Telemetry;
|
||||
using Jibo.Runtime.Abstractions;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Configuration;
|
||||
|
||||
namespace Jibo.Cloud.Infrastructure.DependencyInjection;
|
||||
|
||||
public static class ServiceCollectionExtensions
|
||||
{
|
||||
public static IServiceCollection AddOpenJiboCloud(this IServiceCollection services)
|
||||
public static IServiceCollection AddOpenJiboCloud(this IServiceCollection services, IConfiguration? configuration = null)
|
||||
{
|
||||
if (configuration is not null)
|
||||
{
|
||||
services.Configure<WebSocketTelemetryOptions>(configuration.GetSection("OpenJibo:Telemetry"));
|
||||
}
|
||||
|
||||
services.AddSingleton<ICloudStateStore, InMemoryCloudStateStore>();
|
||||
services.AddSingleton<IConversationBroker, DemoConversationBroker>();
|
||||
services.AddSingleton<ISttStrategy, SyntheticBufferedAudioSttStrategy>();
|
||||
services.AddSingleton<ISttStrategySelector, DefaultSttStrategySelector>();
|
||||
services.AddSingleton<IWebSocketTelemetrySink, FileWebSocketTelemetrySink>();
|
||||
services.AddSingleton<ProtocolToTurnContextMapper>();
|
||||
services.AddSingleton<ResponsePlanToSocketMessagesMapper>();
|
||||
services.AddSingleton<WebSocketTurnFinalizationService>();
|
||||
|
||||
@@ -0,0 +1,256 @@
|
||||
using System.Collections.Concurrent;
|
||||
using System.Text.Json;
|
||||
using Jibo.Cloud.Application.Abstractions;
|
||||
using Jibo.Cloud.Domain.Models;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Jibo.Cloud.Infrastructure.Telemetry;
|
||||
|
||||
public sealed class FileWebSocketTelemetrySink(
|
||||
ILogger<FileWebSocketTelemetrySink> logger,
|
||||
IOptions<WebSocketTelemetryOptions> options) : IWebSocketTelemetrySink
|
||||
{
|
||||
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
|
||||
{
|
||||
WriteIndented = true
|
||||
};
|
||||
|
||||
private readonly ConcurrentDictionary<string, CapturedWebSocketFixtureBuilder> _fixtures = new(StringComparer.OrdinalIgnoreCase);
|
||||
private readonly SemaphoreSlim _writeLock = new(1, 1);
|
||||
|
||||
public async Task RecordConnectionOpenedAsync(WebSocketMessageEnvelope envelope, CloudSession session, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!options.Value.Enabled)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
_fixtures[session.SessionId] = new CapturedWebSocketFixtureBuilder
|
||||
{
|
||||
Session = new CapturedWebSocketFixtureSession
|
||||
{
|
||||
HostName = envelope.HostName,
|
||||
Path = envelope.Path,
|
||||
Kind = envelope.Kind,
|
||||
Token = envelope.Token
|
||||
}
|
||||
};
|
||||
|
||||
await WriteRecordAsync(BuildRecord("connection_opened", envelope, session, null, "internal", null, null), cancellationToken);
|
||||
}
|
||||
|
||||
public Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!options.Value.Enabled)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
return WriteRecordAsync(BuildRecord("message_in", envelope, session, messageType, "in", null, null), cancellationToken);
|
||||
}
|
||||
|
||||
public Task RecordTurnEventAsync(WebSocketMessageEnvelope envelope, CloudSession session, string eventType, IReadOnlyDictionary<string, object?> details, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!options.Value.Enabled)
|
||||
{
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
return WriteRecordAsync(BuildRecord(eventType, envelope, session, null, "internal", null, details), cancellationToken);
|
||||
}
|
||||
|
||||
public async Task RecordOutboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, IReadOnlyList<WebSocketReply> replies, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!options.Value.Enabled)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var replyTypes = replies
|
||||
.Select(reply => ReadReplyType(reply.Text))
|
||||
.Where(type => !string.IsNullOrWhiteSpace(type))
|
||||
.Select(type => type!)
|
||||
.ToArray();
|
||||
|
||||
await WriteRecordAsync(BuildRecord("message_out", envelope, session, null, "out", replyTypes, null), cancellationToken);
|
||||
|
||||
if (_fixtures.TryGetValue(session.SessionId, out var fixture))
|
||||
{
|
||||
fixture.Steps.Add(new CapturedWebSocketFixtureStep
|
||||
{
|
||||
Text = ParseJsonElement(envelope.Text),
|
||||
Binary = envelope.Binary?.Select(value => (int)value).ToArray(),
|
||||
ExpectedReplyTypes = replyTypes
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
public async Task RecordConnectionClosedAsync(WebSocketMessageEnvelope envelope, CloudSession session, string reason, CancellationToken cancellationToken = default)
|
||||
{
|
||||
if (!options.Value.Enabled)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
await WriteRecordAsync(BuildRecord(
|
||||
"connection_closed",
|
||||
envelope,
|
||||
session,
|
||||
null,
|
||||
"internal",
|
||||
null,
|
||||
new Dictionary<string, object?> { ["reason"] = reason }), cancellationToken);
|
||||
|
||||
if (!options.Value.ExportFixtures || !_fixtures.TryRemove(session.SessionId, out var fixture) || fixture.Steps.Count == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var fixtureName = BuildFixtureName(session, fixture);
|
||||
var capturedFixture = new CapturedWebSocketFixture
|
||||
{
|
||||
Name = fixtureName,
|
||||
Session = fixture.Session,
|
||||
Steps = [.. fixture.Steps]
|
||||
};
|
||||
|
||||
var fixtureDirectory = Path.Combine(GetBaseDirectory(), "fixtures");
|
||||
Directory.CreateDirectory(fixtureDirectory);
|
||||
var fixturePath = Path.Combine(fixtureDirectory, $"{fixtureName}.flow.json");
|
||||
|
||||
await _writeLock.WaitAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
await File.WriteAllTextAsync(fixturePath, JsonSerializer.Serialize(capturedFixture, JsonOptions), cancellationToken);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeLock.Release();
|
||||
}
|
||||
|
||||
logger.LogInformation("Exported websocket fixture {FixturePath}", fixturePath);
|
||||
}
|
||||
|
||||
private async Task WriteRecordAsync(WebSocketTelemetryRecord record, CancellationToken cancellationToken)
|
||||
{
|
||||
var directory = GetBaseDirectory();
|
||||
Directory.CreateDirectory(directory);
|
||||
var filePath = Path.Combine(directory, $"{DateTimeOffset.UtcNow:yyyyMMdd}.events.ndjson");
|
||||
var line = JsonSerializer.Serialize(record) + Environment.NewLine;
|
||||
|
||||
await _writeLock.WaitAsync(cancellationToken);
|
||||
try
|
||||
{
|
||||
await File.AppendAllTextAsync(filePath, line, cancellationToken);
|
||||
}
|
||||
finally
|
||||
{
|
||||
_writeLock.Release();
|
||||
}
|
||||
|
||||
logger.LogInformation(
|
||||
"WebSocket telemetry {EventType} session={SessionId} transId={TransId} bufferedBytes={BufferedBytes} replyTypes={ReplyTypes}",
|
||||
record.EventType,
|
||||
record.SessionId,
|
||||
record.TransId,
|
||||
record.BufferedAudioBytes,
|
||||
string.Join(",", record.ReplyTypes));
|
||||
}
|
||||
|
||||
private static WebSocketTelemetryRecord BuildRecord(
|
||||
string eventType,
|
||||
WebSocketMessageEnvelope envelope,
|
||||
CloudSession session,
|
||||
string? messageType,
|
||||
string direction,
|
||||
IReadOnlyList<string>? replyTypes,
|
||||
IReadOnlyDictionary<string, object?>? details) => new()
|
||||
{
|
||||
EventType = eventType,
|
||||
SessionId = session.SessionId,
|
||||
ConnectionId = envelope.ConnectionId,
|
||||
Token = envelope.Token,
|
||||
HostName = envelope.HostName,
|
||||
Path = envelope.Path,
|
||||
Kind = envelope.Kind,
|
||||
TransId = session.TurnState.TransId ?? session.LastTransId,
|
||||
MessageType = messageType,
|
||||
Direction = direction,
|
||||
Text = envelope.Text,
|
||||
BinaryLength = envelope.Binary?.Length,
|
||||
ReplyTypes = replyTypes ?? [],
|
||||
BufferedAudioBytes = session.TurnState.BufferedAudioBytes,
|
||||
BufferedAudioChunks = session.TurnState.BufferedAudioChunkCount,
|
||||
FinalizeAttempts = session.TurnState.FinalizeAttemptCount,
|
||||
AwaitingTurnCompletion = session.TurnState.AwaitingTurnCompletion,
|
||||
Details = details ?? new Dictionary<string, object?>()
|
||||
};
|
||||
|
||||
private static string? ReadReplyType(string? text)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(text))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var document = JsonDocument.Parse(text);
|
||||
return document.RootElement.TryGetProperty("type", out var type) && type.ValueKind == JsonValueKind.String
|
||||
? type.GetString()
|
||||
: null;
|
||||
}
|
||||
catch
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private static JsonElement? ParseJsonElement(string? text)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(text))
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
using var document = JsonDocument.Parse(text);
|
||||
return document.RootElement.Clone();
|
||||
}
|
||||
catch
|
||||
{
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private string GetBaseDirectory()
|
||||
{
|
||||
return Path.GetFullPath(options.Value.DirectoryPath, AppContext.BaseDirectory);
|
||||
}
|
||||
|
||||
private static string BuildFixtureName(CloudSession session, CapturedWebSocketFixtureBuilder fixture)
|
||||
{
|
||||
var host = SanitizeName(fixture.Session.HostName);
|
||||
var kind = SanitizeName(fixture.Session.Kind);
|
||||
var transId = SanitizeName(session.TurnState.TransId ?? session.LastTransId ?? session.SessionId);
|
||||
return $"{host}-{kind}-{transId}";
|
||||
}
|
||||
|
||||
private static string SanitizeName(string value)
|
||||
{
|
||||
var chars = value
|
||||
.ToLowerInvariant()
|
||||
.Select(character => char.IsLetterOrDigit(character) ? character : '-')
|
||||
.ToArray();
|
||||
|
||||
return string.Join(string.Empty, new string(chars).Split('-', StringSplitOptions.RemoveEmptyEntries));
|
||||
}
|
||||
|
||||
private sealed class CapturedWebSocketFixtureBuilder
|
||||
{
|
||||
public CapturedWebSocketFixtureSession Session { get; init; } = new();
|
||||
public List<CapturedWebSocketFixtureStep> Steps { get; } = [];
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,8 @@
|
||||
namespace Jibo.Cloud.Infrastructure.Telemetry;
|
||||
|
||||
public sealed class WebSocketTelemetryOptions
|
||||
{
|
||||
public bool Enabled { get; set; } = true;
|
||||
public bool ExportFixtures { get; set; } = true;
|
||||
public string DirectoryPath { get; set; } = "captures/websocket";
|
||||
}
|
||||
@@ -0,0 +1,75 @@
|
||||
using System.Text.Json;
|
||||
using Jibo.Cloud.Domain.Models;
|
||||
using Jibo.Cloud.Infrastructure.Telemetry;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Microsoft.Extensions.Options;
|
||||
|
||||
namespace Jibo.Cloud.Tests.WebSockets;
|
||||
|
||||
public sealed class FileWebSocketTelemetrySinkTests : IDisposable
|
||||
{
|
||||
private readonly string _directoryPath;
|
||||
|
||||
public FileWebSocketTelemetrySinkTests()
|
||||
{
|
||||
_directoryPath = Path.Combine(Path.GetTempPath(), "OpenJibo.Tests", Guid.NewGuid().ToString("N"));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task RecordsFixtureOnConnectionClose()
|
||||
{
|
||||
var sink = CreateSink();
|
||||
var envelope = new WebSocketMessageEnvelope
|
||||
{
|
||||
ConnectionId = "conn-1",
|
||||
HostName = "neo-hub.jibo.com",
|
||||
Path = "/listen",
|
||||
Kind = "neo-hub-listen",
|
||||
Token = "token-1",
|
||||
Text = """{"type":"LISTEN","transID":"trans-1","data":{"text":"hello jibo"}}"""
|
||||
};
|
||||
var session = new CloudSession
|
||||
{
|
||||
Token = "token-1",
|
||||
HostName = "neo-hub.jibo.com",
|
||||
Path = "/listen"
|
||||
};
|
||||
session.TurnState.TransId = "trans-1";
|
||||
|
||||
await sink.RecordConnectionOpenedAsync(envelope, session);
|
||||
await sink.RecordInboundAsync(envelope, session, "LISTEN");
|
||||
await sink.RecordOutboundAsync(envelope, session,
|
||||
[
|
||||
new WebSocketReply { Text = """{"type":"LISTEN"}""" },
|
||||
new WebSocketReply { Text = """{"type":"EOS"}""" }
|
||||
]);
|
||||
await sink.RecordConnectionClosedAsync(envelope, session, "test");
|
||||
|
||||
var fixtureDirectory = Path.Combine(_directoryPath, "fixtures");
|
||||
var fixturePath = Directory.GetFiles(fixtureDirectory, "*.flow.json").Single();
|
||||
using var document = JsonDocument.Parse(await File.ReadAllTextAsync(fixturePath));
|
||||
Assert.Equal("neo-hub.jibo.com", document.RootElement.GetProperty("session").GetProperty("hostName").GetString());
|
||||
Assert.Equal(1, document.RootElement.GetProperty("steps").GetArrayLength());
|
||||
Assert.Equal("LISTEN", document.RootElement.GetProperty("steps")[0].GetProperty("expectedReplyTypes")[0].GetString());
|
||||
}
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (Directory.Exists(_directoryPath))
|
||||
{
|
||||
Directory.Delete(_directoryPath, true);
|
||||
}
|
||||
}
|
||||
|
||||
private FileWebSocketTelemetrySink CreateSink()
|
||||
{
|
||||
return new FileWebSocketTelemetrySink(
|
||||
NullLogger<FileWebSocketTelemetrySink>.Instance,
|
||||
Options.Create(new WebSocketTelemetryOptions
|
||||
{
|
||||
Enabled = true,
|
||||
ExportFixtures = true,
|
||||
DirectoryPath = _directoryPath
|
||||
}));
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,5 @@
|
||||
using System.Text.Json;
|
||||
using Jibo.Cloud.Application.Abstractions;
|
||||
using Jibo.Cloud.Application.Services;
|
||||
using Jibo.Cloud.Domain.Models;
|
||||
using Jibo.Cloud.Infrastructure.Persistence;
|
||||
@@ -24,6 +25,7 @@ public sealed class JiboWebSocketServiceTests
|
||||
|
||||
_service = new JiboWebSocketService(
|
||||
_store,
|
||||
new NullWebSocketTelemetrySink(),
|
||||
new WebSocketTurnFinalizationService(
|
||||
turnContextMapper,
|
||||
conversationBroker,
|
||||
|
||||
Reference in New Issue
Block a user