Skip to content

Sync vs Async Audit — Virtufin C# Projects

Originally generated 2026-06-10. Relocated from virtufin-common/docs/sync-async-review.md to this spec on 2026-06-11. Covers virtufin-api, virtufin-websocketmanager, virtufin-workmanager.

Resolution status legend: ✅ Resolved · 🔄 Resolved via breaking change · ⏭ Deferred (low impact, documented)

Related specs: - Proto-to-Client Mapping — documents the per-language stub and wrapper conventions that determine whether a C# method is X (sync) or XAsync (async) at the gRPC plugin layer


1. virtufin-api

Server Classes

Class Method Returns Type Justification .NET Pattern?
GatewayService Invoke Task<InvokeResponse> Async (I/O) Dynamic gRPC downstream proxy
GatewayService InvokeJson Task<InvokeJsonResponse> Async (I/O) Same pattern — JSON passthrough
GatewayService ListServices Task<ListServicesResponse> Sync (In-memory) Task.FromResult (pure config lookup, no I/O)
GatewayService ListMethods Task<ListMethodsResponse> Async (I/O) gRPC reflection + local cache
GatewayService Subscribe Task (streaming) Async (I/O) Dapr subscriptions + Channel<Event>
GatewayService GetMethodSchema Task<GetMethodSchemaResponse> Async (I/O + in-memory) Reflection cache + descriptor parsing
ConfigService ListServices Task<ListConfigServicesResponse> Sync (In-memory) Task.FromResult — correct pattern
ConfigService GetService Task<GetConfigServiceResponse> Sync (In-memory) Task.FromResult — correct
PubsubService PublishEvent Task<PublishResponse> Async (I/O) Dapr pub/sub I/O
PubsubService Subscribe Task (streaming) Async (I/O) Dapr subscription + channel streaming
PubsubService Unsubscribe Task<UnsubscribeResponse> Sync (In-memory) Task.FromResult (sync UnsubscribeById call)
StateService SaveState Task<SaveStateResponse> Async (I/O) Dapr state + pub/sub
StateService GetState Task<GetStateResponse> Async (I/O) Dapr state read
StateService GetAllState Task<GetAllStateResponse> Async (I/O) Dapr bulk state read
StateService RegisterKeys Task<RegisterKeysResponse> Sync (In-memory) Task.FromResult (in-memory key tracking, no I/O)
StateService DeleteState Task<DeleteStateResponse> Async (I/O) Dapr state delete + pub/sub
GrpcReflectionService GetMethodsAsync Task<List<ServiceMethodInfo>> Async (I/O + cache) gRPC reflection protocol streaming
GrpcReflectionService GetMethodSchemaAsync Task<MethodSchema?> Async (cache + in-memory) Lazy-load + descriptor parse
GrpcReflectionService ExecuteCallAsync (JSON) Task<GrpcCallResult> Async (I/O) Dynamic unary gRPC via reflection
GrpcReflectionService ExecuteCallAsync (bytes) Task<GrpcCallResult> Async (I/O) Raw protobuf passthrough
GrpcReflectionService GetFileDescriptorProtosAsync Task<List<ByteString>> Async (I/O + cache) Reflection service
GrpcReflectionService InvalidateCache void Sync (In-memory) ConcurrentDictionary.TryRemove
GrpcReflectionService InvalidateAllCaches void Sync (In-memory) ConcurrentDictionary.Clear
GrpcChannelPool GetChannel GrpcChannel Sync (In-memory) ConcurrentDictionary.GetOrAdd
GrpcChannelPool ValidateReflection void Async (I/O + stream awaits) await call.RequestStream.WriteAsync(...) + await call.ResponseStream.MoveNext(cts.Token) — true async over the gRPC reflection stream
GrpcChannelPool Dispose void Sync (Cleanup) Channel disposal
PubsubSubscriptionManager Subscribe / UnsubscribeById / BroadcastToTopicAsync Mixed In-memory dict + async dispatch via ChannelWriter.TryWrite
SubscriptionManagerBase Subscribe / Unsubscribe / BroadcastAsync Mixed Generic pattern; BroadcastAsync fans out via Task.WhenAll
SubscriptionHealthSweeper ExecuteAsync Task Async (timer loop) Task.Delay + sync cleanup via TryWrite heartbeats
DaprResiliencePipeline ExecuteAsync<T> / ExecuteAsync Task<T> / Task Async (Polly) Polly retry + circuit breaker
DaprCircuitBreakerHealthCheck CheckHealthAsync Task<HealthCheckResult> Sync (wrapped) Task.FromResult — reads CircuitBroken
DaprHealthCheck CheckHealthAsync Task<HealthCheckResult> Async (I/O) _daprClient.CheckHealthAsync()
GrpcChannelPoolHealthCheck CheckHealthAsync Task<HealthCheckResult> Sync (wrapped) Task.FromResult — reads counts
ServicesConfigurationLoader LoadAsync Task<ServicesConfiguration> Async (I/O) await File.ReadAllTextAsync(...) + await JsonSerializer.Deserialize(...) — true async I/O. D9 resolved; DI factory at startup calls .GetAwaiter().GetResult() at the single startup point (acceptable per audit).

