Add capture index manifest for group testing

This commit is contained in:
Jacob Dubin
2026-05-17 14:07:56 -05:00
parent c0485da46d
commit 14b5cb74cc
10 changed files with 198 additions and 17 deletions

View File

@@ -0,0 +1,48 @@
using System.Collections.Concurrent;
using System.Text.Json;
namespace Jibo.Cloud.Infrastructure.Telemetry;
internal static class CaptureIndexWriter
{
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
{
WriteIndented = false
};
private static readonly ConcurrentDictionary<string, SemaphoreSlim> DirectoryLocks = new(StringComparer.OrdinalIgnoreCase);
public static async Task AppendAsync(
string directoryPath,
string sinkName,
string eventType,
IReadOnlyDictionary<string, object?> details,
CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(directoryPath)) return;
var directory = Path.GetFullPath(directoryPath);
Directory.CreateDirectory(directory);
var indexPath = Path.Combine(directory, "capture-index.ndjson");
var payload = new
{
capturedUtc = DateTimeOffset.UtcNow,
sink = sinkName,
eventType,
details
};
var line = JsonSerializer.Serialize(payload, JsonOptions) + Environment.NewLine;
var gate = DirectoryLocks.GetOrAdd(directory, static _ => new SemaphoreSlim(1, 1));
await gate.WaitAsync(cancellationToken);
try
{
await File.AppendAllTextAsync(indexPath, line, cancellationToken);
}
finally
{
gate.Release();
}
}
}

View File

@@ -64,6 +64,23 @@ public sealed class FileProtocolTelemetrySink(
_writeLock.Release();
}
await CaptureIndexWriter.AppendAsync(
directory,
"http",
"protocol_record",
new Dictionary<string, object?>
{
["method"] = envelope.Method,
["host"] = envelope.HostName,
["path"] = envelope.Path,
["servicePrefix"] = envelope.ServicePrefix,
["operation"] = envelope.Operation,
["statusCode"] = result.StatusCode,
["contentType"] = result.ContentType,
["requestId"] = envelope.RequestId
},
cancellationToken);
logger.LogInformation(
"HTTP telemetry {Method} {Host}{Path} target={Target} status={StatusCode}",
envelope.Method,
@@ -72,4 +89,4 @@ public sealed class FileProtocolTelemetrySink(
$"{envelope.ServicePrefix}.{envelope.Operation}".Trim('.'),
result.StatusCode);
}
}
}

View File

@@ -21,7 +21,7 @@ public sealed class FileTurnTelemetrySink(
{
if (!options.Value.Enabled) return;
await WriteEventAsync(new
await WriteEventAsync(category, new
{
Type = category,
Details = details
@@ -32,7 +32,7 @@ public sealed class FileTurnTelemetrySink(
{
if (!options.Value.Enabled) return;
await WriteEventAsync(new
await WriteEventAsync("transcript_error", new
{
Exception = ex.ToString(),
Message = message,
@@ -40,7 +40,7 @@ public sealed class FileTurnTelemetrySink(
}, "Turn telemetry error", LogLevel.Error, cancellationToken);
}
private async Task WriteEventAsync(object payload, string logMessage, LogLevel level,
private async Task WriteEventAsync(string eventType, object payload, string logMessage, LogLevel level,
CancellationToken cancellationToken)
{
var directory = GetBaseDirectory();
@@ -58,6 +58,17 @@ public sealed class FileTurnTelemetrySink(
_writeLock.Release();
}
await CaptureIndexWriter.AppendAsync(
directory,
"turn",
eventType,
new Dictionary<string, object?>
{
["message"] = logMessage,
["level"] = level.ToString()
},
cancellationToken);
logger.Log(level, "{LogMessage} {Payload}", logMessage, payload);
}
@@ -68,4 +79,4 @@ public sealed class FileTurnTelemetrySink(
Directory.GetCurrentDirectory(),
AppContext.BaseDirectory);
}
}
}

View File

@@ -39,15 +39,20 @@ public sealed class FileWebSocketTelemetrySink(
await WriteRecordAsync(BuildRecord("connection_opened", envelope, session, null, "internal", null, null),
cancellationToken);
await AppendIndexAsync(envelope, session, "connection_opened", null, cancellationToken);
}
public Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType,
public async Task RecordInboundAsync(WebSocketMessageEnvelope envelope, CloudSession session, string? messageType,
CancellationToken cancellationToken = default)
{
return !options.Value.Enabled
? Task.CompletedTask
: WriteRecordAsync(BuildRecord("message_in", envelope, session, messageType, "in", null, null),
cancellationToken);
if (!options.Value.Enabled) return;
await WriteRecordAsync(BuildRecord("message_in", envelope, session, messageType, "in", null, null),
cancellationToken);
await AppendIndexAsync(envelope, session, "message_in", new Dictionary<string, object?>
{
["messageType"] = messageType
}, cancellationToken);
}
public Task RecordTurnEventAsync(WebSocketMessageEnvelope envelope, CloudSession session, string eventType,
@@ -72,6 +77,10 @@ public sealed class FileWebSocketTelemetrySink(
await WriteRecordAsync(BuildRecord("message_out", envelope, session, null, "out", replyTypes, null),
cancellationToken);
await AppendIndexAsync(envelope, session, "message_out", new Dictionary<string, object?>
{
["replyTypes"] = replyTypes
}, cancellationToken);
if (_fixtures.TryGetValue(session.SessionId, out var fixture))
fixture.Steps.Add(new CapturedWebSocketFixtureStep
@@ -95,6 +104,10 @@ public sealed class FileWebSocketTelemetrySink(
"internal",
null,
new Dictionary<string, object?> { ["reason"] = reason }), cancellationToken);
await AppendIndexAsync(envelope, session, "connection_closed", new Dictionary<string, object?>
{
["reason"] = reason
}, cancellationToken);
if (!options.Value.ExportFixtures || !_fixtures.TryRemove(session.SessionId, out var fixture) ||
fixture.Steps.Count == 0) return;
@@ -122,6 +135,13 @@ public sealed class FileWebSocketTelemetrySink(
_writeLock.Release();
}
await AppendIndexAsync(envelope, session, "fixture_export", new Dictionary<string, object?>
{
["fixturePath"] = fixturePath,
["stepCount"] = fixture.Steps.Count,
["fixtureName"] = fixtureName
}, cancellationToken);
logger.LogInformation("Exported websocket fixture {FixturePath}", fixturePath);
}
@@ -223,6 +243,31 @@ public sealed class FileWebSocketTelemetrySink(
AppContext.BaseDirectory);
}
private async Task AppendIndexAsync(
WebSocketMessageEnvelope envelope,
CloudSession session,
string eventType,
IReadOnlyDictionary<string, object?>? details,
CancellationToken cancellationToken)
{
var directory = GetBaseDirectory();
await CaptureIndexWriter.AppendAsync(
directory,
"websocket",
eventType,
new Dictionary<string, object?>
{
["sessionId"] = session.SessionId,
["hostName"] = envelope.HostName,
["path"] = envelope.Path,
["kind"] = envelope.Kind,
["token"] = envelope.Token,
["transId"] = session.TurnState.TransId ?? session.LastTransId,
["details"] = details
},
cancellationToken);
}
private static string BuildFixtureName(CloudSession session, CapturedWebSocketFixtureBuilder fixture)
{
var host = SanitizeName(fixture.Session.HostName);
@@ -246,4 +291,4 @@ public sealed class FileWebSocketTelemetrySink(
public CapturedWebSocketFixtureSession Session { get; init; } = new();
public List<CapturedWebSocketFixtureStep> Steps { get; } = [];
}
}
}