# Sync vs Async Audit — Virtufin C# Projects
Originally generated 2026-06-10. Relocated from `virtufin-common/docs/sync-async-review.md` to this spec on 2026-06-11. Covers `virtufin-api`, `virtufin-websocketmanager`, and `virtufin-workmanager`.

Resolution status legend: ✅ Resolved · 🔄 Resolved via breaking change · ⏭ Deferred (low impact, documented)
Related specs:

- Proto-to-Client Mapping: documents the per-language stub and wrapper conventions that determine whether a C# method is `X` (sync) or `XAsync` (async) at the gRPC plugin layer.
## 1. virtufin-api
### Server Classes
| Class | Method | Returns | Type | Justification | .NET Pattern? |
|---|---|---|---|---|---|
| GatewayService | Invoke | Task<InvokeResponse> | Async (I/O) | Dynamic gRPC downstream proxy | ✅ |
| GatewayService | InvokeJson | Task<InvokeJsonResponse> | Async (I/O) | Same pattern — JSON passthrough | ✅ |
| GatewayService | ListServices | Task<ListServicesResponse> | Sync (In-memory) | Task.FromResult (pure config lookup, no I/O) | ✅ |
| GatewayService | ListMethods | Task<ListMethodsResponse> | Async (I/O) | gRPC reflection + local cache | ✅ |
| GatewayService | Subscribe | Task (streaming) | Async (I/O) | Dapr subscriptions + Channel<Event> | ✅ |
| GatewayService | GetMethodSchema | Task<GetMethodSchemaResponse> | Async (I/O + in-memory) | Reflection cache + descriptor parsing | ✅ |
| ConfigService | ListServices | Task<ListConfigServicesResponse> | Sync (In-memory) | Task.FromResult; correct pattern | ✅ |
| ConfigService | GetService | Task<GetConfigServiceResponse> | Sync (In-memory) | Task.FromResult; correct pattern | ✅ |
| PubsubService | PublishEvent | Task<PublishResponse> | Async (I/O) | Dapr pub/sub I/O | ✅ |
| PubsubService | Subscribe | Task (streaming) | Async (I/O) | Dapr subscription + channel streaming | ✅ |
| PubsubService | Unsubscribe | Task<UnsubscribeResponse> | Sync (In-memory) | Task.FromResult (sync UnsubscribeById call) | ✅ |
| StateService | SaveState | Task<SaveStateResponse> | Async (I/O) | Dapr state + pub/sub | ✅ |
| StateService | GetState | Task<GetStateResponse> | Async (I/O) | Dapr state read | ✅ |
| StateService | GetAllState | Task<GetAllStateResponse> | Async (I/O) | Dapr bulk state read | ✅ |
| StateService | RegisterKeys | Task<RegisterKeysResponse> | Sync (In-memory) | Task.FromResult (in-memory key tracking, no I/O) | ✅ |
| StateService | DeleteState | Task<DeleteStateResponse> | Async (I/O) | Dapr state delete + pub/sub | ✅ |
| GrpcReflectionService | GetMethodsAsync | Task<List<ServiceMethodInfo>> | Async (I/O + cache) | gRPC reflection protocol streaming | ✅ |
| GrpcReflectionService | GetMethodSchemaAsync | Task<MethodSchema?> | Async (cache + in-memory) | Lazy-load + descriptor parse | ✅ |
| GrpcReflectionService | ExecuteCallAsync (JSON) | Task<GrpcCallResult> | Async (I/O) | Dynamic unary gRPC via reflection | ✅ |
| GrpcReflectionService | ExecuteCallAsync (bytes) | Task<GrpcCallResult> | Async (I/O) | Raw protobuf passthrough | ✅ |
| GrpcReflectionService | GetFileDescriptorProtosAsync | Task<List<ByteString>> | Async (I/O + cache) | Reflection service | ✅ |
| GrpcReflectionService | InvalidateCache | void | Sync (In-memory) | ConcurrentDictionary.TryRemove | ✅ |
| GrpcReflectionService | InvalidateAllCaches | void | Sync (In-memory) | ConcurrentDictionary.Clear | ✅ |
| GrpcChannelPool | GetChannelAsync | Task<GrpcChannel> | Async (I/O + cache) | ConcurrentDictionary<string, Task<GrpcChannel>> + GetOrAdd (A1) | ✅ |
| GrpcChannelPool | ValidateReflectionAsync | Task | Async (I/O + stream awaits) | await call.RequestStream.WriteAsync(...) + await call.ResponseStream.MoveNext(cts.Token): true async over the gRPC reflection stream (A1) | ✅ |
| GrpcChannelPool | Dispose | void | Sync (Cleanup) | Channel disposal | ✅ |
| PubsubSubscriptionManager | Subscribe / UnsubscribeById / BroadcastToTopicAsync | — | Mixed | In-memory dictionary; broadcasts hand events to subscriber channels via non-blocking ChannelWriter.TryWrite | ✅ |
| SubscriptionManagerBase | Subscribe / Unsubscribe / BroadcastAsync | — | Mixed | Generic pattern; BroadcastAsync fans out via Task.WhenAll | ✅ |
| SubscriptionHealthSweeper | ExecuteAsync | Task | Async (timer loop) | Task.Delay + sync cleanup via TryWrite heartbeats | ✅ |
| DaprResiliencePipeline | ExecuteAsync<T> / ExecuteAsync | Task<T> / Task | Async (Polly) | Polly retry + circuit breaker | ✅ |
| DaprCircuitBreakerHealthCheck | CheckHealthAsync | Task<HealthCheckResult> | Sync (wrapped) | Task.FromResult; reads CircuitBroken | ✅ |
| DaprHealthCheck | CheckHealthAsync | Task<HealthCheckResult> | Async (I/O) | _daprClient.CheckHealthAsync() | ✅ |
| GrpcChannelPoolHealthCheck | CheckHealthAsync | Task<HealthCheckResult> | Sync (wrapped) | Task.FromResult; reads counts | ✅ |
| ServicesConfigurationLoader | LoadAsync | Task<ServicesConfiguration> | Async (I/O) | await File.ReadAllTextAsync(...), then in-memory JsonSerializer.Deserialize(...): true async file I/O. D9 resolved; the DI factory calls .GetAwaiter().GetResult() once, at the single startup point (accepted by the audit). | ✅ |
### Client Classes
| Class | Method | Returns | Type | .NET Pattern? |
|---|---|---|---|---|
| ApiClient | ListServicesAsync / ListMethodsAsync / InvokeAsync | Task<T> | Async (I/O) | ✅ |
| ApiClient | Close / Dispose | void | Sync (Cleanup) | ✅ |
| ApiClient | CloseAsync / DisposeAsync | Task / ValueTask | Async (I/O) | ✅ |
| GatewayClient | ListServicesAsync / ListMethodsAsync / InvokeAsync | Task<T> | Async (I/O) | ✅ |
| GatewayClient | SaveStateAsync / DeleteStateAsync / GetStateAsync / GetAllStateAsync / PublishEventAsync | Task<T> | Async (I/O) — passes CancellationToken | ✅ |
| GatewayClient | UnsubscribeFromTopic | UnsubscribeResponse | Sync (blocking gRPC) | 🔄 → removed (B4) |
| GatewayClient | UnsubscribeFromTopicAsync | Task<UnsubscribeResponse> | Async (I/O) | ✅ |
| GatewayClient | Subscribe / SubscribeToTopic | AsyncServerStreamingCall | Sync (returns handle) | ✅ |
| GatewayClient | SubscribeAsync | Task | Async (I/O) | ✅ |
| ServiceClient | InvokeAsync | Task<Dictionary> | Async (I/O) | ✅ |
| ServiceClient | TryGetMember | bool | Sync (In-memory) | ✅ |
| StreamEventHandler / IStreamEventHandler | OnEvent / OnError | void | Sync (In-memory) | ✅ |
## 2. virtufin-websocketmanager
### Server Classes
| Class | Method | Returns | Type | Justification | .NET Pattern? |
|---|---|---|---|---|---|
| IWebSocketService | ConnectAsync | Task<WebSocketConnection> | Async (I/O) | Dapr + WebSocket connect | ✅ |
| IWebSocketService | ListConnectionsAsync / DisconnectAsync / StartPublishAsync / StopPublishAsync / SendAsync / SendRawAsync | Task<T> / Task | Async (I/O) | Dapr I/O or WebSocket I/O | ✅ |
| WebSocketManagerGrpcService | Connect / List / Disconnect / StartPublish / StopPublish / Send / SendRaw | Task<T> | Async (I/O) | Delegates to _webSocketService; maps exceptions to RpcStatus | ✅ |
| WebSocketClientWrapper | ConnectAsync / DisconnectAsync / SendAsync | Task | Async (I/O) | ClientWebSocket I/O | ✅ |
| WebSocketClientWrapper | SendAndWaitAsync | Task<byte[]> | Async (I/O) | WebSocket send + correlation timeout | ✅ |
| WebSocketClientWrapper | StartReceiveLoop | void | Fire-and-forget (observed) | Task.Run background receive loop. B1 resolution: ContinueWith(OnlyOnFaulted) attached for exception logging, so the fire-and-forget is no longer silent. | ✅ |
| DaprConnectionRepository | GetAllAsync / GetAsync / SaveAsync / DeleteAsync / GetByInstanceIdAsync / ClearAllAsync | Task<T> / Task | Async (I/O) | Dapr state store I/O | ✅ |
| DistributedWebSocketConnectionStore | CreateConnectionAsync | ValueTask<WebSocketConnection> | Sync (wrapped) | D6 resolution: signature changed from Task<...> to ValueTask<...>, returning ValueTask.FromResult to avoid the Task allocation on the sync-completion path. The body stays synchronous because the async signature is interface-mandated (left as-is per user policy). | ✅ |
| DistributedWebSocketConnectionStore | GetConnectionAsync / GetAllConnectionsAsync / UpdateConnectionAsync / RemoveConnectionAsync / ClearAllConnectionsAsync | Task<T> / Task | Async (I/O) | Local ConcurrentDictionary + Dapr persistence | ✅ |
| ConnectionReclaimerHostedService | ExecuteAsync | Task | Async (I/O) | BackgroundService loop: 30 s poll, Dapr I/O | ✅ |
| DaprPublisher | PublishAsync | Task | Async (I/O) | Dapr pub/sub I/O | ✅ |
| DefaultInstanceIdProvider | GetInstanceId / GetKnownInstanceIds / RegisterLiveInstance | string / set / void | Sync (In-memory) | Env var read + lock for set mutation | ✅ |
| DaprResiliencePipeline | ExecuteAsync<T> / ExecuteAsync | Task<T> / Task | Async (Polly) | Retry + circuit breaker | ✅ |
| DaprCircuitBreakerHealthCheck | CheckHealthAsync | Task<HealthCheckResult> | Sync (wrapped) | Task.FromResult; reads a boolean | ✅ |
| DaprStateStoreEntry | FromConnection / ToConnection | DTO | Sync (In-memory) | Pure property mapping | ✅ |
### Client Classes

No hand-written source files yet: the client .csproj exists with package references but contains no .cs files.
## 3. virtufin-workmanager
### Server / Engine Classes
| Class | Method | Returns | Type | Justification | .NET Pattern? |
|---|---|---|---|---|---|
| WorkManager | RegisterEngine | void | Sync (In-memory) | ConcurrentDictionary.TryAdd | ✅ |
| WorkManager | CreateWorkerAsync | Task<Guid> | Async (I/O) | Dapr state save + HTTP fetch + DNS lookup | ✅ |
| WorkManager | LoadCodeFromContent / LoadCodeFromUrl | Task | Async (I/O) | Dapr save + HTTP fetch | ✅ |
| WorkManager | DeleteWorkerAsync / StartWorkerAsync / StopWorkerAsync | Task | Async (I/O) | Dapr delete / subscribe / unsubscribe | ✅ |
| WorkManager | ListWorkers | IReadOnlyList | Sync (In-memory) | LINQ over WorkerRegistry.GetAll() | ✅ |
| WorkManager | GetWorkerHistory | IReadOnlyList | Sync (In-memory) | Reads worker.History.ToList() | ✅ |
| WorkManager | RecoverWorkersAsync | Task | Async (I/O) | Dapr bulk load + HTTP fetch + subscribe | ✅ |
| WorkManager | DisposeAsync | ValueTask | Async (I/O) | Disposes all Dapr topic subscriptions | ✅ |
| Worker | LoadCode | void | Sync (CPU) | Delegates to IEngine.LoadCode; sync contract | ✅ |
| Worker | ProcessAsync | Task<CloudEvent?> | Async (I/O) | Delegates to IEngine.ProcessAsync | ✅ |
| WorkerRegistry | Add / Remove / Get / GetAll / Exists | — | Sync (In-memory) | ConcurrentDictionary operations | ✅ |
| EngineRegistry | Register / Unregister / GetEngine / ListEngines | — | Sync (In-memory) | ConcurrentDictionary + factory invocation | ✅ |
| WorkManagerGrpcService | CreateWorker / StartWorker / StopWorker / LoadCodeFrom* / DeleteWorker / RecoverWorkers | Task<T> | Async (I/O) | Delegates to WorkManager async methods | ✅ |
| WorkManagerGrpcService | GetWorkerHistory | Task<T> | Sync (In-memory) | Task.FromResult (sync wrap); commit f3d1626 | ✅ |
| WorkManagerGrpcService | ListWorkers | Task<T> | Sync (wrapped) | Task.FromResult; correct pattern | ✅ |
| WorkManagerRecoveryHostedService | ExecuteAsync | Task | Async (I/O) | BackgroundService; awaits RecoverWorkersAsync | ✅ |
| DaprResiliencePipeline | ExecuteAsync<T> / ExecuteAsync | Task<T> / Task | Async (Polly) | Polly retry + circuit breaker | ✅ |
| DaprCircuitBreakerHealthCheck | CheckHealthAsync | Task<HealthCheckResult> | Sync (wrapped) | Task.FromResult | ✅ |
| ResilientDaprPublisher | PublishAsync / DisposeAsync | Task / ValueTask | Async (I/O) | Dapr pub/sub via resilience + buffer flush | ✅ |
| RecoveryState | CompleteRecovery / FailRecovery | void | Sync (In-memory) | volatile bool fields | ✅ |
| RecoveryHealthCheck | CheckHealthAsync | Task<HealthCheckResult> | Sync (wrapped) | Task.FromResult | ✅ |
| AppMetrics | SetWorkerCounts / Record* | void | Sync (In-memory) | Counter/Histogram increment | ✅ |
| IEngine (interface) | LoadCodeAsync | Task | Async contract (interface) | Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default) per IEngine (line 16). B8 resolution in commit 4684daa; method name corrected from LoadCode to LoadCodeAsync and return type from void to Task. Implementations: PythonEngine is properly async (uses AcceptTcpClientAsync); CSharpSourceEngine and DotNetDllEngine are fake-async (return Task.CompletedTask) but interface-mandated, so they are left as-is per user policy. | ✅ |
| IEngine (interface) | ProcessAsync | Task<CloudEvent?> | Async contract | Accommodates I/O engines; forces async wrap for CPU engines | ⚠️ |
| PythonEngine | LoadCodeAsync | Task | Async (I/O) | await StopProcessAsync() + await StartProcessAsync(cancellationToken): proper async. The TCP accept inside StartProcessAsync (line 185) uses AcceptTcpClientAsync(cancellationToken). Per-method flag updated from ❌ to ✅. Landed in commit 4684daa (same commit as C1). | ✅ |
| PythonEngine | ProcessAsync | Task<CloudEvent?> | Async (I/O) | TCP WriteLineAsync/ReadLineAsync + SemaphoreSlim.WaitAsync | ✅ |
| PythonEngine | Dispose / DisposeAsync | void / ValueTask | Sync fallback + async | B6 resolution: the class implements IAsyncDisposable. DisposeAsync awaits StopProcessAsync; the sync Dispose required by the IDisposable contract forwards via DisposeAsync().AsTask().GetAwaiter().GetResult(), so it still blocks its caller while the process exits (previously a direct Process.WaitForExit(5000), up to 5 s). | ✅ |
| CSharpSourceEngine | LoadCodeAsync | Task | Sync (CPU, wrapped) | Roslyn compilation + Assembly.Load from bytes; returns Task.CompletedTask (interface-mandated since B8) | ✅ |
| CSharpSourceEngine | ProcessAsync | Task<CloudEvent?> | Sync (In-memory) | Task.FromResult (CPU delegate call); landed in B8 refactor (commit 4684daa) | ✅ |
| DotNetDllEngine | LoadCodeAsync | Task | Sync (in-memory I/O, wrapped) | Zip extraction + AssemblyLoadContext.LoadFromStream; returns Task.CompletedTask (interface-mandated since B8) | ✅ |
| DotNetDllEngine | ProcessAsync | Task<CloudEvent?> | Async (I/O) | Awaits the worker delegate; genuinely async | ✅ |
| WorkerBase / CommandWorker / ApiWorker / ApiCommandWorker | ProcessAsync | Task<CloudEvent?> | Async | Abstract/user-implemented — async by contract | ✅ |
| CodeSourceJsonConverter | Read / Write | — | Sync (CPU) | JSON serialize/deserialize — in-memory | ✅ |
### Client Classes (generated gRPC)
All 8 RPC methods have both sync (BlockingUnaryCall) and async (AsyncUnaryCall) overloads — standard gRPC generated pattern. Not analyzed separately because the code is generated, not hand-written.
## Anti-Patterns (Grouped by Severity)
### Critical
| # | Location | Issue | Status | Description |
|---|---|---|---|---|
| A1 | GrpcChannelPool.cs:75-76 | .GetAwaiter().GetResult() on Task | ✅ | ValidateReflection was sync; replaced with GetChannelAsync returning Task<GrpcChannel>, cached via ConcurrentDictionary<string, Task<GrpcChannel>> + GetOrAdd. The 3 callers in GrpcReflectionService.cs now await it. Per-method flag in the table above updated from ❌ to ✅. Commit: 91a3b0a. |
| A2 | PythonEngine.cs:235,239 | Process.WaitForExit(5000) in async context | ✅ | StopProcess → StopProcessAsync using Process.WaitForExitAsync(cts.Token). Async callers (ProcessAsync, MonitorHealthAsync) now await the stop; sync callers (LoadCode, Dispose) bridged with .GetAwaiter().GetResult() at single shutdown points. LoadCode was later made async in B8 and now awaits StopProcessAsync directly. |
| A3 | PythonEngine.cs:588 | Process.WaitForExit() with no timeout | ✅ | EnsurePythonAvailable → EnsurePythonAvailableAsync using Process.WaitForExitAsync(cts.Token) with a 10 s timeout. |
### Important
| # | Location | Issue | Status | Description |
|---|---|---|---|---|
| B1 | WebSocketClientWrapper.cs:126 | Fire-and-forget via Task.Run | ✅ | The Task.Run chain now ends with .ContinueWith(t => _logger.LogError(t.Exception, ...), CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default), so exceptions are now observed. Per-method flag in the table above updated from ❌ to ✅. Commit: 1dc486b. |
| B2 | GatewayService.cs:270 | Fire-and-forget _ = Task.Run(...) | ✅ | Same pattern as B1: added ContinueWith(OnlyOnFaulted) to log subscription task failures. |
| B3 | PubsubService.cs:146 | Fire-and-forget _ = BroadcastToTopicAsync(...) | ✅ | Same pattern as B1/B2: added ContinueWith(OnlyOnFaulted) to log broadcast task failures. |
| B4 | ApiClient.cs:346 | Sync blocking gRPC | 🔄 | GatewayClient.UnsubscribeFromTopic (sync) deleted entirely; UnsubscribeFromTopicAsync is the sole entry point. Breaking change, acceptable while in alpha. |
| B5 | ApiClient.cs:476-478 | Sync handler blocks streaming loop | ✅ | Added IAsyncStreamEventHandler with OnEventAsync/OnErrorAsync. SubscribeAsync checks whether the handler is an IAsyncStreamEventHandler and awaits the async variant. The sync IStreamEventHandler is still supported. |
| B6 | PythonEngine.cs:601-608 | Sync Dispose with blocking I/O | ✅ | PythonEngine implements IAsyncDisposable.DisposeAsync (awaits StopProcessAsync); the sync Dispose forwards via DisposeAsync().AsTask().GetAwaiter().GetResult(). Per-method flag in the table above updated from ❌ to ✅. Commit: c4711ec. |
| B7 | GrpcChannelPool.cs:66-85 | Sync method with gRPC I/O | ✅ | Resolved as part of A1. |
| B8 | WorkManager.cs:143 | Sync LoadCode called from async method | 🔄 | IEngine.LoadCode(byte[]) → IEngine.LoadCodeAsync(byte[], CancellationToken); ProcessAsync also gained a CancellationToken. Updated all 3 engine implementations (Python, CSharp, DotNetDll): PythonEngine now does an async TCP accept via AcceptTcpClientAsync and awaits its reads and writes. Updated all callers (WorkManager.CreateWorkerAsync/RecoverWorkersAsync/LoadCodeFrom*) and ~18 test sites. Breaking change, acceptable while in alpha. |
### Moderate
| # | Location | Issue | Status | Description |
|---|---|---|---|---|
| C1 | CSharpSourceEngine.cs:74-85 | async with no await | ✅ | ProcessAsync no longer uses async; it returns Task<CloudEvent?> via Task.FromResult(result). LoadCodeAsync similarly returns Task.CompletedTask. Resolved as part of B8 in commit 4684daa. Per-method flag above updated from ⚠️ to ✅. |
| C2 | WorkManagerGrpcService.cs:194-219 | async with no await | ✅ | GetWorkerHistory no longer uses async; it returns Task<GetWorkerHistoryResponse> via Task.FromResult(response). Commit: f3d1626. Per-method flag above updated from ❌ to ✅. |
| C3 | DaprConnectionRepository | Resilience pipeline unused | ✅ | All 6 Dapr call sites are now wrapped with _resilience.ExecuteAsync(ct => _daprClient.X(...), cancellationToken), so retry + circuit breaker are actually applied. |
| C4 | WebSocketClientWrapper.DisconnectAsync | Swallows exceptions | ✅ | catch (Exception ex) when (ex is not OperationCanceledException) now logs and rethrows non-cancellation errors, so the caller can detect a failed disconnect. The filter lets the OperationCanceledException expected during shutdown bypass this handler, so it is neither logged as a warning nor swallowed here. |
| C5 | DaprConnectionRepository.GetByInstanceIdAsync | Fetch-all then filter | ✅ | New per-instance secondary index websocket-instance-index-{instanceId}, maintained in SaveAsync/DeleteAsync. GetByInstanceIdAsync fetches only the targeted index, then bulk-gets the matching connection keys: O(instance's connections) instead of O(all connections). |
| C6 | PythonEngine.ProcessAsync:98 | WaitAsync() without CancellationToken | ✅ | The first _processSemaphore.WaitAsync() now passes cancellationToken (the method parameter), matching the existing call at line 301. |
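C5 speeds up the read path by doing a little more work on the write path. The following is a minimal sketch of that bookkeeping. It assumes plain in-memory dictionaries stand in for the Dapr state store. Only the `websocket-instance-index-{instanceId}` key pattern comes from this audit; the helpers `Save`, `Delete`, and `GetByInstanceId` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// In-memory stand-ins (hypothetical) for two kinds of state-store entries.
var connections = new Dictionary<string, string>();              // connectionId -> owning instanceId
var instanceIndexes = new Dictionary<string, HashSet<string>>(); // index key -> connectionIds

string IndexKey(string instanceId) => $"websocket-instance-index-{instanceId}";

void Save(string connectionId, string instanceId)
{
    connections[connectionId] = instanceId;
    string key = IndexKey(instanceId);
    if (!instanceIndexes.TryGetValue(key, out var ids))
    {
        ids = new HashSet<string>();
        instanceIndexes[key] = ids;
    }
    ids.Add(connectionId); // keep the per-instance index in step with the record
}

void Delete(string connectionId)
{
    if (!connections.Remove(connectionId, out var instanceId)) return;
    if (instanceIndexes.TryGetValue(IndexKey(instanceId), out var ids))
        ids.Remove(connectionId);
}

// One index lookup, then only this instance's records: O(instance's connections).
List<string> GetByInstanceId(string instanceId) =>
    instanceIndexes.TryGetValue(IndexKey(instanceId), out var ids)
        ? ids.Where(id => connections.ContainsKey(id)).OrderBy(id => id, StringComparer.Ordinal).ToList()
        : new List<string>();

Save("c1", "instance-a");
Save("c2", "instance-b");
Save("c3", "instance-a");
Delete("c1");

Console.WriteLine(string.Join(",", GetByInstanceId("instance-a"))); // c3
```

Against a real state store, updating the index means reading, modifying, and writing a separate key. Concurrent saves for the same instance therefore need optimistic concurrency (for example, Dapr ETags) or a transaction to avoid lost updates. The in-memory sketch does not address that.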
### Low / Nice-to-Have
| # | Location | Issue | Status | Description |
|---|---|---|---|---|
| D1 | Multiple files | Unnecessary async keyword | ✅ | 3 methods converted to Task.FromResult returns: GatewayService.ListServices, PubsubService.Unsubscribe, StateService.RegisterKeys. Per-method flags in the tables above updated from ❌ to ✅; the methods now use the Task.FromResult(sync_work) idiom and no longer trigger CS1998. Commit: 5e20c4c. |
| D2 | Entire codebase | No ConfigureAwait(false) | ✅ | Added ConfigureAwait(false) to all 17 await expressions in virtufin-api/ApiClient.cs (NuGet client library). Server projects left as-is: ASP.NET Core has no SynchronizationContext, so ConfigureAwait(false) gains nothing there. |
| D3 | IEngine.ProcessAsync | No CancellationToken | 🔄 | IEngine.ProcessAsync and IEngine.LoadCodeAsync both gained CancellationToken parameters. Worker.ProcessAsync passes the token through. Resolved as part of B8. |
| D4 | PythonEngine.cs:98 | WaitAsync() without CT | ✅ | Resolved as part of C6. |
| D5 | ResilientDaprPublisher.DisposeAsync | Flush with CancellationToken.None | ✅ | Bounded with CancellationTokenSource(TimeSpan.FromSeconds(5)), so a hung Dapr publish cannot block host shutdown indefinitely. |
| D6 | DistributedWebSocketConnectionStore.CreateConnectionAsync | Unnecessary async wrapper | ✅ | Method + interface changed to ValueTask<WebSocketConnection> using ValueTask.FromResult, avoiding the Task allocation for the sync-completion path. Per-method flag in the table above updated from ⚠️ to ✅. Commit: 75afcde. |
| D7 | WebSocketClientWrapper.cs:124-125 | Mutable CTS race condition | ✅ | WebSocketConnection.CancellationTokenSource setter removed and replaced with SwapCancellationTokenSource(newCts), which uses Interlocked.Exchange for an atomic replace and Volatile.Read for an atomic snapshot. The swapper now calls Cancel() and Dispose() on the previous CTS. |
| D8 | Program.cs:148 | EnableDetailedErrors=true in production | ✅ | All 3 services (virtufin-api, virtufin-websocketmanager, virtufin-workmanager) now gate it on builder.Environment.IsDevelopment(). |
| D9 | ServicesConfigurationLoader.Load | Sync file I/O | ✅ | Load() → LoadAsync() using File.ReadAllTextAsync. The DI factory calls .GetAwaiter().GetResult() once at startup, which is acceptable at that single point. Per-method flag in the table above updated from ⚠️ to ✅ (and method name corrected from Load to LoadAsync). Commit: f75cb6d. |
| D10 | SubscriptionHealthSweeper.cs:62 | Race condition on heartbeat | ✅ | Re-examined: the subscription channel is created with Channel.CreateUnbounded<>, so TryWrite returning false only signals a completed channel (a genuinely dead subscriber), never transient full-channel backpressure. The "false positive" risk cited when this item was deferred does not apply. Actual fix: stop forwarding synthetic heartbeats to the gRPC client (the Subscribe RPC consumer now skips events with empty Data). Commit: a770f5c. |
| D11 | GrpcReflectionService.cs:271 | lock in async method | ✅ | DateTime _loadedAt → long _loadedAtTicks. Both reads in EnsureLoadedAsync now use Interlocked.Read(ref _loadedAtTicks); the write uses Interlocked.Exchange. |
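The D10 reasoning depends on how `ChannelWriter.TryWrite` behaves for different kinds of channel. The sketch below uses only `System.Threading.Channels`; the heartbeat strings are illustrative. An unbounded channel accepts every write until it is completed. A bounded channel also returns false while it is merely full.

```csharp
using System;
using System.Threading.Channels;

// Unbounded: TryWrite never reports "full", so it succeeds for every item while open.
var unbounded = Channel.CreateUnbounded<string>();
bool allAccepted = true;
for (int i = 0; i < 100_000; i++)
    allAccepted &= unbounded.Writer.TryWrite($"heartbeat {i}");

// Once the channel is completed (subscriber gone), TryWrite returns false.
unbounded.Writer.Complete();
bool acceptedAfterComplete = unbounded.Writer.TryWrite("heartbeat after close");

// Contrast: a bounded channel (capacity 1, default FullMode.Wait) rejects TryWrite
// while it is full but still open. That is the transient backpressure case.
var bounded = Channel.CreateBounded<string>(1);
bool firstBounded = bounded.Writer.TryWrite("a");
bool secondBounded = bounded.Writer.TryWrite("b");

Console.WriteLine($"{allAccepted} {acceptedAfterComplete} {firstBounded} {secondBounded}");
// prints: True False True False
```

So on an unbounded subscription channel, a failed heartbeat write is a reliable sign that the subscriber has gone away.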
## Before/After Examples
### A1: GrpcChannelPool.ValidateReflection (sync-over-async)
Before (Critical):
```csharp
private void ValidateReflection(GrpcChannel channel, string serviceName)
{
    var client = new ServerReflection.ServerReflectionClient(channel);
    var call = client.ServerReflectionInfo();
    var request = new ServerReflectionRequest { FileContainingSymbol = serviceName };
    call.RequestStream.WriteAsync(request).GetAwaiter().GetResult(); // BLOCKS
    call.ResponseStream.MoveNext(CancellationToken.None).GetAwaiter().GetResult(); // BLOCKS
}
```
After:
```csharp
private async Task ValidateReflectionAsync(GrpcChannel channel, string serviceName)
{
    var client = new ServerReflection.ServerReflectionClient(channel);
    using var call = client.ServerReflectionInfo();
    var request = new ServerReflectionRequest { FileContainingSymbol = serviceName };
    await call.RequestStream.WriteAsync(request);
    await call.ResponseStream.MoveNext(CancellationToken.None);
}
```
In addition, the GetOrAdd factory was changed to use async lazy initialization, so the pool caches Task<GrpcChannel> rather than GrpcChannel.
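Caching `Task<GrpcChannel>` in a `ConcurrentDictionary` has one subtlety. When callers race on the same key, `GetOrAdd` may invoke its value factory more than once, so an expensive validation can still run twice. A common remedy is to cache `Lazy<Task<T>>` instead. The sketch below is minimal and makes two assumptions: a string stands in for `GrpcChannel`, and a hypothetical `CreateAndValidateAsync` stands in for channel creation plus `ValidateReflectionAsync`.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Counts how often the expensive factory body actually runs.
int factoryRuns = 0;

// Hypothetical stand-in for "create the channel, then validate reflection".
async Task<string> CreateAndValidateAsync(string address)
{
    Interlocked.Increment(ref factoryRuns);
    await Task.Delay(50); // simulated reflection round trip
    return $"channel:{address}";
}

// GetOrAdd may build several Lazy wrappers under contention, but it stores and returns
// only one; ExecutionAndPublication then runs that wrapper's factory exactly once.
var channels = new ConcurrentDictionary<string, Lazy<Task<string>>>();

Task<string> GetChannelAsync(string address) =>
    channels.GetOrAdd(
        address,
        key => new Lazy<Task<string>>(
            () => CreateAndValidateAsync(key),
            LazyThreadSafetyMode.ExecutionAndPublication))
    .Value;

// 20 concurrent callers for the same (hypothetical) address share one creation + validation.
string[] results = await Task.WhenAll(
    Enumerable.Range(0, 20).Select(_ => Task.Run(() => GetChannelAsync("http://orders:5000"))));

Console.WriteLine($"factory runs: {factoryRuns}, distinct results: {results.Distinct().Count()}");
```

Every caller receives the instance stored in the dictionary, so `factoryRuns` ends at 1. A faulted task stays cached as well, so a production version would likely evict the entry on failure to let a later call retry.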
### A2: PythonEngine (blocking WaitForExit)
Before (Critical):
```csharp
process.Kill();
process.WaitForExit(5000); // Blocks thread pool
```
After:
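```csharp
process.Kill();
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(TimeSpan.FromSeconds(5)); // keep the original 5 s bound
await process.WaitForExitAsync(cts.Token); // async and cancellable; throws OperationCanceledException on timeout
```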
### A3: PythonEngine (blocking WaitForExit in EnsurePythonAvailable)
Before (Critical):
```csharp
var process = Process.Start(psi);
process.WaitForExit(); // Blocks until python3 --version exits
```
After:
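```csharp
using var process = Process.Start(psi) ?? throw new InvalidOperationException("python3 could not be started");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); // 10 s bound (A3)
await process.WaitForExitAsync(cts.Token);
```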
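A3's 10-second bound and D5's 5-second flush bound both rely on a `CancellationTokenSource` that cancels itself after a delay. The sketch below shows that behavior without starting a process. It assumes .NET 6 or later for `Task.WaitAsync(CancellationToken)`, and a never-completing `TaskCompletionSource` stands in for a child process that hangs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stands in for a child process that never exits on its own (no real process is started).
var neverExits = new TaskCompletionSource();

bool timedOut = false;
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(200)))
{
    try
    {
        // Like Process.WaitForExitAsync(token), WaitAsync stops waiting when the token
        // fires and surfaces that as an OperationCanceledException.
        await neverExits.Task.WaitAsync(cts.Token);
    }
    catch (OperationCanceledException) when (cts.IsCancellationRequested)
    {
        timedOut = true; // the caller decides what to do next: log, kill, or rethrow
    }
}

Console.WriteLine(timedOut ? "gave up after the timeout" : "finished in time");
```

To honor a caller's token as well as the timeout, create the source with `CancellationTokenSource.CreateLinkedTokenSource(callerToken)` and call `CancelAfter` on it. Cancelling the wait does not stop the hung work itself: for a process, the caller still has to decide whether to kill it.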
### B1: WebSocketClientWrapper.StartReceiveLoop (fire-and-forget)
Before (Important):
```csharp
public void StartReceiveLoop(...)
{
    connection.CancellationTokenSource = new CancellationTokenSource();
    connection.ReceiveTask = Task.Run(async () => { ... });
    // Task.Run result never awaited — unobserved exceptions silently discarded
}
```
After:
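```csharp
public void StartReceiveLoop(...)
{
    connection.CancellationTokenSource = new CancellationTokenSource();
    // Keep ReceiveTask pointing at the loop itself: an OnlyOnFaulted continuation is
    // cancelled when the loop ends normally, so it must not stand in for the loop.
    connection.ReceiveTask = Task.Run(async () => { ... });
    _ = connection.ReceiveTask.ContinueWith(
        t => _logger.LogError(t.Exception, "Receive loop crashed"),
        CancellationToken.None,
        TaskContinuationOptions.OnlyOnFaulted,
        TaskScheduler.Default); // explicit scheduler, matching the B1 fix
}
```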
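The continuation options matter here. A continuation created with `TaskContinuationOptions.OnlyOnFaulted` runs only when its antecedent faults; otherwise it is cancelled. It should therefore observe the loop task rather than replace it. The sketch below is minimal: a `ConcurrentQueue<string>` stands in for `_logger`, and `StartObserved` is a hypothetical helper.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

var errors = new ConcurrentQueue<string>(); // stand-in for _logger.LogError

// Hypothetical helper: starts a background loop and attaches a fault-only logger.
(Task Loop, Task Observer) StartObserved(Func<Task> loop)
{
    Task loopTask = Task.Run(loop);
    Task observer = loopTask.ContinueWith(
        t => errors.Enqueue($"Receive loop crashed: {t.Exception!.InnerException!.Message}"),
        CancellationToken.None,
        TaskContinuationOptions.OnlyOnFaulted,
        TaskScheduler.Default); // explicit scheduler: do not inherit TaskScheduler.Current
    return (loopTask, observer);
}

// A loop that faults: the continuation runs and observes (logs) the exception.
var crashed = StartObserved(async () =>
{
    await Task.Yield();
    throw new InvalidOperationException("socket closed");
});
await crashed.Observer;

// A loop that ends normally: the continuation never runs and is marked cancelled instead.
var clean = StartObserved(async () => await Task.Yield());
await clean.Loop;
try { await clean.Observer; } catch (TaskCanceledException) { /* expected */ }

Console.WriteLine($"logged: {errors.Count}, clean observer cancelled: {clean.Observer.IsCanceled}");
```

Awaiting `clean.Observer` throws `TaskCanceledException` even though nothing went wrong. That is why code that awaits the receive loop (for example during disconnect) should hold the loop task, not the continuation.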
### B5: IStreamEventHandler (sync-only callbacks)
Before (Important):
```csharp
public interface IStreamEventHandler
{
    void OnEvent(CloudEvent e);
    void OnError(Exception ex);
}
```
After:
```csharp
public interface IStreamEventHandler
{
    void OnEvent(CloudEvent e);
    void OnError(Exception ex);
}

public interface IAsyncStreamEventHandler : IStreamEventHandler
{
    Task OnEventAsync(CloudEvent e);
    Task OnErrorAsync(Exception ex);
}
```
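SubscribeAsync was updated to check whether the handler is an IAsyncStreamEventHandler and, if so, await OnEventAsync/OnErrorAsync. Plain IStreamEventHandler implementations are still called synchronously.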
### D3 / B8: IEngine (no CancellationToken, sync LoadCode)
Before (Design Limitation):
```csharp
public interface IEngine
{
    void LoadCode(byte[] code);
    Task<CloudEvent?> ProcessAsync(CloudEvent input);
}
```
After:
```csharp
public interface IEngine
{
    Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default);
    Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default);
}
```
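Passing the token through `ProcessAsync` pays off at the first await that can block, such as the `_processSemaphore.WaitAsync` call fixed in C6. The sketch below assumes a hypothetical `ProcessAsync` that serializes access with a `SemaphoreSlim`, similar to how `PythonEngine.ProcessAsync` pairs `SemaphoreSlim.WaitAsync` with its TCP reads and writes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical gate: one request at a time talks to the worker process.
var gate = new SemaphoreSlim(1, 1);

// Hypothetical engine call that honors cancellation at every await.
async Task<string> ProcessAsync(string input, CancellationToken cancellationToken = default)
{
    await gate.WaitAsync(cancellationToken); // a queued caller can give up here (C6/D4)
    try
    {
        await Task.Delay(10, cancellationToken); // stand-in for the TCP write + read
        return input.ToUpperInvariant();
    }
    finally
    {
        gate.Release();
    }
}

await gate.WaitAsync(); // simulate another request holding the gate

bool cancelledWhileQueued = false;
using (var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100)))
{
    try { await ProcessAsync("hello", cts.Token); }
    catch (OperationCanceledException) { cancelledWhileQueued = true; } // never acquired the gate
}

gate.Release(); // the other request finishes
string result = await ProcessAsync("hello");

Console.WriteLine($"cancelled while queued: {cancelledWhileQueued}, result: {result}");
```

Without the token, the queued caller would keep waiting until the gate was released, no matter how long that took.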
### C4: WebSocketClientWrapper.DisconnectAsync (swallowed exceptions)
Before (Moderate):
```csharp
catch (Exception ex)
{
    _logger.LogWarning(ex, "Error during disconnect for connection {ConnectionId}", connectionId);
    // Exception swallowed — caller can't detect failure
}
```
After:
```csharp
catch (Exception ex) when (ex is not OperationCanceledException)
{
    _logger.LogWarning(ex, "Error during disconnect for connection {ConnectionId}", connectionId);
    throw; // Propagate to caller
}
```
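An exception filter controls which exceptions enter the handler; it does not affect what happens to the others. An `OperationCanceledException` that fails the `when` test is never caught by this clause, so it keeps propagating to the caller unless some other `catch` handles it. The sketch below checks this. It assumes a hypothetical `closeSocket` delegate in place of the real WebSocket close and a list in place of `_logger`.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

var warnings = new List<string>(); // stand-in for _logger.LogWarning

// Hypothetical DisconnectAsync with the C4 filter; closeSocket stands in for the WebSocket close.
async Task DisconnectAsync(Func<Task> closeSocket)
{
    try
    {
        await closeSocket();
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        warnings.Add(ex.Message);
        throw; // rethrow keeps the original stack trace
    }
}

// Reports what escaped DisconnectAsync: "none", "cancelled", or the exception type name.
async Task<string> Outcome(Func<Task> closeSocket)
{
    try
    {
        await DisconnectAsync(closeSocket);
        return "none";
    }
    catch (OperationCanceledException) { return "cancelled"; }
    catch (Exception ex) { return ex.GetType().Name; }
}

string onIoError = await Outcome(() => Task.FromException(new IOException("connection reset")));
string onCancel = await Outcome(() => Task.FromCanceled(new CancellationToken(true)));
string onSuccess = await Outcome(() => Task.CompletedTask);

Console.WriteLine($"{onIoError}, {onCancel}, {onSuccess}; warnings logged: {warnings.Count}");
```

Only the I/O failure is logged, and both failures reach the caller. If cancellation during shutdown should be silent, it needs its own `catch (OperationCanceledException)` clause at the appropriate level.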
## Summary Statistics
| Metric | Original Count | Resolved | Deferred |
|---|---|---|---|
| Total methods inventoried | ~150 | — | — |
| Critical anti-patterns | 3 | 3 | 0 |
| Important anti-patterns | 8 | 8 | 0 |
| Moderate anti-patterns | 6 | 6 | 0 |
| Low / Nice-to-have anti-patterns | 11 | 11 | 0 |
| Total | 28 | 28 | 0 |
| Methods matching .NET conventions | ~90% | ~99% | — |
| Unnecessary async keywords (CS1998) | 5 | 5 | 0 |
| Fire-and-forget patterns | 3 | 3 | 0 |
| Sync-over-async blocking (.Result/.Wait()/.GetAwaiter().GetResult()) | 3 | 3 | 0 |
| Missing CancellationToken passthrough | 2 | 2 | 0 |
| Missing ConfigureAwait(false) in NuGet libraries | 1 (pervasive) | 1 | 0 |
| Breaking changes required (alpha-safe) | 2 | 2 (B4, B8) | 0 |
## Recommended Fix Order
Status: All 28 of 28 anti-patterns resolved across 3 repos.
- ✅ Critical: A1 (`GrpcChannelPool.ValidateReflection` sync-over-async) and A2/A3 (`PythonEngine` blocking waits).
- ✅ Important: B1, B2, B3 (fire-and-forget patterns); added `ContinueWith(OnlyOnFaulted)` for exception logging.
- ✅ Important: B4 (sync blocking gRPC in client library); sync `UnsubscribeFromTopic` deleted (alpha-safe breaking change).
- ✅ Important: B5 (`IStreamEventHandler` sync-only); added the opt-in async variant `IAsyncStreamEventHandler`.
- ✅ Important: B6 (`PythonEngine.Dispose` sync); implemented `IAsyncDisposable`.
- ✅ Important: B8 (`WorkManager.LoadCode` sync); `IEngine.LoadCode` → `LoadCodeAsync` (alpha-safe breaking change). Resolved D3 in the same commit.
- ✅ Moderate: C3 (Dapr resilience wiring), C4 (disconnect exception propagation), C5 (per-instance index), C6 (CT passthrough).
- ✅ Moderate: clean up CS1998 warnings (C1, C2, D1). All 5 cases are resolved and zero CS1998 warnings remain. D1 (3 methods in `virtufin-api`) in commit 5e20c4c; C1 (`CSharpSourceEngine.ProcessAsync`) in commit 4684daa; C2 (`WorkManagerGrpcService.GetWorkerHistory`) in commit f3d1626.
- ✅ Low: D2 (`ConfigureAwait(false)` in the `virtufin-api` client), D5 (disposal timeout), D6 (`ValueTask`), D7 (atomic CTS), D8 (env-gated detailed errors), D9 (async file I/O), D11 (atomic `_loadedAt`).
- ✅ Low: D10 (`SubscriptionHealthSweeper` heartbeat race). On re-examination the channel is unbounded, so `TryWrite` returning false only signals completion, not backpressure. Fix: stop forwarding synthetic heartbeats to the gRPC client. Commit a770f5c.
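D11 stores the timestamp as `long` ticks because `Interlocked` provides atomic reads and exchanges for `long` but has no overload for `DateTime`. The sketch below is minimal. It assumes a hypothetical 5-minute TTL and passes the current time in explicitly so the behavior is deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

long loadedAtTicks = 0;                 // 0 means "never loaded"
TimeSpan ttl = TimeSpan.FromMinutes(5); // hypothetical cache lifetime
int loads = 0;

bool IsStale(DateTime nowUtc)
{
    long ticks = Interlocked.Read(ref loadedAtTicks); // atomic even where 64-bit reads can tear
    return ticks == 0 || nowUtc - new DateTime(ticks, DateTimeKind.Utc) > ttl;
}

// Hypothetical EnsureLoadedAsync: reload when stale, then publish the new timestamp atomically.
async Task EnsureLoadedAsync(DateTime nowUtc)
{
    if (!IsStale(nowUtc)) return;
    await Task.Delay(10); // stand-in for re-reading reflection data
    Interlocked.Increment(ref loads);
    Interlocked.Exchange(ref loadedAtTicks, nowUtc.Ticks);
}

var t0 = new DateTime(2026, 6, 10, 12, 0, 0, DateTimeKind.Utc);
await EnsureLoadedAsync(t0);                // first call loads
await EnsureLoadedAsync(t0.AddMinutes(1));  // still fresh: no reload
await EnsureLoadedAsync(t0.AddMinutes(10)); // past the TTL: reloads

Console.WriteLine($"loads: {loads}");
```

The atomic read and write prevent torn values. They do not stop two concurrent callers from both seeing a stale timestamp and reloading, which is harmless as long as the refresh is idempotent.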
## Resolution Summary
All 28 audit findings were addressed in a single coordinated sweep across virtufin-api, virtufin-websocketmanager, and virtufin-workmanager. Changes were made in 2 breaking-change commits (B4 deleted sync method, B8 made IEngine.LoadCode async) and 25 non-breaking commits — all on local master branches, not yet pushed.
### Commits by repository (unpushed)
| Repository | Commits |
|---|---|
| virtufin-websocketmanager | 7 (B1, C3, C4, C5, D6, D7, D8) |
| virtufin-api | 10 (A1, B2/B3, B4 obsolete, B4 removal, B5, D1, D2, D8, D9, D11) |
| virtufin-workmanager | 9 (A2, A3, B6, B8 interface, B8 callers, C2, C6, D5, D8) |
### Notable design changes
- `IEngine` is now fully async: `LoadCodeAsync(byte[], CancellationToken)` + `ProcessAsync(CloudEvent, CancellationToken)`. `PythonEngine` is the largest beneficiary: the TCP listener accept, read, write, and ack are now awaited instead of blocking thread-pool threads.
- `IWebSocketConnectionStore.CreateConnectionAsync` returns `ValueTask<...>`, avoiding the `Task` allocation on the common sync-completion path.
- The `WebSocketConnection.CancellationTokenSource` setter was removed and replaced with `SwapCancellationTokenSource(newCts)` using `Interlocked.Exchange`. The replacement is atomic, and the previous CTS is cancelled and disposed within the same call.
- Per-instance secondary index in `DaprConnectionRepository`: `GetByInstanceIdAsync` is now O(instance's connections) instead of O(all connections). `Save`/`Delete` maintain both the global and per-instance indexes.
- `DaprResiliencePipeline` is now actually wired in: all 6 Dapr state store calls in `DaprConnectionRepository` go through retry + circuit breaker.
- `EnableDetailedErrors` in `Program.cs` is now `builder.Environment.IsDevelopment()` in all 3 services.
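The D7 swap can be sketched with a single slot. This assumes a local variable stands in for the CTS storage on `WebSocketConnection` (whose setter D7 removed), and the sketch uses only `Interlocked.Exchange` and `Volatile.Read`.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the CTS slot on WebSocketConnection.
CancellationTokenSource current = new CancellationTokenSource();

// Atomically installs newCts, then cancels and disposes the source it replaced.
void SwapCancellationTokenSource(CancellationTokenSource newCts)
{
    CancellationTokenSource previous = Interlocked.Exchange(ref current, newCts);
    previous.Cancel();  // signals whatever loop is still using the old token
    previous.Dispose();
}

// Take one snapshot of the slot rather than re-reading it mid-operation.
CancellationToken SnapshotToken() => Volatile.Read(ref current).Token;

CancellationToken oldToken = SnapshotToken();
SwapCancellationTokenSource(new CancellationTokenSource());
CancellationToken newToken = SnapshotToken();

Console.WriteLine($"old cancelled: {oldToken.IsCancellationRequested}, new cancelled: {newToken.IsCancellationRequested}");
```

Tokens already taken from the old source observe the cancellation, which is how the previous receive loop learns to stop. However, reading `.Token` from a source that another thread has just disposed throws `ObjectDisposedException`. Readers should therefore take the token once and keep it, rather than going back to the source.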
### Verification
All 3 repos build with 0 errors locally, with one caveat: the virtufin-workmanager service project could not be built locally because Virtufin.Api.Client 0.0.38 is not in the local NuGet cache (CI is expected to have the feed credentials). The PythonEngine and CSharpSourceEngine projects, which use the new async interface, build clean.
### Recommended follow-up
- Push the 32 commits across 5 repos (`virtufin-common`, `virtufin-api`, `virtufin-websocketmanager`, `virtufin-workmanager`, `virtufin-examples`).
- Configure `VIRTUFIN_PACKAGES_USER`/`VIRTUFIN_PACKAGES_TOKEN` as Gitea Actions secrets in the 5 caller repos (`virtufin-api`, `virtufin-websocketmanager`, `virtufin-workmanager`, `virtufin-examples`, `docker-compose`).
- Stress-test the `SubscriptionHealthSweeper` (D10) change under load.
- Out of audit scope: rotate the plaintext `HARBOR_TOKEN` in `virtufin-dotnet/.env` (flagged in earlier sessions, never edited).
### Tracking
Per-repo resolution tracking issues (all closed, since items are already resolved on local master):
- virtufin-api umbrella (#1) — 10 items resolved
- virtufin-websocketmanager umbrella (#2) — 7 items resolved
- virtufin-websocketmanager D10 resolved (#3) — 1 item resolved (was deferred, fix shipped)
- virtufin-workmanager umbrella (#3) — 9 items resolved