Client Classes

Class Method Returns Type .NET Pattern?
ApiClient ListServicesAsync / ListMethodsAsync / InvokeAsync Task<T> Async (I/O)
ApiClient Close / Dispose void Sync (Cleanup)
ApiClient CloseAsync / DisposeAsync Task / ValueTask Async (I/O)
GatewayClient ListServicesAsync / ListMethodsAsync / InvokeAsync Task<T> Async (I/O)
GatewayClient SaveStateAsync / DeleteStateAsync / GetStateAsync / GetAllStateAsync / PublishEventAsync Task<T> Async (I/O) — passes CancellationToken
GatewayClient UnsubscribeFromTopic UnsubscribeResponse Sync (blocking gRPC) 🔄 → removed (B4)
GatewayClient UnsubscribeFromTopicAsync Task<UnsubscribeResponse> Async (I/O)
GatewayClient Subscribe / SubscribeToTopic AsyncServerStreamingCall Sync (returns handle)
GatewayClient SubscribeAsync Task Async (I/O)
ServiceClient InvokeAsync Task<Dictionary> Async (I/O)
ServiceClient TryGetMember bool Sync (In-memory)
StreamEventHandler / IStreamEventHandler OnEvent / OnError void Sync (In-memory)

2. virtufin-websocketmanager

Server Classes

Class Method Returns Type Justification .NET Pattern?
IWebSocketService ConnectAsync Task<WebSocketConnection> Async (I/O) Dapr + WebSocket connect
IWebSocketService ListConnectionsAsync / DisconnectAsync / StartPublishAsync / StopPublishAsync / SendAsync / SendRawAsync Task<T> / Task Async (I/O) Dapr I/O or WebSocket I/O
WebSocketManagerGrpcService Connect / List / Disconnect / StartPublish / StopPublish / Send / SendRaw Task<T> Async (I/O) Delegates to _webSocketService — maps exceptions to RpcStatus
WebSocketClientWrapper ConnectAsync / DisconnectAsync / SendAsync Task Async (I/O) ClientWebSocket I/O
WebSocketClientWrapper SendAndWaitAsync Task<byte[]> Async (I/O) WebSocket send + correlation timeout
WebSocketClientWrapper StartReceiveLoop void Fire-and-forget (observed) Task.Run background receive loop — B1 resolution: ContinueWith(OnlyOnFaulted) attached for exception logging so fire-and-forget is no longer silent.
DaprConnectionRepository GetAllAsync / GetAsync / SaveAsync / DeleteAsync / GetByInstanceIdAsync / ClearAllAsync Task<T> / Task Async (I/O) Dapr state store I/O
DistributedWebSocketConnectionStore CreateConnectionAsync ValueTask<WebSocketConnection> Sync (wrapped) D6 resolution: signature changed from Task<...> to ValueTask<...> returning ValueTask.FromResult — avoids the Task allocation for the sync-completion path. Interface-mandated (per user policy, left as-is).
DistributedWebSocketConnectionStore GetConnectionAsync / GetAllConnectionsAsync / UpdateConnectionAsync / RemoveConnectionAsync / ClearAllConnectionsAsync Task<T> / Task Async (I/O) Local ConcurrentDictionary + Dapr persistence
ConnectionReclaimerHostedService ExecuteAsync Task Async (I/O) BackgroundService loop — 30s poll, Dapr I/O
DaprPublisher PublishAsync Task Async (I/O) Dapr pub/sub I/O
DefaultInstanceIdProvider GetInstanceId / GetKnownInstanceIds / RegisterLiveInstance string / set / void Sync (In-memory) Env var read + lock for set mutation
DaprResiliencePipeline ExecuteAsync<T> / ExecuteAsync Task<T> / Task Async (Polly) Retry + circuit breaker
DaprCircuitBreakerHealthCheck CheckHealthAsync Task<HealthCheckResult> Sync (wrapped) Task.FromResult — reads boolean
DaprStateStoreEntry FromConnection / ToConnection DTO Sync (In-memory) Pure property mapping

