diff --git a/OpenJibo/docs/live-jibo-capture.md b/OpenJibo/docs/live-jibo-capture.md new file mode 100644 index 0000000..b2ca47c --- /dev/null +++ b/OpenJibo/docs/live-jibo-capture.md @@ -0,0 +1,61 @@ +# Live Jibo Capture + +## Recommendation + +For the first real `.NET cloud -> physical Jibo` runs, use the existing controlled network and routing setup that already works with the Node server. + +Recommended order: + +1. Keep the robot on the known-good Ubuntu laptop based environment. +2. Swap the `.NET` cloud into that same controlled path. +3. Leave the Node oracle available as a fallback on separate ports or on a second machine. +4. Capture real `.NET` websocket traffic and turn it into sanitized fixtures. +5. Only after that, decide what belongs in permanent Azure hosting and IaC. + +This is the lowest-risk path because it changes only one major variable at a time: the cloud implementation. It avoids mixing protocol-parity questions with new infrastructure variables. + +## Why Not Azure First + +Azure remains the target hosting direction, but it is not the best first environment for live robot discovery. + +Reasons: + +- the main unknowns are still protocol and turn-behavior details, not Azure primitives +- keeping Node and `.NET` both available locally makes fallback and side-by-side comparison much easier +- live robot capture is more valuable right now than early CI/CD polish +- region injection and device routing work are easier to debug in a tightly controlled local network + +## When To Move Beyond The Ubuntu Setup + +Move to a second local/staging server or Azure after: + +- startup flows are stable against the physical robot +- websocket turn telemetry is being captured reliably +- several real captured sessions have been sanitized into replay fixtures +- the fallback path to Node is no longer needed for normal testing + +## Telemetry Before Live Runs + +The `.NET` cloud now supports structured websocket capture intended for first live runs: + +- event stream written as NDJSON +- per-session fixture export for replay +- turn metadata including `transID`, buffered audio counts, finalize attempts, and reply types + +Default capture location: + +- `captures/websocket/` + +Artifacts: + +- `*.events.ndjson` +- `fixtures/*.flow.json` + +## Suggested First Hookup Plan + +1. Start the `.NET` API on the Ubuntu-backed controlled network using the same robot routing settings currently used for Node. +2. Confirm HTTP bootstrap and websocket acceptance with the existing smoke/routing helpers. +3. Run one or two controlled listen turns with Jibo. +4. Inspect the captured websocket events and exported fixtures. +5. Convert the best captures into sanitized checked-in fixtures and tests. +6. Keep Node available to compare any surprising turn behavior before changing infrastructure. diff --git a/OpenJibo/scripts/cloud/Get-WebSocketCaptureSummary.ps1 b/OpenJibo/scripts/cloud/Get-WebSocketCaptureSummary.ps1 new file mode 100644 index 0000000..4801f52 --- /dev/null +++ b/OpenJibo/scripts/cloud/Get-WebSocketCaptureSummary.ps1 @@ -0,0 +1,33 @@ +param( + [string]$CaptureDirectory = "..\..\src\Jibo.Cloud\dotnet\src\Jibo.Cloud.Api\bin\Debug\net10.0\captures\websocket" +) + +$resolvedDirectory = Resolve-Path -LiteralPath $CaptureDirectory -ErrorAction Stop +$eventFiles = Get-ChildItem -LiteralPath $resolvedDirectory -Filter *.events.ndjson -File | Sort-Object LastWriteTimeUtc + +if (-not $eventFiles) { + Write-Host "No websocket telemetry event files found in $resolvedDirectory" + exit 0 +} + +$records = foreach ($file in $eventFiles) { + Get-Content -LiteralPath $file.FullName | Where-Object { $_.Trim().Length -gt 0 } | ForEach-Object { + $_ | ConvertFrom-Json + } +} + +$records | + Group-Object EventType | + Sort-Object Name | + Select-Object Name, Count | + Format-Table -AutoSize + +$fixtureDirectory = Join-Path $resolvedDirectory "fixtures" +if (Test-Path -LiteralPath $fixtureDirectory) { + Write-Host "" + Write-Host "Exported websocket fixtures:" + Get-ChildItem -LiteralPath $fixtureDirectory -Filter *.flow.json -File | + Sort-Object LastWriteTimeUtc | + Select-Object LastWriteTimeUtc, Name | + Format-Table -AutoSize +} diff --git a/OpenJibo/scripts/cloud/README.md b/OpenJibo/scripts/cloud/README.md index fb63046..94cf548 100644 --- a/OpenJibo/scripts/cloud/README.md +++ b/OpenJibo/scripts/cloud/README.md @@ -6,3 +6,5 @@ These scripts help exercise the new .NET hosted cloud locally. Runs a few quick HTTP checks against a local OpenJibo cloud instance. - `Invoke-ProtocolFixture.ps1` Replays a sanitized HTTP fixture against a running local instance. +- `Get-WebSocketCaptureSummary.ps1` + Summarizes captured websocket telemetry events and exported live-run fixtures from the .NET cloud. diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/README.md b/OpenJibo/src/Jibo.Cloud/dotnet/README.md index e4c2c0e..53b36ab 100644 --- a/OpenJibo/src/Jibo.Cloud/dotnet/README.md +++ b/OpenJibo/src/Jibo.Cloud/dotnet/README.md @@ -72,6 +72,7 @@ Current websocket scope is still intentionally narrow: - synthetic `LISTEN` result shaping for `LISTEN`, `CLIENT_NLU`, and `CLIENT_ASR` - buffered audio state tracking behind a dedicated turn-finalization layer - synthetic STT strategy selection for fixture-driven audio turn completion +- structured websocket telemetry and live-run fixture export - `CONTEXT` capture and follow-up turn state - `EOS` completion - first skill vertical for joke/chat `SKILL_ACTION` playback diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Api/Program.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Api/Program.cs index ded659f..72233f3 100644 --- a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Api/Program.cs +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Api/Program.cs @@ -1,12 +1,13 @@ using System.Net.WebSockets; using System.Text; +using Jibo.Cloud.Application.Abstractions; using Jibo.Cloud.Application.Services; using Jibo.Cloud.Domain.Models; using Jibo.Cloud.Infrastructure.DependencyInjection; var builder = WebApplication.CreateBuilder(args); -builder.Services.AddOpenJiboCloud(); +builder.Services.AddOpenJiboCloud(builder.Configuration); var app = builder.Build(); @@ -21,10 +22,21 @@ app.Use(async (context, next) => } var webSocketService = context.RequestServices.GetRequiredService(); + var telemetrySink = context.RequestServices.GetRequiredService(); using var socket = await context.WebSockets.AcceptWebSocketAsync(); var kind = ResolveSocketKind(context.Request.Host.Host, context.Request.Path); var token = ResolveToken(context.Request); + var openEnvelope = new WebSocketMessageEnvelope + { + ConnectionId = Guid.NewGuid().ToString("N"), + HostName = context.Request.Host.Host, + Path = context.Request.Path.Value ?? "/", + Kind = kind, + Token = token + }; + var openSession = ResolveSession(webSocketService, openEnvelope); + await telemetrySink.RecordConnectionOpenedAsync(openEnvelope, openSession, context.RequestAborted); while (socket.State == WebSocketState.Open) { @@ -47,6 +59,8 @@ app.Use(async (context, next) => }; var replies = await webSocketService.HandleMessageAsync(envelope, context.RequestAborted); + var session = ResolveSession(webSocketService, envelope); + await telemetrySink.RecordInboundAsync(envelope, session, ReadMessageType(envelope.Text), context.RequestAborted); foreach (var reply in replies) { if (string.IsNullOrWhiteSpace(reply.Text)) @@ -57,7 +71,20 @@ app.Use(async (context, next) => var payload = Encoding.UTF8.GetBytes(reply.Text); await socket.SendAsync(payload, WebSocketMessageType.Text, true, context.RequestAborted); } + + await telemetrySink.RecordOutboundAsync(envelope, session, replies, context.RequestAborted); } + + var closeEnvelope = new WebSocketMessageEnvelope + { + ConnectionId = Guid.NewGuid().ToString("N"), + HostName = context.Request.Host.Host, + Path = context.Request.Path.Value ?? "/", + Kind = kind, + Token = token + }; + var closeSession = ResolveSession(webSocketService, closeEnvelope); + await telemetrySink.RecordConnectionClosedAsync(closeEnvelope, closeSession, "socket-loop-ended", context.RequestAborted); }); app.MapGet("/health", () => Results.Json(new { ok = true, service = "OpenJibo Cloud Api" })); @@ -167,4 +194,29 @@ static string? ResolveToken(HttpRequest request) return null; } +static string ReadMessageType(string? text) +{ + if (string.IsNullOrWhiteSpace(text)) + { + return "BINARY_OR_EMPTY"; + } + + try + { + using var document = System.Text.Json.JsonDocument.Parse(text); + return document.RootElement.TryGetProperty("type", out var type) && type.ValueKind == System.Text.Json.JsonValueKind.String + ? type.GetString() ?? "UNKNOWN" + : "UNKNOWN"; + } + catch + { + return "TEXT"; + } +} + +static CloudSession ResolveSession(JiboWebSocketService webSocketService, WebSocketMessageEnvelope envelope) +{ + return webSocketService.GetOrCreateSession(envelope); +} + internal sealed record ReceivedSocketMessage(WebSocketMessageType MessageType, byte[] Buffer); diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Abstractions/IWebSocketTelemetrySink.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Abstractions/IWebSocketTelemetrySink.cs new file mode 100644 index 0000000..495a99a --- /dev/null +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Abstractions/IWebSocketTelemetrySink.cs @@ -0,0 +1,12 @@ +using Jibo.Cloud.Domain.Models; + +namespace Jibo.Cloud.Application.Abstractions; + +public interface IWebSocketTelemetrySink +{ + Task RecordConnectionOpenedAsync(WebSocketMessageEnvelope envelope, CloudSession session, CancellationToken cancellationToken = default); + Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType, CancellationToken cancellationToken = default); + Task RecordTurnEventAsync(WebSocketMessageEnvelope envelope, CloudSession session, string eventType, IReadOnlyDictionary details, CancellationToken cancellationToken = default); + Task RecordOutboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, IReadOnlyList replies, CancellationToken cancellationToken = default); + Task RecordConnectionClosedAsync(WebSocketMessageEnvelope envelope, CloudSession session, string reason, CancellationToken cancellationToken = default); +} diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs index 83df7fb..64eaf1b 100644 --- a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/JiboWebSocketService.cs @@ -7,17 +7,28 @@ namespace Jibo.Cloud.Application.Services; public sealed class JiboWebSocketService( ICloudStateStore stateStore, + IWebSocketTelemetrySink telemetrySink, WebSocketTurnFinalizationService turnFinalizationService) { + public CloudSession GetOrCreateSession(WebSocketMessageEnvelope envelope) + { + return stateStore.FindSessionByToken(envelope.Token ?? string.Empty) ?? + stateStore.OpenSession(envelope.Kind, null, envelope.Token, envelope.HostName, envelope.Path); + } + public async Task> HandleMessageAsync(WebSocketMessageEnvelope envelope, CancellationToken cancellationToken = default) { - var session = stateStore.FindSessionByToken(envelope.Token ?? string.Empty) ?? - stateStore.OpenSession(envelope.Kind, null, envelope.Token, envelope.HostName, envelope.Path); + var session = GetOrCreateSession(envelope); session.LastSeenUtc = DateTimeOffset.UtcNow; if (envelope.IsBinary) { - return turnFinalizationService.HandleBinaryAudio(session, envelope); + var replies = turnFinalizationService.HandleBinaryAudio(session, envelope); + await telemetrySink.RecordTurnEventAsync(envelope, session, "binary_audio_received", new Dictionary + { + ["bytes"] = envelope.Binary?.Length ?? 0 + }, cancellationToken); + return replies; } var parsedType = ReadMessageType(envelope.Text); @@ -31,12 +42,25 @@ public sealed class JiboWebSocketService( if (parsedType == "CONTEXT") { - return turnFinalizationService.HandleContext(session, envelope.Text); + var replies = turnFinalizationService.HandleContext(session, envelope.Text); + await telemetrySink.RecordTurnEventAsync(envelope, session, "context_received", new Dictionary + { + ["transID"] = session.TurnState.TransId + }, cancellationToken); + return replies; } if (parsedType is "LISTEN" or "CLIENT_NLU" or "CLIENT_ASR") { - return await turnFinalizationService.HandleTurnAsync(session, envelope, parsedType, cancellationToken); + var replies = await turnFinalizationService.HandleTurnAsync(session, envelope, parsedType, cancellationToken); + await telemetrySink.RecordTurnEventAsync(envelope, session, "turn_processed", new Dictionary + { + ["messageType"] = parsedType, + ["replyCount"] = replies.Count, + ["transcript"] = session.LastTranscript, + ["intent"] = session.LastIntent + }, cancellationToken); + return replies; } return diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/NullWebSocketTelemetrySink.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/NullWebSocketTelemetrySink.cs new file mode 100644 index 0000000..ebd237e --- /dev/null +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Application/Services/NullWebSocketTelemetrySink.cs @@ -0,0 +1,13 @@ +using Jibo.Cloud.Application.Abstractions; +using Jibo.Cloud.Domain.Models; + +namespace Jibo.Cloud.Application.Services; + +public sealed class NullWebSocketTelemetrySink : IWebSocketTelemetrySink +{ + public Task RecordConnectionOpenedAsync(WebSocketMessageEnvelope envelope, CloudSession session, CancellationToken cancellationToken = default) => Task.CompletedTask; + public Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType, CancellationToken cancellationToken = default) => Task.CompletedTask; + public Task RecordTurnEventAsync(WebSocketMessageEnvelope envelope, CloudSession session, string eventType, IReadOnlyDictionary details, CancellationToken cancellationToken = default) => Task.CompletedTask; + public Task RecordOutboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, IReadOnlyList replies, CancellationToken cancellationToken = default) => Task.CompletedTask; + public Task RecordConnectionClosedAsync(WebSocketMessageEnvelope envelope, CloudSession session, string reason, CancellationToken cancellationToken = default) => Task.CompletedTask; +} diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/CapturedWebSocketFixture.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/CapturedWebSocketFixture.cs new file mode 100644 index 0000000..b167176 --- /dev/null +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/CapturedWebSocketFixture.cs @@ -0,0 +1,25 @@ +using System.Text.Json; + +namespace Jibo.Cloud.Domain.Models; + +public sealed class CapturedWebSocketFixture +{ + public string Name { get; init; } = string.Empty; + public CapturedWebSocketFixtureSession Session { get; init; } = new(); + public IReadOnlyList Steps { get; init; } = []; +} + +public sealed class CapturedWebSocketFixtureSession +{ + public string HostName { get; init; } = string.Empty; + public string Path { get; init; } = "/"; + public string Kind { get; init; } = "unknown"; + public string? Token { get; init; } +} + +public sealed class CapturedWebSocketFixtureStep +{ + public JsonElement? Text { get; init; } + public IReadOnlyList? Binary { get; init; } + public IReadOnlyList ExpectedReplyTypes { get; init; } = []; +} diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/WebSocketTelemetryRecord.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/WebSocketTelemetryRecord.cs new file mode 100644 index 0000000..6eb0779 --- /dev/null +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Domain/Models/WebSocketTelemetryRecord.cs @@ -0,0 +1,24 @@ +namespace Jibo.Cloud.Domain.Models; + +public sealed class WebSocketTelemetryRecord +{ + public DateTimeOffset TimestampUtc { get; init; } = DateTimeOffset.UtcNow; + public string EventType { get; init; } = string.Empty; + public string SessionId { get; init; } = string.Empty; + public string ConnectionId { get; init; } = string.Empty; + public string? Token { get; init; } + public string HostName { get; init; } = string.Empty; + public string Path { get; init; } = "/"; + public string Kind { get; init; } = "unknown"; + public string? TransId { get; init; } + public string? MessageType { get; init; } + public string Direction { get; init; } = "internal"; + public string? Text { get; init; } + public int? BinaryLength { get; init; } + public IReadOnlyList ReplyTypes { get; init; } = []; + public int BufferedAudioBytes { get; init; } + public int BufferedAudioChunks { get; init; } + public int FinalizeAttempts { get; init; } + public bool AwaitingTurnCompletion { get; init; } + public IReadOnlyDictionary Details { get; init; } = new Dictionary(StringComparer.OrdinalIgnoreCase); +} diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs index 5122a0e..31b9810 100644 --- a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/DependencyInjection/ServiceCollectionExtensions.cs @@ -1,19 +1,27 @@ using Jibo.Cloud.Application.Abstractions; using Jibo.Cloud.Application.Services; using Jibo.Cloud.Infrastructure.Persistence; +using Jibo.Cloud.Infrastructure.Telemetry; using Jibo.Runtime.Abstractions; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Configuration; namespace Jibo.Cloud.Infrastructure.DependencyInjection; public static class ServiceCollectionExtensions { - public static IServiceCollection AddOpenJiboCloud(this IServiceCollection services) + public static IServiceCollection AddOpenJiboCloud(this IServiceCollection services, IConfiguration? configuration = null) { + if (configuration is not null) + { + services.Configure(configuration.GetSection("OpenJibo:Telemetry")); + } + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/Telemetry/FileWebSocketTelemetrySink.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/Telemetry/FileWebSocketTelemetrySink.cs new file mode 100644 index 0000000..fc9609f --- /dev/null +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/Telemetry/FileWebSocketTelemetrySink.cs @@ -0,0 +1,256 @@ +using System.Collections.Concurrent; +using System.Text.Json; +using Jibo.Cloud.Application.Abstractions; +using Jibo.Cloud.Domain.Models; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Jibo.Cloud.Infrastructure.Telemetry; + +public sealed class FileWebSocketTelemetrySink( + ILogger logger, + IOptions options) : IWebSocketTelemetrySink +{ + private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web) + { + WriteIndented = true + }; + + private readonly ConcurrentDictionary _fixtures = new(StringComparer.OrdinalIgnoreCase); + private readonly SemaphoreSlim _writeLock = new(1, 1); + + public async Task RecordConnectionOpenedAsync(WebSocketMessageEnvelope envelope, CloudSession session, CancellationToken cancellationToken = default) + { + if (!options.Value.Enabled) + { + return; + } + + _fixtures[session.SessionId] = new CapturedWebSocketFixtureBuilder + { + Session = new CapturedWebSocketFixtureSession + { + HostName = envelope.HostName, + Path = envelope.Path, + Kind = envelope.Kind, + Token = envelope.Token + } + }; + + await WriteRecordAsync(BuildRecord("connection_opened", envelope, session, null, "internal", null, null), cancellationToken); + } + + public Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType, CancellationToken cancellationToken = default) + { + if (!options.Value.Enabled) + { + return Task.CompletedTask; + } + + return WriteRecordAsync(BuildRecord("message_in", envelope, session, messageType, "in", null, null), cancellationToken); + } + + public Task RecordTurnEventAsync(WebSocketMessageEnvelope envelope, CloudSession session, string eventType, IReadOnlyDictionary details, CancellationToken cancellationToken = default) + { + if (!options.Value.Enabled) + { + return Task.CompletedTask; + } + + return WriteRecordAsync(BuildRecord(eventType, envelope, session, null, "internal", null, details), cancellationToken); + } + + public async Task RecordOutboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, IReadOnlyList replies, CancellationToken cancellationToken = default) + { + if (!options.Value.Enabled) + { + return; + } + + var replyTypes = replies + .Select(reply => ReadReplyType(reply.Text)) + .Where(type => !string.IsNullOrWhiteSpace(type)) + .Select(type => type!) + .ToArray(); + + await WriteRecordAsync(BuildRecord("message_out", envelope, session, null, "out", replyTypes, null), cancellationToken); + + if (_fixtures.TryGetValue(session.SessionId, out var fixture)) + { + fixture.Steps.Add(new CapturedWebSocketFixtureStep + { + Text = ParseJsonElement(envelope.Text), + Binary = envelope.Binary?.Select(value => (int)value).ToArray(), + ExpectedReplyTypes = replyTypes + }); + } + } + + public async Task RecordConnectionClosedAsync(WebSocketMessageEnvelope envelope, CloudSession session, string reason, CancellationToken cancellationToken = default) + { + if (!options.Value.Enabled) + { + return; + } + + await WriteRecordAsync(BuildRecord( + "connection_closed", + envelope, + session, + null, + "internal", + null, + new Dictionary { ["reason"] = reason }), cancellationToken); + + if (!options.Value.ExportFixtures || !_fixtures.TryRemove(session.SessionId, out var fixture) || fixture.Steps.Count == 0) + { + return; + } + + var fixtureName = BuildFixtureName(session, fixture); + var capturedFixture = new CapturedWebSocketFixture + { + Name = fixtureName, + Session = fixture.Session, + Steps = [.. fixture.Steps] + }; + + var fixtureDirectory = Path.Combine(GetBaseDirectory(), "fixtures"); + Directory.CreateDirectory(fixtureDirectory); + var fixturePath = Path.Combine(fixtureDirectory, $"{fixtureName}.flow.json"); + + await _writeLock.WaitAsync(cancellationToken); + try + { + await File.WriteAllTextAsync(fixturePath, JsonSerializer.Serialize(capturedFixture, JsonOptions), cancellationToken); + } + finally + { + _writeLock.Release(); + } + + logger.LogInformation("Exported websocket fixture {FixturePath}", fixturePath); + } + + private async Task WriteRecordAsync(WebSocketTelemetryRecord record, CancellationToken cancellationToken) + { + var directory = GetBaseDirectory(); + Directory.CreateDirectory(directory); + var filePath = Path.Combine(directory, $"{DateTimeOffset.UtcNow:yyyyMMdd}.events.ndjson"); + var line = JsonSerializer.Serialize(record) + Environment.NewLine; + + await _writeLock.WaitAsync(cancellationToken); + try + { + await File.AppendAllTextAsync(filePath, line, cancellationToken); + } + finally + { + _writeLock.Release(); + } + + logger.LogInformation( + "WebSocket telemetry {EventType} session={SessionId} transId={TransId} bufferedBytes={BufferedBytes} replyTypes={ReplyTypes}", + record.EventType, + record.SessionId, + record.TransId, + record.BufferedAudioBytes, + string.Join(",", record.ReplyTypes)); + } + + private static WebSocketTelemetryRecord BuildRecord( + string eventType, + WebSocketMessageEnvelope envelope, + CloudSession session, + string? messageType, + string direction, + IReadOnlyList? replyTypes, + IReadOnlyDictionary? details) => new() + { + EventType = eventType, + SessionId = session.SessionId, + ConnectionId = envelope.ConnectionId, + Token = envelope.Token, + HostName = envelope.HostName, + Path = envelope.Path, + Kind = envelope.Kind, + TransId = session.TurnState.TransId ?? session.LastTransId, + MessageType = messageType, + Direction = direction, + Text = envelope.Text, + BinaryLength = envelope.Binary?.Length, + ReplyTypes = replyTypes ?? [], + BufferedAudioBytes = session.TurnState.BufferedAudioBytes, + BufferedAudioChunks = session.TurnState.BufferedAudioChunkCount, + FinalizeAttempts = session.TurnState.FinalizeAttemptCount, + AwaitingTurnCompletion = session.TurnState.AwaitingTurnCompletion, + Details = details ?? new Dictionary() + }; + + private static string? ReadReplyType(string? text) + { + if (string.IsNullOrWhiteSpace(text)) + { + return null; + } + + try + { + using var document = JsonDocument.Parse(text); + return document.RootElement.TryGetProperty("type", out var type) && type.ValueKind == JsonValueKind.String + ? type.GetString() + : null; + } + catch + { + return null; + } + } + + private static JsonElement? ParseJsonElement(string? text) + { + if (string.IsNullOrWhiteSpace(text)) + { + return null; + } + + try + { + using var document = JsonDocument.Parse(text); + return document.RootElement.Clone(); + } + catch + { + return null; + } + } + + private string GetBaseDirectory() + { + return Path.GetFullPath(options.Value.DirectoryPath, AppContext.BaseDirectory); + } + + private static string BuildFixtureName(CloudSession session, CapturedWebSocketFixtureBuilder fixture) + { + var host = SanitizeName(fixture.Session.HostName); + var kind = SanitizeName(fixture.Session.Kind); + var transId = SanitizeName(session.TurnState.TransId ?? session.LastTransId ?? session.SessionId); + return $"{host}-{kind}-{transId}"; + } + + private static string SanitizeName(string value) + { + var chars = value + .ToLowerInvariant() + .Select(character => char.IsLetterOrDigit(character) ? character : '-') + .ToArray(); + + return string.Join(string.Empty, new string(chars).Split('-', StringSplitOptions.RemoveEmptyEntries)); + } + + private sealed class CapturedWebSocketFixtureBuilder + { + public CapturedWebSocketFixtureSession Session { get; init; } = new(); + public List Steps { get; } = []; + } +} diff --git a/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/Telemetry/WebSocketTelemetryOptions.cs b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/Telemetry/WebSocketTelemetryOptions.cs new file mode 100644 index 0000000..5f03e30 --- /dev/null +++ b/OpenJibo/src/Jibo.Cloud/dotnet/src/Jibo.Cloud.Infrastructure/Telemetry/WebSocketTelemetryOptions.cs @@ -0,0 +1,8 @@ +namespace Jibo.Cloud.Infrastructure.Telemetry; + +public sealed class WebSocketTelemetryOptions +{ + public bool Enabled { get; set; } = true; + public bool ExportFixtures { get; set; } = true; + public string DirectoryPath { get; set; } = "captures/websocket"; +} diff --git a/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/FileWebSocketTelemetrySinkTests.cs b/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/FileWebSocketTelemetrySinkTests.cs new file mode 100644 index 0000000..651d48f --- /dev/null +++ b/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/FileWebSocketTelemetrySinkTests.cs @@ -0,0 +1,75 @@ +using System.Text.Json; +using Jibo.Cloud.Domain.Models; +using Jibo.Cloud.Infrastructure.Telemetry; +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; + +namespace Jibo.Cloud.Tests.WebSockets; + +public sealed class FileWebSocketTelemetrySinkTests : IDisposable +{ + private readonly string _directoryPath; + + public FileWebSocketTelemetrySinkTests() + { + _directoryPath = Path.Combine(Path.GetTempPath(), "OpenJibo.Tests", Guid.NewGuid().ToString("N")); + } + + [Fact] + public async Task RecordsFixtureOnConnectionClose() + { + var sink = CreateSink(); + var envelope = new WebSocketMessageEnvelope + { + ConnectionId = "conn-1", + HostName = "neo-hub.jibo.com", + Path = "/listen", + Kind = "neo-hub-listen", + Token = "token-1", + Text = """{"type":"LISTEN","transID":"trans-1","data":{"text":"hello jibo"}}""" + }; + var session = new CloudSession + { + Token = "token-1", + HostName = "neo-hub.jibo.com", + Path = "/listen" + }; + session.TurnState.TransId = "trans-1"; + + await sink.RecordConnectionOpenedAsync(envelope, session); + await sink.RecordInboundAsync(envelope, session, "LISTEN"); + await sink.RecordOutboundAsync(envelope, session, + [ + new WebSocketReply { Text = """{"type":"LISTEN"}""" }, + new WebSocketReply { Text = """{"type":"EOS"}""" } + ]); + await sink.RecordConnectionClosedAsync(envelope, session, "test"); + + var fixtureDirectory = Path.Combine(_directoryPath, "fixtures"); + var fixturePath = Directory.GetFiles(fixtureDirectory, "*.flow.json").Single(); + using var document = JsonDocument.Parse(await File.ReadAllTextAsync(fixturePath)); + Assert.Equal("neo-hub.jibo.com", document.RootElement.GetProperty("session").GetProperty("hostName").GetString()); + Assert.Equal(1, document.RootElement.GetProperty("steps").GetArrayLength()); + Assert.Equal("LISTEN", document.RootElement.GetProperty("steps")[0].GetProperty("expectedReplyTypes")[0].GetString()); + } + + public void Dispose() + { + if (Directory.Exists(_directoryPath)) + { + Directory.Delete(_directoryPath, true); + } + } + + private FileWebSocketTelemetrySink CreateSink() + { + return new FileWebSocketTelemetrySink( + NullLogger.Instance, + Options.Create(new WebSocketTelemetryOptions + { + Enabled = true, + ExportFixtures = true, + DirectoryPath = _directoryPath + })); + } +} diff --git a/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs b/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs index c0e03a4..5c911d6 100644 --- a/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs +++ b/OpenJibo/tests/Jibo.Cloud.Tests/WebSockets/JiboWebSocketServiceTests.cs @@ -1,4 +1,5 @@ using System.Text.Json; +using Jibo.Cloud.Application.Abstractions; using Jibo.Cloud.Application.Services; using Jibo.Cloud.Domain.Models; using Jibo.Cloud.Infrastructure.Persistence; @@ -24,6 +25,7 @@ public sealed class JiboWebSocketServiceTests _service = new JiboWebSocketService( _store, + new NullWebSocketTelemetrySink(), new WebSocketTurnFinalizationService( turnContextMapper, conversationBroker,