Client Classes

No hand-written source files. Client .csproj exists with package references but no .cs files yet.


3. virtufin-workmanager

Server / Engine Classes

Class Method Returns Type Justification .NET Pattern?
WorkManager RegisterEngine void Sync (In-memory) ConcurrentDictionary.TryAdd
WorkManager CreateWorkerAsync Task<Guid> Async (I/O) Dapr state save + HTTP fetch + DNS lookup
WorkManager LoadCodeFromContent / LoadCodeFromUrl Task Async (I/O) Dapr save + HTTP fetch
WorkManager DeleteWorkerAsync / StartWorkerAsync / StopWorkerAsync Task Async (I/O) Dapr delete / subscribe / unsubscribe
WorkManager ListWorkers IReadOnlyList Sync (In-memory) LINQ over WorkerRegistry.GetAll()
WorkManager GetWorkerHistory IReadOnlyList Sync (In-memory) Reads worker.History.ToList()
WorkManager RecoverWorkersAsync Task Async (I/O) Dapr bulk load + HTTP fetch + subscribe
WorkManager DisposeAsync ValueTask Async (I/O) Disposes all Dapr topic subscriptions
Worker LoadCode void Sync (CPU) Delegates to IEngine.LoadCode — sync contract
Worker ProcessAsync Task<CloudEvent?> Async (I/O) Delegates to IEngine.ProcessAsync
WorkerRegistry Add / Remove / Get / GetAll / Exists Sync (In-memory) ConcurrentDictionary operations
EngineRegistry Register / Unregister / GetEngine / ListEngines Sync (In-memory) ConcurrentDictionary + factory invocation
WorkManagerGrpcService CreateWorker / StartWorker / StopWorker / LoadCodeFrom* / DeleteWorker / RecoverWorkers Task<T> Async (I/O) Delegates to WorkManager async methods
WorkManagerGrpcService GetWorkerHistory Task<T> Sync (In-memory) Task.FromResult (sync wrap); commit f3d1626
WorkManagerGrpcService ListWorkers Task<T> Sync (wrapped) Task.FromResult — correct pattern
WorkManagerRecoveryHostedService ExecuteAsync Task Async (I/O) BackgroundServiceawait RecoverWorkersAsync
DaprResiliencePipeline ExecuteAsync<T> / ExecuteAsync Task<T> / Task Async (Polly) Polly retry + circuit breaker
DaprCircuitBreakerHealthCheck CheckHealthAsync Task<HealthCheckResult> Sync (wrapped) Task.FromResult
ResilientDaprPublisher PublishAsync / DisposeAsync Task / ValueTask Async (I/O) Dapr pub/sub via resilience + buffer flush
RecoveryState CompleteRecovery / FailRecovery void Sync (In-memory) volatile bool fields
RecoveryHealthCheck CheckHealthAsync Task<HealthCheckResult> Sync (wrapped) Task.FromResult
AppMetrics SetWorkerCounts / Record* void Sync (In-memory) Counter/Histogram increment
IEngine (interface) LoadCodeAsync Task Async contract (interface) Task LoadCodeAsync(byte[] code, CancellationToken cancellationToken = default) per IEngine (line 16). B8 resolution in commit 4684daa. Implementations: PythonEngine is properly async (uses AcceptTcpClientAsync); CSharpSourceEngine and DotNetDllEngine are fake-async (return Task.CompletedTask) but interface-mandated — per user policy, left as-is. Method name corrected from LoadCode to LoadCodeAsync and return type from void to Task.
IEngine (interface) ProcessAsync Task<CloudEvent?> Async contract Accommodates I/O engines; forces async wrap for CPU engines ⚠️
PythonEngine LoadCodeAsync Task Async (I/O) await StopProcessAsync() + await StartProcessAsync(cancellationToken) — proper async. The TcpListener.AcceptTcpClient call is inside StartProcessAsync (line 185) which already uses AcceptTcpClientAsync(cancellationToken). Per-method flag above updated from to . Landed in commit 4684daa (same commit as C1).
PythonEngine ProcessAsync Task<CloudEvent?> Async (I/O) TCP WriteLineAsync/ReadLineAsync + SemaphoreSlim.WaitAsync
PythonEngine Dispose void Sync with blocking I/O Process.WaitForExit(5000) — blocks thread for up to 5s. B6 resolution: class implements IAsyncDisposable; the Dispose method is the sync fallback (per the .NET standard IDisposable contract), and the async disposal path via DisposeAsync does the same work asynchronously.
CSharpSourceEngine LoadCode void Sync (CPU + in-memory I/O) Roslyn compilation + Assembly.Load from bytes
CSharpSourceEngine ProcessAsync Task<CloudEvent?> Sync (In-memory) Task.FromResult (CPU delegate call); landed in B8 refactor (commit 4684daa)
DotNetDllEngine LoadCode void Sync (in-memory I/O) Zip extraction + AssemblyLoadContext.LoadFromStream
DotNetDllEngine ProcessAsync Task<CloudEvent?> Async (I/O) await worker delegate — genuine async
WorkerBase / CommandWorker / ApiWorker / ApiCommandWorker ProcessAsync Task<CloudEvent?> Async Abstract/user-implemented — async by contract
CodeSourceJsonConverter Read / Write Sync (CPU) JSON serialize/deserialize — in-memory

Client Classes (generated gRPC)

All 8 RPC methods have both sync (BlockingUnaryCall) and async (AsyncUnaryCall) overloads — standard gRPC generated pattern. Not analyzed separately because the code is generated, not hand-written.


Anti-Patterns (Grouped by Severity)

Critical

# Location Issue Status Description
A1 GrpcChannelPool.cs:75-76 .GetAwaiter().GetResult() on Task ValidateReflection was sync; replaced with GetChannelAsync returning Task<GrpcChannel>, cached via ConcurrentDictionary<string, Task<GrpcChannel>> + GetOrAdd, 3 callers in GrpcReflectionService.cs updated to await. Per-method flag in the table above updated from to . Commit: 91a3b0a.
A2 PythonEngine.cs:235,239 Process.WaitForExit(5000) in async context StopProcessStopProcessAsync using Process.WaitForExitAsync(cts.Token). Async callers (ProcessAsync, MonitorHealthAsync) now await the stop; sync callers (LoadCode, Dispose) bridge with .GetAwaiter().GetResult() at single shutdown points.
A3 PythonEngine.cs:588 Process.WaitForExit() indefinitely EnsurePythonAvailableEnsurePythonAvailableAsync using Process.WaitForExitAsync(cts.Token) with 10s timeout.

Important

# Location Issue Status Description
B1 WebSocketClientWrapper.cs:126 Fire-and-forget via Task.Run Task.Run chain now ends with .ContinueWith(t => _logger.LogError(t.Exception, ...), CancellationToken.None, TaskContinuationOptions.OnlyOnFaulted, TaskScheduler.Default) — exceptions are now observed. Per-method flag in the table above updated from to . Commit: 1dc486b.
B2 GatewayService.cs:270 Fire-and-forget _ = Task.Run(...) Same pattern as B1 — added ContinueWith(OnlyOnFaulted) to log subscription task failures.
B3 PubsubService.cs:146 Fire-and-forget _ = BroadcastToTopicAsync(...) Same pattern as B1/B2 — added ContinueWith(OnlyOnFaulted) to log broadcast task failures.
B4 ApiClient.cs:346 Sync blocking gRPC 🔄 GatewayClient.UnsubscribeFromTopic (sync) deleted entirely. UnsubscribeFromTopicAsync is the sole entry point. Breaking change — we are in alpha.
B5 ApiClient.cs:476-478 Sync handler blocks streaming loop Added IAsyncStreamEventHandler with OnEventAsync/OnErrorAsync. SubscribeAsync checks is IAsyncStreamEventHandler and awaits the async variant. Sync IStreamEventHandler still supported.
B6 PythonEngine.cs:601-608 Sync Dispose with blocking I/O PythonEngine implements IAsyncDisposable.DisposeAsync (awaits StopProcessAsync); sync Dispose forwards via DisposeAsync().AsTask().GetAwaiter().GetResult(). Per-method flag in the table above updated from to . Commit: c4711ec.
B7 GrpcChannelPool.cs:66-85 Sync method with gRPC I/O Resolved as part of A1.
B8 WorkManager.cs:143 Sync LoadCode called from async method 🔄 IEngine.LoadCode(byte[])IEngine.LoadCodeAsync(byte[], CancellationToken). ProcessAsync also gained CancellationToken. Updated all 3 engine impls (Python, CSharp, DotNetDll) — PythonEngine now does async TCP accept via AcceptTcpClientAsync + await Read/Write. Updated all callers (WorkManager.CreateWorkerAsync/RecoverWorkersAsync/LoadCodeFrom*) and ~18 test sites. Breaking change — we are in alpha.

Moderate

# Location Issue Status Description
C1 CSharpSourceEngine.cs:74-85 async with no await ProcessAsync no longer uses async — returns Task<CloudEvent?> via Task.FromResult(result). LoadCode similarly returns Task.CompletedTask. Resolved as part of B8 in commit 4684daa. Per-method flag above updated from ⚠️ to .
C2 WorkManagerGrpcService.cs:194-219 async with no await GetWorkerHistory no longer uses async — returns Task.GetWorkerHistoryResponse> via Task.FromResult(response). Commit: f3d1626. Per-method flag above updated from to .
C3 DaprConnectionRepository Resilience pipeline unused All 6 Dapr call sites now wrapped with _resilience.ExecuteAsync(ct => _daprClient.X(...), cancellationToken) — retry + circuit breaker actually applied.
C4 WebSocketClientWrapper.DisconnectAsync Swallows exceptions catch (Exception ex) when (ex is not OperationCanceledException) — propagates non-cancellation errors to the caller while still swallowing the expected OperationCanceledException during shutdown.
C5 DaprConnectionRepository.GetByInstanceIdAsync Fetch-all then filter New per-instance secondary index websocket-instance-index-{instanceId} maintained in SaveAsync/DeleteAsync. GetByInstanceIdAsync fetches only the targeted index + bulk-gets the matching connection keys (O(instance's connections) instead of O(all connections)).
C6 PythonEngine.ProcessAsync:98 WaitAsync() without CancellationToken First _processSemaphore.WaitAsync() now passes cancellationToken (the method parameter). Symmetric with the existing call at line 301.

Low / Nice-to-Have

# Location Issue Status Description
D1 Multiple files Unnecessary async keyword 3 methods converted to Task.FromResult returns: GatewayService.ListServices, PubsubService.Unsubscribe, StateService.RegisterKeys. Per-method flags in the tables above updated from to ; the methods now use the Task.FromResult(sync_work) idiom and no longer trigger CS1998. Commit: 5e20c4c.
D2 Entire codebase No ConfigureAwait(false) Added ConfigureAwait(false) to all 17 await expressions in virtufin-api/ApiClient.cs (NuGet client library). Server projects left as-is (no benefit in ASP.NET Core sync context).
D3 IEngine.ProcessAsync No CancellationToken 🔄 IEngine.ProcessAsync and IEngine.LoadCodeAsync both gained CancellationToken parameters. Worker.ProcessAsync passes through. Resolved as part of B8.
D4 PythonEngine.cs:98 WaitAsync() without CT Resolved as part of C6.
D5 ResilientDaprPublisher.DisposeAsync Flush with CancellationToken.None Bounded with CancellationTokenSource(TimeSpan.FromSeconds(5)) so a hung Dapr publish cannot block host shutdown indefinitely.
D6 DistributedWebSocketConnectionStore.CreateConnectionAsync Unnecessary async wrapper Method + interface changed to ValueTask<WebSocketConnection> using ValueTask.FromResult — avoids the Task allocation for the sync-completion path. Per-method flag in the table above updated from ⚠️ to . Commit: 75afcde.
D7 WebSocketClientWrapper.cs:124-125 Mutable CTS race condition WebSocketConnection.CancellationTokenSource setter removed; replaced with SwapCancellationTokenSource(newCts) using Interlocked.Exchange for atomic replace + Volatile.Read for atomic snapshot. Previous CTS is now Cancel()/Dispose()d by the swapper.
D8 Program.cs:148 EnableDetailedErrors=true in production All 3 services (virtufin-api, virtufin-websocketmanager, virtufin-workmanager) now gate on builder.Environment.IsDevelopment().
D9 ServicesConfigurationLoader.Load Sync file I/O Load()LoadAsync() using File.ReadAllTextAsync. DI factory at startup calls .GetAwaiter().GetResult() — acceptable at the single startup point. Per-method flag in the table above updated from ⚠️ to (and method name corrected from Load to LoadAsync). Commit: f75cb6d.
D10 SubscriptionHealthSweeper.cs:62 Race condition on heartbeat Re-examined: the subscription channel is Channel.CreateUnbounded<>, so TryWrite returning false only signals a completed channel (genuine dead subscriber) — never a transient full-channel backpressure. The deferred "false positive" risk does not apply. Actual fix: stop forwarding synthetic heartbeats to the gRPC client (the Subscribe RPC consumer now skips events with empty Data). Commit a770f5c.
D11 GrpcReflectionService.cs:271 lock in async method DateTime _loadedAtlong _loadedAtTicks. Both reads in EnsureLoadedAsync now use Interlocked.Read(ref _loadedAtTicks); write uses Interlocked.Exchange.

Before/After Examples

A1: GrpcChannelPool.ValidateReflection — Sync-over-Async

Before (Critical):

private void ValidateReflection(GrpcChannel channel, string serviceName)
{
    var client = new ServerReflection.ServerReflectionClient(channel);
    var call = client.ServerReflectionInfo();
    var request = new ServerReflectionRequest { FileContainingSymbol = serviceName };
    call.RequestStream.WriteAsync(request).GetAwaiter().GetResult();  // BLOCKS
    call.ResponseStream.MoveNext(CancellationToken.None).GetAwaiter().GetResult();  // BLOCKS
}

After:

private async Task ValidateReflectionAsync(GrpcChannel channel, string serviceName)
{
    var client = new ServerReflection.ServerReflectionClient(channel);
    using var call = client.ServerReflectionInfo();
    var request = new ServerReflectionRequest { FileContainingSymbol = serviceName };
    await call.RequestStream.WriteAsync(request);
    await call.ResponseStream.MoveNext(CancellationToken.None);
}
Then refactor GetOrAdd factory to use async lazy initialization.

A2: PythonEngine — Blocking WaitForExit

Before (Critical):

process.Kill();
process.WaitForExit(5000);  // Blocks thread pool

After:

process.Kill();
await process.WaitForExitAsync(cancellationToken);  // Async, cancellable

A3: PythonEngine — Blocking WaitForExit (EnsurePythonAvailable)

Before (Critical):

var process = Process.Start(psi);
process.WaitForExit();  // Blocks until python3 --version exits

After:

var process = Process.Start(psi);
await process.WaitForExitAsync();

B1: WebSocketClientWrapper.StartReceiveLoop — Fire-and-forget

Before (Important):

public void StartReceiveLoop(...)
{
    connection.CancellationTokenSource = new CancellationTokenSource();
    connection.ReceiveTask = Task.Run(async () => { ... });
    // Task.Run result never awaited — unobserved exceptions silently discarded
}

After:

public void StartReceiveLoop(...)
{
    connection.CancellationTokenSource = new CancellationTokenSource();
    connection.ReceiveTask = Task.Run(async () => { ... })
        .ContinueWith(t => {
            if (t.IsFaulted)
                _logger.LogError(t.Exception, "Receive loop crashed");
        }, TaskContinuationOptions.OnlyOnFaulted);
}

B5: IStreamEventHandler — Sync-only callbacks

Before (Important):

public interface IStreamEventHandler
{
    void OnEvent(CloudEvent e);
    void OnError(Exception ex);
}

After:

public interface IStreamEventHandler
{
    void OnEvent(CloudEvent e);
    void OnError(Exception ex);
}

public interface IAsyncStreamEventHandler : IStreamEventHandler
{
    Task OnEventAsync(CloudEvent e);
    Task OnErrorAsync(Exception ex);
}
Then update SubscribeAsync to check if handler is IAsyncStreamEventHandler before calling.

C1: IEngine.ProcessAsync — No CancellationToken

Before (Design Limitation):

public interface IEngine
{
    void LoadCode(ReadOnlyMemory<byte> code);
    Task<CloudEvent?> ProcessAsync(CloudEvent input);
}

After:

public interface IEngine
{
    void LoadCode(ReadOnlyMemory<byte> code, CancellationToken cancellationToken = default);
    Task<CloudEvent?> ProcessAsync(CloudEvent input, CancellationToken cancellationToken = default);
}

B5: WebSocketClientWrapper.DisconnectAsync — Swallows exceptions

Before (Important):

catch (Exception ex)
{
    _logger.LogWarning(ex, "Error during disconnect for connection {ConnectionId}", connectionId);
    // Exception swallowed — caller can't detect failure
}

After:

catch (Exception ex) when (ex is not OperationCanceledException)
{
    _logger.LogWarning(ex, "Error during disconnect for connection {ConnectionId}", connectionId);
    throw;  // Propagate to caller
}


Summary Statistics

Metric Original Count Resolved Deferred
Total methods inventoried ~150
Critical anti-patterns 3 3 0
Important anti-patterns 8 8 0
Moderate anti-patterns 6 6 0
Low / Nice-to-have anti-patterns 11 11 0
Total 28 28 0
Methods matching .NET conventions ~90% ~99%
Unnecessary async keywords (CS1998) 5 5 0
Fire-and-forget patterns 3 3 0
Sync-over-async blocking (.Result/.Wait()/.GetAwaiter().GetResult()) 3 3 0
Missing CancellationToken passthrough 2 2 0
Missing ConfigureAwait(false) in NuGet libraries 1 (pervasive) 1 0
Breaking changes required (alpha-safe) 2 2 (B4, B8) 0

Status: All 28 of 28 anti-patterns resolved across 3 repos.

  1. Critical — A1 (GrpcChannelPool.ValidateReflection sync-over-async) and A2/A3 (PythonEngine blocking waits).
  2. Important — B1, B2, B3 (fire-and-forget patterns) — added ContinueWith(OnlyOnFaulted) for exception logging.
  3. Important — B4 (sync blocking gRPC in client library) — sync UnsubscribeFromTopic deleted (alpha-safe breaking change).
  4. Important — B5 (IStreamEventHandler sync-only) — added IAsyncStreamEventHandler opt-in async variant.
  5. Important — B6 (PythonEngine.Dispose sync) — implemented IAsyncDisposable.
  6. Important — B8 (WorkManager.LoadCode sync) — IEngine.LoadCodeLoadCodeAsync (alpha-safe breaking change). Resolved D3 in same commit.
  7. Moderate — C3 (Dapr resilience wiring), C4 (disconnect exception propagation), C5 (per-instance index), C6 (CT passthrough).
  8. Moderate — Clean up CS1998 warnings (C1, C2, D1). All 5 cases now resolved, zero CS1998 warnings remain. D1 (3 methods in virtufin-api) in commit 5e20c4c; C1 (CSharpSourceEngine.ProcessAsync) in commit 4684daa; C2 (WorkManagerGrpcService.GetWorkerHistory) in commit f3d1626.
  9. Low — D2 (ConfigureAwait(false) in virtufin-api client), D5 (disposal timeout), D6 (ValueTask), D7 (atomic CTS), D8 (env-gated detailed errors), D9 (async file I/O), D11 (atomic _loadedAt).
  10. Low — D10 (SubscriptionHealthSweeper heartbeat race) — re-examined: the channel is unbounded, so TryWrite=false only signals completion, not backpressure. Fix: stop forwarding synthetic heartbeats to the gRPC client. Commit a770f5c.

Resolution Summary

All 28 audit findings were addressed in a single coordinated sweep across virtufin-api, virtufin-websocketmanager, and virtufin-workmanager. Changes were made in 2 breaking-change commits (B4 deleted sync method, B8 made IEngine.LoadCode async) and 25 non-breaking commits — all on local master branches, not yet pushed.

Commits by repository (unpushed)

Repository Commits
virtufin-websocketmanager 7 — B1, C3, C4, C5, D6, D7, D8
virtufin-api 10 — A1, B2/B3, B4 obsolete, B4 removal, B5, D1, D2, D8, D9, D11
virtufin-workmanager 9 — A2, A3, B6, B8 interface, B8 callers, C2, C6, D5, D8

Notable design changes

  • IEngine is now fully asyncLoadCodeAsync(byte[], CancellationToken) + ProcessAsync(CloudEvent, CancellationToken). PythonEngine is the largest beneficiary: TCP listener accept, read, write, and ack all run on the thread pool instead of blocking it.
  • IWebSocketConnectionStore.CreateConnectionAsync returns ValueTask<...> — avoids Task allocation for the common sync-completion path.
  • WebSocketConnection.CancellationTokenSource setter removed — replaced with SwapCancellationTokenSource(newCts) using Interlocked.Exchange. Atomic replace + cancel + dispose of the previous CTS happens in a single call.
  • Per-instance secondary index in DaprConnectionRepositoryGetByInstanceIdAsync is now O(instance's connections) instead of O(all connections). Save/Delete maintain both the global and per-instance indexes.
  • DaprResiliencePipeline is now actually wired in — all 6 Dapr state store calls in DaprConnectionRepository go through retry + circuit breaker.
  • Program.cs EnableDetailedErrors is now builder.Environment.IsDevelopment() in all 3 services.

Verification

All 3 repos build with 0 errors on local machine. The virtufin-workmanager service build was not directly testable locally due to Virtufin.Api.Client 0.0.38 not being in the local NuGet cache (CI will have credentials), but PythonEngine and CSharpSourceEngine projects (which use the new async interface) build clean.

  1. Push the 32 commits across 5 repos (virtufin-common, virtufin-api, virtufin-websocketmanager, virtufin-workmanager, virtufin-examples).
  2. Configure VIRTUFIN_PACKAGES_USER/VIRTUFIN_PACKAGES_TOKEN as Gitea Actions secrets in the 5 caller repos (virtufin-api, virtufin-websocketmanager, virtufin-workmanager, virtufin-examples, docker-compose).
  3. Stress-test the SubscriptionHealthSweeper (D10) before revisiting.
  4. Out of audit scope: rotate the plaintext HARBOR_TOKEN in virtufin-dotnet/.env (flagged in earlier sessions, never edited).

Tracking

Per-repo resolution tracking issues (all closed, since items are already resolved on local master):