Worker Management

Purpose

Event-driven worker management system enabling deployment and execution of workers written in multiple languages (Python, C#, .NET DLL) that subscribe to Dapr pub/sub topics, process CloudEvents, and coordinate via distributed locks.

Topic and state-key naming for worker lifecycle events is defined in pubsub-topics/spec.md. The lifecycle topic is workmanager.lifecycle; finer-grained infra events (heartbeats, grouplock changes) live under workmanager.infra.*.

Requirements

Requirement: Worker Lifecycle Management

The WorkManager SHALL support creating, starting, stopping, and deleting workers. Workers SHALL persist their configuration and restore automatically on service restart.

Scenario: Create and start a worker

  • WHEN a client creates a worker with a topic subscription and code payload
  • THEN the worker SHALL be persisted to the state store, subscribed to the topic, and set to Running status

Scenario: Hot-reload worker code

  • WHEN a client updates a running worker's code
  • THEN the worker SHALL switch to the new code without restarting the service

Scenario: Auto-recovery on restart

  • WHEN the WorkManager service restarts
  • THEN all previously persisted workers SHALL be restored to their saved state before the health check passes

Scenario: Worker version history

  • WHEN a worker's code is updated
  • THEN the previous code version SHALL be preserved in a versioned history

Requirement: Service Architecture

The WorkManager SHALL use the virtufin-api gRPC service for all state operations. The WorkManager SHALL NOT call DaprClient.GetStateAsync, SaveStateAsync, DeleteStateAsync, or any other Dapr state API directly. The Dapr sidecar SHALL remain in use for service-invocation mTLS, distributed tracing, and metrics.

Scenario: Service persists worker state

  • WHEN the WorkManager saves or retrieves worker configuration
  • THEN it SHALL call the API's State gRPC service
  • AND it SHALL NOT call any Dapr state API directly

Scenario: Service subscribes to a worker topic

  • WHEN the WorkManager subscribes to a worker's input topic
  • THEN it SHALL call Pubsub.Subscribe on the virtufin-api (regular, not the system variant)
  • AND it SHALL NOT call DaprPublishSubscribeClient.SubscribeAsync directly

Requirement: Worker Lifecycle Events

The WorkManager SHALL publish worker lifecycle events to the workmanager.lifecycle topic on every state transition. Topic, ce-type, and state-key conventions are defined in pubsub-topics/spec.md. Events SHALL be CloudEvents v1.0 envelopes with ce-type = com.virtufin.workmanager.lifecycle.<state>.

Scenario: Worker created

  • WHEN a worker is created
  • THEN the WorkManager SHALL publish a worker.created event with worker_id, group, and topic in the data payload

Scenario: Worker started

  • WHEN a worker is started
  • THEN the WorkManager SHALL publish a worker.started event

Scenario: Worker stopped

  • WHEN a worker is stopped
  • THEN the WorkManager SHALL publish a worker.stopped event

Scenario: Worker error

  • WHEN a worker process fails or throws
  • THEN the WorkManager SHALL publish a worker.error event with error_type and error_message in the data payload
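
The envelope described in this requirement can be sketched as follows. This is a minimal illustration in Python rather than the WorkManager's own code; the source URI, the helper name lifecycle_event, and the sample worker values are assumptions, while the topic name, the ce-type prefix, and the payload field names come from the scenarios above.

```python
import json
import uuid
from datetime import datetime, timezone

LIFECYCLE_TOPIC = "workmanager.lifecycle"
TYPE_PREFIX = "com.virtufin.workmanager.lifecycle."


def lifecycle_event(state, worker_id, source="/workmanager", **data):
    """Build a CloudEvents v1.0 envelope for one lifecycle transition.

    `source` is a hypothetical URI; the spec does not name one.
    """
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": source,
        "type": TYPE_PREFIX + state,
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": {"worker_id": worker_id, **data},
    }


# Sample values below are made up for illustration.
created = lifecycle_event("created", "w-1", group="pricing", topic="orders.in")
failed = lifecycle_event(
    "error", "w-1",
    error_type="TimeoutError",
    error_message="worker exceeded its timeout",
)
print(json.dumps(created, indent=2))
```

Both events would be published to LIFECYCLE_TOPIC; only the type suffix and the data payload differ between transitions.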

Requirement: Polyglot Worker Engines

The WorkManager SHALL support executing worker code in multiple languages via a pluggable engine architecture. Built-in engines SHALL include Python (subprocess), C# (Roslyn runtime compilation), and pre-compiled .NET DLL (in-process via hostfxr and AssemblyLoadContext, handled by the default DotNetDllEngine).

Scenario: Python worker execution

  • WHEN a worker with a Python engine processes a message
  • THEN the worker code SHALL execute in a dedicated Python subprocess with a configurable timeout

Scenario: C# source worker execution

  • WHEN a worker with a C# source engine processes a message
  • THEN the code SHALL be compiled at runtime via Roslyn and executed in-process

Scenario: DLL worker execution (in-process, default)

  • WHEN a worker with MIME type application/x-dotnet-dll processes a message
  • THEN the worker DLL SHALL be loaded into an isolated AssemblyLoadContext and executed in-process in the WorkManager, with sub-microsecond per-call latency

Scenario: Engine extension

  • WHEN a new engine implementation conforms to the IEngine interface
  • THEN it SHALL be registrable in the engine registry without modifying core WorkManager code

Requirement: Event-Driven Message Processing

Workers SHALL subscribe to Dapr pub/sub topics and process incoming CloudEvents. The type field of each output CloudEvent SHALL determine the topic it is published to.

Scenario: Message processing

  • WHEN a CloudEvent is published to a topic a worker subscribes to
  • THEN the worker's code SHALL be invoked with the event data and any output events SHALL be published to the topic matching their type field

Scenario: Concurrent message handling

  • WHEN multiple messages arrive for the same worker
  • THEN they SHALL be processed concurrently, subject to group locking constraints
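
A minimal sketch of the type-to-topic routing rule, assuming the publish topic is exactly the string in the output event's type field. The function name route_outputs and the sample event types are hypothetical.

```python
def route_outputs(output_events):
    """Group output CloudEvents by publish topic, taken from each event's 'type'."""
    by_topic = {}
    for event in output_events:
        topic = event["type"]  # assumption: topic name == type value
        by_topic.setdefault(topic, []).append(event)
    return by_topic


# Made-up output events from a single worker invocation.
outputs = [
    {"specversion": "1.0", "id": "1", "source": "/w", "type": "orders.priced"},
    {"specversion": "1.0", "id": "2", "source": "/w", "type": "orders.rejected"},
    {"specversion": "1.0", "id": "3", "source": "/w", "type": "orders.priced"},
]
routed = route_outputs(outputs)
```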

Requirement: Distributed Coordination

Workers assigned to the same group SHALL process messages under mutual exclusion, enforced by distributed locks with fencing tokens. Messages that exhaust retries SHALL go to a dead-letter topic.

Scenario: Lock acquisition

  • WHEN a worker in a group receives a message
  • THEN it SHALL acquire a distributed lock with a fencing token before processing

Scenario: Lock contention

  • WHEN a lock cannot be acquired because another instance holds it
  • THEN the message SHALL be retried with exponential backoff, up to the Dapr max redelivery count

Scenario: Dead letter

  • WHEN a message exhausts all redelivery attempts due to lock contention
  • THEN the message SHALL be routed to the dead-letter topic (<topic>-dead)
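
The retry-then-dead-letter decision can be sketched like this. The base delay, the cap, and the function names are assumptions chosen for illustration; the real redelivery count and backoff are governed by Dapr configuration, and only the <topic>-dead naming comes from the scenario above.

```python
def backoff_delays(max_redeliveries, base_seconds=0.5, cap_seconds=30.0):
    """Exponentially growing delays, one per redelivery attempt, capped."""
    return [min(cap_seconds, base_seconds * 2 ** n) for n in range(max_redeliveries)]


def dead_letter_topic(topic):
    """Dead-letter topic name as defined by the spec: <topic>-dead."""
    return f"{topic}-dead"


def next_action(topic, attempt, max_redeliveries):
    """Decide what follows a failed lock acquisition on a 0-based attempt."""
    if attempt < max_redeliveries:
        return ("retry", backoff_delays(max_redeliveries)[attempt])
    return ("dead-letter", dead_letter_topic(topic))
```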

Scenario: Stale lock theft

  • WHEN a lock holder's fencing token is stale
  • THEN another instance SHALL be able to steal the lock
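
One way to picture fencing tokens and stale-lock theft is the in-memory stand-in below. It assumes that "stale" means the holder's lease has expired, which the spec does not state explicitly; the class name, the lease length, and the explicit now parameter are all hypothetical.

```python
class GroupLockStore:
    """In-memory stand-in for a distributed lock store with fencing tokens."""

    def __init__(self, lease_seconds=10.0):
        self.lease_seconds = lease_seconds
        self._locks = {}      # group -> (owner, token, expires_at)
        self._next_token = 0  # monotonically increasing fencing token

    def try_acquire(self, group, owner, now):
        held = self._locks.get(group)
        if held is not None and held[2] > now and held[0] != owner:
            return None  # live lock held by another instance: caller retries
        # Free, expired (stale), or re-acquired by the owner: issue a larger token.
        self._next_token += 1
        self._locks[group] = (owner, self._next_token, now + self.lease_seconds)
        return self._next_token

    def is_current(self, group, token):
        held = self._locks.get(group)
        return held is not None and held[1] == token

    def release(self, group, token):
        if self.is_current(group, token):
            del self._locks[group]
            return True
        return False  # stale token: the lock was stolen in the meantime


store = GroupLockStore(lease_seconds=10.0)
t1 = store.try_acquire("pricing", "instance-a", now=0.0)
blocked = store.try_acquire("pricing", "instance-b", now=5.0)   # contention
t2 = store.try_acquire("pricing", "instance-b", now=11.0)       # lease expired
```

Because tokens only grow, any downstream write tagged with t1 can be rejected once t2 has been issued, which is the point of fencing.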

Requirement: Worker Code Sources

Workers SHALL accept code from inline Base64-encoded content or from HTTP(S) URLs. URL-based sources SHALL be validated against SSRF and host allowlist constraints as defined in the cross-cutting security requirements.

Scenario: Inline code worker

  • WHEN a worker is created with a Base64-encoded code string
  • THEN the decoded code SHALL be passed to the engine for execution

Scenario: URL code worker

  • WHEN a worker is created with a URL pointing to code
  • THEN the code SHALL be fetched, SHA-256 verified, and passed to the engine

Scenario: Blocked URL

  • WHEN a worker's code URL targets a private IP or a host not in the allowlist
  • THEN the worker creation SHALL fail with a security rejection
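
The three scenarios above can be sketched with the Python standard library. The allowlist contents, the SecurityRejection exception, and the function names are assumptions. The check below only inspects literal IP addresses; a real SSRF guard would also have to validate the addresses a hostname resolves to.

```python
import base64
import hashlib
import ipaddress
from urllib.parse import urlparse

ALLOWED_HOSTS = {"code.example.com"}  # hypothetical allowlist


class SecurityRejection(Exception):
    """Raised when a code source violates the URL or integrity rules."""


def check_code_url(url, allowed_hosts=ALLOWED_HOSTS):
    parts = urlparse(url)
    if parts.scheme not in ("http", "https"):
        raise SecurityRejection(f"unsupported scheme: {parts.scheme!r}")
    host = parts.hostname or ""
    try:
        ip = ipaddress.ip_address(host)
    except ValueError:
        ip = None  # not an IP literal; DNS resolution is out of scope here
    if ip is not None and (ip.is_private or ip.is_loopback or ip.is_link_local):
        raise SecurityRejection(f"private address: {host}")
    if host not in allowed_hosts:
        raise SecurityRejection(f"host not in allowlist: {host}")
    return url


def verify_sha256(payload: bytes, expected_hex: str) -> bytes:
    """Return the payload only if its SHA-256 digest matches."""
    if hashlib.sha256(payload).hexdigest() != expected_hex.lower():
        raise SecurityRejection("SHA-256 mismatch")
    return payload


def decode_inline(b64: str) -> bytes:
    """Decode an inline Base64 code payload, rejecting malformed input."""
    return base64.b64decode(b64, validate=True)
```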

Requirement: Python Sandboxing

Python worker subprocesses SHALL run with restricted interpreter flags and a package allowlist. All non-allowed imports SHALL be blocked.

Scenario: Allowed import

  • WHEN a Python worker imports a package listed in PYTHON_ALLOWED_PACKAGES
  • THEN the import SHALL succeed

Scenario: Blocked import

  • WHEN a Python worker imports a package not in the allowlist
  • THEN the import SHALL fail with an error
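
The allowlist check can be illustrated with a guarded __import__ hook. This is a sketch of the import rule only, not a sandbox and not necessarily how the engine enforces it; the allowlist value and the function names are assumptions, and the restricted interpreter flags mentioned above are not modeled.

```python
import builtins

PYTHON_ALLOWED_PACKAGES = {"json", "math"}  # example value, not the real setting


def make_guarded_import(allowed):
    real_import = builtins.__import__

    def guarded_import(name, globals=None, locals=None, fromlist=(), level=0):
        top_level = name.split(".")[0]
        # Only absolute imports written in worker code are checked here.
        if level == 0 and top_level not in allowed:
            raise ImportError(f"import of {name!r} is not allowed")
        return real_import(name, globals, locals, fromlist, level)

    return guarded_import


def run_worker_code(source, allowed=PYTHON_ALLOWED_PACKAGES):
    """Execute worker source with a guarded __import__ and return its namespace."""
    safe_builtins = dict(vars(builtins))
    safe_builtins["__import__"] = make_guarded_import(allowed)
    scope = {"__builtins__": safe_builtins}
    exec(source, scope)
    return scope
```

Imports made internally by an allowed package are not intercepted, because those modules keep the interpreter's normal builtins.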

Requirement: Worker Contract (IWorker)

The IWorker interface SHALL define an asynchronous ProcessAsync method returning Task<CloudEvent?>. WorkerBase SHALL accept constructor parameters (responseType, source) instead of abstract property overrides for CloudEvent routing.

Scenario: Async worker process

  • WHEN the WorkManager invokes a worker's ProcessAsync method
  • THEN the call SHALL be awaited asynchronously without blocking threads

Scenario: Constructor-based configuration

  • WHEN a worker extends WorkerBase or CommandWorker
  • THEN it SHALL pass the response CloudEvent type and source URI via the constructor, not via abstract property overrides

Scenario: ApiWorker with pre-configured ApiClient

  • WHEN a worker extends ApiWorker with apiHost, apiPort, responseEventType, and source
  • THEN the base class SHALL create an ApiClient at the configured host/port and provide it to ProcessAsync(CloudEvent, ApiClient)

Scenario: ApiCommandWorker with pre-configured ApiClient

  • WHEN a worker extends ApiCommandWorker with apiHost, apiPort, responseEventType, and source
  • THEN the base class SHALL create an ApiClient at the configured host/port and make it available during HandleCommandAsync

Requirement: CommandWorker Base Class

A CommandWorker abstract class SHALL extend WorkerBase to simplify command-based workers. It SHALL parse the CloudEvent data as JSON, extract a "command" field, and dispatch to the abstract HandleCommandAsync method. Unknown commands SHALL produce automatic error responses.

Scenario: Command dispatch

  • WHEN a CloudEvent arrives with {"command": "hello", ...} as data
  • THEN CommandWorker SHALL parse the JSON, extract "hello", and call HandleCommandAsync(input, "hello", node)

Scenario: Unknown command

  • WHEN CommandWorker cannot match the command to any handler
  • THEN it SHALL return an error response with "Unknown command: <command>"

Scenario: Correlation ID propagation

  • WHEN CommandWorker creates a response via Response(input, ...) or Error(input, ...)
  • THEN the correlationid from the input CloudEvent SHALL be propagated to the output CloudEvent via the WithCorrelationId extension method

Requirement: Correlation ID Propagation

Worker responses SHALL propagate the correlationid extension attribute from the input CloudEvent to the output CloudEvent. The CloudEvent.WithCorrelationId(input) extension method SHALL perform this copy and SHALL be a no-op when the input carries no correlationid.

Scenario: Response with correlation ID

  • WHEN a worker creates a Response() or Error() via WorkerBase helpers
  • THEN the output CloudEvent SHALL carry the same correlationid as the input CloudEvent if present

Scenario: Input without correlation ID

  • WHEN a worker processes an input CloudEvent without a correlationid extension
  • THEN the output CloudEvent SHALL NOT have a correlationid extension
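
The propagation rule is small enough to show in full. The sketch below models CloudEvents as plain dictionaries, where extension attributes sit at the top level as in the CloudEvents JSON format; the function name is a Python stand-in for the WithCorrelationId extension method, not its implementation.

```python
def with_correlation_id(output, input_event):
    """Copy 'correlationid' from input to output when present; otherwise no-op."""
    correlation_id = input_event.get("correlationid")
    if correlation_id is not None:
        output = {**output, "correlationid": correlation_id}
    return output


reply = {"specversion": "1.0", "id": "r-1", "source": "/w", "type": "orders.priced"}
tagged = with_correlation_id(reply, {"id": "in-1", "correlationid": "abc-123"})
untagged = with_correlation_id(reply, {"id": "in-2"})
```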

Requirement: Generic CommandWorker Base Class

The Virtufin.Worker.DevKit package SHALL provide a generic CommandWorker<T> where T : struct, Enum and a generic ApiCommandWorker<T> where T : struct, Enum as the canonical base classes for command-based workers. The wire value of the CloudEvent data.command field SHALL be the lowercase identifier of the enum value (e.g. the enum value WebSocketManagerCommand.create corresponds to the wire string "create"). The base class SHALL parse the wire string with Enum.TryParse<T>(command, ignoreCase: true, out var parsed) and dispatch the parsed value to an abstract HandleAsync(CloudEvent, T, JsonNode) method. An unknown wire command SHALL produce an error response. The non-generic CommandWorker and ApiCommandWorker types are removed (alpha, no backward compatibility).

Scenario: Generic dispatch on valid command

  • WHEN a CloudEvent arrives with {"command": "create", ...} and the worker is CommandWorker<MyEnum> where MyEnum.create exists
  • THEN the base SHALL parse "create" into MyEnum.create and invoke HandleAsync(input, MyEnum.create, node)

Scenario: Wire name == identifier (lowercase)

  • WHEN a worker defines an enum value WebSocketManagerCommand.create
  • THEN the corresponding wire command string SHALL be the lowercase identifier "create"

Scenario: Unknown command

  • WHEN the wire command string does not match any enum value
  • THEN the base SHALL return an error response with message "Unknown command: <command>"

Scenario: Missing command field

  • WHEN the CloudEvent data has no "command" field
  • THEN the base SHALL return an error response with message "Missing 'command' field"

Scenario: Malformed JSON

  • WHEN the CloudEvent data cannot be parsed as JSON or parses to null
  • THEN the base SHALL return an error response with message "Failed to parse command JSON" (or wrap the parser exception in "Internal error: …")

Scenario: Generic ApiCommandWorker

  • WHEN a worker extends ApiCommandWorker<T> and the api_host field is present in the command JSON
  • THEN the base SHALL create or reconfigure the ApiClient for the parsed host/port (default port 5002) and make it available via the Api property during HandleAsync

Scenario: Backend invocation via dynamic gateway

  • WHEN a worker invokes a backend service via dynamic gateway = Api.Gateway; await gateway.<service>.<method>(requestData);
  • THEN the ServiceClient returned by <service> SHALL resolve to the named service and <method> SHALL invoke the corresponding method via the API Gateway

Scenario: New enum value triggers compiler warning on the switch

  • WHEN a new value is added to the command enum and the worker's HandleAsync switch expression does not handle it
  • THEN the C# compiler SHALL emit a warning (CS8509 or equivalent) for the unhandled enum value
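
The parse-and-dispatch rules of this requirement can be sketched as a Python analogue of the C# base class. The enum's second member and the parse_command helper are hypothetical; the three error strings are the ones the scenarios above specify. The compile-time exhaustiveness warning has no direct Python equivalent and is not modeled.

```python
import json
from enum import Enum


class WebSocketManagerCommand(Enum):
    create = "create"
    close = "close"  # hypothetical second value, for illustration only


def parse_command(enum_type, data):
    """Return (command, node, error); exactly one of command and error is set."""
    try:
        node = json.loads(data)
    except (TypeError, ValueError):
        return None, None, "Failed to parse command JSON"
    if not isinstance(node, dict):  # covers JSON null and non-object payloads
        return None, None, "Failed to parse command JSON"
    if "command" not in node:
        return None, node, "Missing 'command' field"
    wire = node["command"]
    for member in enum_type:
        # Case-insensitive match, mirroring ignoreCase: true in the spec.
        if isinstance(wire, str) and member.name.lower() == wire.lower():
            return member, node, None
    return None, node, f"Unknown command: {wire}"


command, node, error = parse_command(
    WebSocketManagerCommand, '{"command": "create", "id": 7}'
)
```

A handler would then branch on command and receive node as the parsed payload, which corresponds to HandleAsync(input, MyEnum.create, node) in the C# contract.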

Requirement: In-Process DotNet DLL Engine

The WorkManager SHALL support an in-process DotNetDllEngine (default for MIME type application/x-dotnet-dll) that loads pre-compiled .NET worker DLLs directly into the AOT-compiled WorkManager process. The engine SHALL embed the .NET runtime (CoreCLR) into the host process on first use via the hostfxr C API and SHALL load the worker DLL into a per-worker, collectible AssemblyLoadContext. The engine SHALL be AOT-compatible: the engine code itself is compiled with <IsAotCompatible>true</IsAotCompatible> and SHALL NOT use reflection on its own types; the only reflection SHALL be on the JIT-loaded worker assembly.

Scenario: First LoadCodeAsync initializes CoreCLR

  • WHEN the first LoadCodeAsync is called for MIME type application/x-dotnet-dll
  • THEN the engine SHALL load libhostfxr via NativeLibrary.Load and call hostfxr_initialize_for_runtime_config to initialize CoreCLR in the host process; the cost SHALL be paid once per WorkManager process

Scenario: Subsequent LoadCodeAsync in the same process

  • WHEN a second LoadCodeAsync is called after CoreCLR is already initialized
  • THEN the engine SHALL skip the hostfxr initialization and load the new worker DLL directly

Scenario: Per-worker AssemblyLoadContext

  • WHEN a LoadCodeAsync is called with a worker nupkg
  • THEN the engine SHALL extract the worker DLL (and its sibling dependencies) from the nupkg's lib/<tfm>/ entries, create a new collectible AssemblyLoadContext, and load the worker assembly into it

Scenario: DevKit resolution

  • WHEN the JIT-loaded worker assembly references Virtufin.Worker.DevKit
  • THEN the engine SHALL resolve the reference to the AOT-compiled Virtufin.Worker.DevKit instance already loaded in the WorkManager process (so the worker and the engine see the same type identity)

Scenario: Direct in-process dispatch

  • WHEN ProcessAsync is called on the engine with a CloudEvent
  • THEN the engine SHALL call the worker's IWorker.ProcessAsync directly, with no socket, no JSON serialization, and no subprocess scheduling; the per-call latency SHALL be sub-microsecond

Scenario: Runtime discovery on target

  • WHEN the WorkManager runs on a target that does not have a compatible .NET runtime installed
  • THEN the first LoadCodeAsync SHALL fail with a clear error message that names the missing runtime and the discovery paths (DOTNET_ROOT, system install, side-by-side host/fxr/<version>/)

Scenario: Roll-forward to a newer runtime

  • WHEN the WorkManager is built for .NET 10 and the target has a .NET 11+ runtime installed
  • THEN the engine SHALL use the newer runtime; the generated runtimeconfig.json SHALL declare "rollForward": "Major"
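
For orientation, a runtimeconfig.json with major-version roll-forward has roughly the shape produced below. The helper name and the tfm and version defaults are assumptions derived from the ".NET 10" example in the scenario; the file the engine actually generates may carry additional properties.

```python
import json


def runtime_config(tfm="net10.0", framework_version="10.0.0"):
    """Build a runtimeconfig.json document that allows rolling forward across majors."""
    return {
        "runtimeOptions": {
            "tfm": tfm,
            "rollForward": "Major",
            "framework": {
                "name": "Microsoft.NETCore.App",
                "version": framework_version,
            },
        }
    }


config_text = json.dumps(runtime_config(), indent=2)
print(config_text)
```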

Requirement: Native DLL Engine

The WorkManager SHALL support a NativeDllEngine for executing workers shipped as a per-architecture native shared library (.so / .dll / .dylib). The engine SHALL run in-process inside the (NativeAOT-compiled) WorkManager and SHALL NOT require a separate worker-host subprocess or a JIT build stage. The engine SHALL be registered for MIME type application/x-native-dll. Per-architecture entries SHALL live under runtimes/<rid>/native/ inside the worker zip, matching the NuGet convention for native packages.

Scenario: Native worker execution

  • WHEN a worker with MIME type application/x-native-dll processes a CloudEvent
  • THEN the engine SHALL NativeLibrary.Load the matching runtimes/<rid>/native/<file> entry from the worker zip, resolve the entry_point and free_result exports declared in manifest.json, marshal the input CloudEvent to a FlatBuffer, invoke the native Process function, and deserialize the returned FlatBuffer into a WorkerResponse.

Scenario: Per-architecture entry selection

  • WHEN a worker zip contains entries under runtimes/<rid>/native/ for at least one supported RID (linux-x64, linux-arm64)
  • THEN the engine SHALL select the entry whose <rid> directory matches RuntimeInformation.RuntimeIdentifier of the running process and SHALL load the corresponding lib<library>.so, <library>.dll, or lib<library>.dylib per the platform (see Native Worker Library Naming).

Scenario: Unsupported architecture

  • WHEN the worker zip does not contain a runtimes/<rid>/native/ entry for the running architecture
  • THEN the engine SHALL throw NotSupportedException listing the supported architectures (linux-x64, linux-arm64).

Scenario: Missing manifest

  • WHEN the worker zip does not contain manifest.json at the root
  • THEN the engine SHALL refuse to load the worker and throw a clear error naming the missing file.

Scenario: ABI version mismatch

  • WHEN the manifest.json declares an abi_version that is not in the engine's supported set
  • THEN the engine SHALL refuse to load the worker and throw a clear error naming the supported versions.

Requirement: Native Worker ABI

The NativeDllEngine SHALL define and enforce a stable C ABI between the managed engine and the native worker. The ABI SHALL consist of: (a) a FlatBuffers schema for the CloudEvent wire format; (b) two C function exports Process and FreeResult; (c) a VirtufinHost struct carrying host callbacks; (d) a manifest.json declaring the ABI version, the library basename, and the export names. The engine SHALL vendor a virtufin_worker_api.h header for distribution to worker authors.

Scenario: ABI struct layout

  • WHEN a native worker is loaded
  • THEN the engine SHALL pass a VirtufinHost* whose first field is an opaque engine_ptr (a GCHandle to the engine instance), whose second field is abi_version (currently 1), and whose remaining fields are C function pointers log, gateway_call, and free_response.
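
The field order of VirtufinHost can be mirrored with Python's ctypes as a rough illustration. The field names and their order come from the scenario above. Every C type and callback signature below is an assumption, since the spec names the fields but not their types; virtufin_worker_api.h remains the authoritative definition.

```python
import ctypes

# Assumed callback signatures (hypothetical; see virtufin_worker_api.h for the real ones).
LogFn = ctypes.CFUNCTYPE(None, ctypes.c_int, ctypes.c_char_p)
GatewayCallFn = ctypes.CFUNCTYPE(
    ctypes.c_int,
    ctypes.c_char_p, ctypes.c_char_p,    # service, method
    ctypes.c_void_p, ctypes.c_size_t,    # request, request_len
    ctypes.POINTER(ctypes.c_void_p),     # &response
    ctypes.POINTER(ctypes.c_size_t),     # &response_len
)
FreeResponseFn = ctypes.CFUNCTYPE(None, ctypes.c_void_p)


class VirtufinHost(ctypes.Structure):
    _fields_ = [
        ("engine_ptr", ctypes.c_void_p),    # opaque handle to the engine instance
        ("abi_version", ctypes.c_uint32),   # currently 1; integer width is assumed
        ("log", LogFn),
        ("gateway_call", GatewayCallFn),
        ("free_response", FreeResponseFn),
    ]


messages = []


@LogFn
def log(level, message):
    # Host side of the logging callback: record (level, text).
    messages.append((level, message.decode()))


host = VirtufinHost(engine_ptr=None, abi_version=1, log=log)
host.log(2, b"worker loaded")  # what a native worker would do via host->log
```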

Scenario: CloudEvent FlatBuffer input

  • WHEN the engine invokes Process(host, in_buf, in_len, &out_buf, &out_len)
  • THEN in_buf SHALL point to a FlatBuffer-encoded CloudEvent matching the vendored worker_api.fbs schema. The extensions[correlationid] field SHALL carry the correlation id from the input CloudEvent when present.

Scenario: WorkerResponse FlatBuffer output

  • WHEN Process returns 0
  • THEN *out_buf SHALL point to a FlatBuffer-encoded WorkerResponse in which exactly one of result_event or error_message is set. If result_event is set, the engine SHALL publish it on the reply topic derived from the input CloudEvent. If error_message is set, the engine SHALL surface a worker.error lifecycle event and publish an error response if the worker did not specify one.

Scenario: FreeResult pairing

  • WHEN Process returns a non-null *out_buf
  • THEN the engine SHALL call FreeResult(out_buf) exactly once, including on the error path. The worker SHALL use the same allocator for the *out_buf allocation that FreeResult releases.

Scenario: Logging callback

  • WHEN the native worker calls host->log(level, message)
  • THEN the engine SHALL map level (0=trace, 1=debug, 2=info, 3=warn, 4=error) to the corresponding Microsoft.Extensions.Logging.LogLevel and forward message to the engine's structured logger. The lifetime of message SHALL be limited to the call.

Scenario: Gateway call indirection

  • WHEN the native worker calls host->gateway_call(service, method, request, request_len, &response, &response_len)
  • THEN the engine SHALL deserialize request as JSON into a Dictionary<string, object?>, invoke ApiClient.InvokeAsync(service, method, dict), JSON-serialize the returned dictionary into *response (allocated with Marshal.AllocCoTaskMem), and return 0 on success or non-zero on internal failure. The worker SHALL free *response via host->free_response(response).

Scenario: Manifest format

  • WHEN the engine parses the worker's manifest.json
  • THEN the JSON object SHALL contain abi_version (integer) and library (non-empty string matching [A-Za-z0-9_.-]+), and MAY contain entry_point (string, default "Process") and free_result (string, default "FreeResult"). Additional fields SHALL be ignored by v1. A missing or empty library field SHALL cause the engine to refuse the worker with a clear error.
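
The manifest rules can be sketched as a small validator. The function name and the use of ValueError are assumptions; the field names, defaults, and the library pattern are taken from the scenario above, and the supported ABI set {1} reflects the "currently 1" statement in the struct-layout scenario.

```python
import json
import re

SUPPORTED_ABI_VERSIONS = {1}
LIBRARY_PATTERN = re.compile(r"[A-Za-z0-9_.-]+")


def parse_manifest(text, supported=SUPPORTED_ABI_VERSIONS):
    """Validate manifest.json text and return it with defaults applied."""
    manifest = json.loads(text)
    abi = manifest.get("abi_version")
    if not isinstance(abi, int) or isinstance(abi, bool):
        raise ValueError("manifest.json: abi_version must be an integer")
    if abi not in supported:
        raise ValueError(
            f"manifest.json: abi_version {abi} unsupported; supported: {sorted(supported)}"
        )
    library = manifest.get("library")
    if not isinstance(library, str) or not LIBRARY_PATTERN.fullmatch(library):
        raise ValueError(f"manifest.json: invalid library value {library!r}")
    # Unknown fields are ignored, as v1 requires.
    return {
        "abi_version": abi,
        "library": library,
        "entry_point": manifest.get("entry_point", "Process"),
        "free_result": manifest.get("free_result", "FreeResult"),
    }
```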

Requirement: Native Worker Identity and Lifecycle

The NativeDllEngine SHALL communicate worker identity to the native worker via process-level environment variables set at LoadCodeAsync time, and SHALL release the loaded library and all engine-owned resources on worker stop.

Scenario: Worker identity env vars

  • WHEN the engine loads a native worker
  • THEN it SHALL set VIRTUFIN_WORKER_ID to the worker GUID, VIRTUFIN_WORKER_GROUP to the worker group if non-null, and VIRTUFIN_WORKER_TOPIC to the worker's input topic. The worker SHALL read these once at initialization.

Scenario: Correlation id from input

  • WHEN the engine marshals the input CloudEvent for a Process call
  • THEN the FlatBuffer's extensions[correlationid] attribute SHALL carry the input CloudEvent's correlationid extension attribute if present. Workers SHALL read it from the FlatBuffer, not from env vars.

Scenario: Concurrent Process calls

  • WHEN multiple Process invocations are made against the same loaded library
  • THEN the engine SHALL allow them to proceed concurrently. The native worker SHALL be reentrant; any per-worker mutable state SHALL be protected by the worker's own synchronization primitives.

Scenario: Reload under load

  • WHEN LoadCodeAsync is called while a ProcessAsync is in flight
  • THEN the engine SHALL block the reload until the in-flight call completes. The new code SHALL take effect for subsequent calls only.
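
One way to satisfy the reload-under-load scenario is a gate that counts in-flight calls and makes a reload wait for the count to reach zero. The class below is a sketch under that assumption, with hypothetical names; it additionally holds back new calls while a reload is pending, which the spec does not require.

```python
import threading


class ReloadGate:
    """Lets Process calls run concurrently and makes a reload wait for them."""

    def __init__(self):
        self._cond = threading.Condition()
        self._in_flight = 0
        self._reloading = False

    def begin_call(self):
        with self._cond:
            while self._reloading:       # design choice: new calls wait for the swap
                self._cond.wait()
            self._in_flight += 1

    def end_call(self):
        with self._cond:
            self._in_flight -= 1
            self._cond.notify_all()

    def reload(self, swap):
        """Run swap() once no call is in flight, then admit new calls."""
        with self._cond:
            self._reloading = True
            while self._in_flight > 0:
                self._cond.wait()
            try:
                swap()
            finally:
                self._reloading = False
                self._cond.notify_all()
```

Each Process invocation would be wrapped in begin_call() and end_call() (ideally in a try/finally), and LoadCodeAsync would perform the library swap inside reload().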

Scenario: Cleanup on stop

  • WHEN the engine is disposed or the worker is stopped
  • THEN the engine SHALL call NativeLibrary.Free on the loaded library, free the GCHandle for the engine pointer, delete the temp file holding the DLL, and restore the prior values of VIRTUFIN_WORKER_ID, VIRTUFIN_WORKER_GROUP, VIRTUFIN_WORKER_TOPIC if they were set.

Requirement: Native Worker Caveats

The NativeDllEngine SHALL document its isolation and timeout characteristics in the engine README and the worker author guide.

Scenario: In-process crash isolation caveat

  • WHEN a native worker raises an unrecoverable error (segmentation fault, illegal instruction, stack overflow from native code)
  • THEN the WorkManager process SHALL terminate because the error cannot be caught in a way that leaves the .NET runtime in a defined state. Workers SHALL be authored under the assumption that a crash terminates the entire WorkManager. This caveat does not apply to the Python engine, which isolates its workers in subprocesses. The DotNetDllEngine, like the NativeDllEngine, runs its workers in-process (see Requirement: In-Process DotNet DLL Engine).

Scenario: Unmanaged timeout caveat

  • WHEN the MessageHandlingTimeoutSeconds elapses while a native Process call is in flight
  • THEN the CancellationToken on the managed side SHALL fire and the engine SHALL surface a worker.error lifecycle event, but the native call SHALL continue to run until it returns or the process crashes. Workers SHALL either honor the timeout cooperatively (e.g. by periodically checking a flag set from a watchdog thread) or be guaranteed to return within the timeout.

Scenario: ABI version policy

  • WHEN the WorkManager ships a new major ABI version
  • THEN the engine SHALL continue to load workers declaring the previous ABI version for at least one release cycle, and SHALL refuse workers declaring ABIs older than that.

Requirement: Native Worker Library Naming

The NativeDllEngine SHALL derive the per-platform shared library filename from the manifest's library field by adding an OS-specific prefix and suffix, matching the convention used by NuGet-distributed native packages (runtimes/<rid>/native/). The engine SHALL pick the filename based on RuntimeInformation.IsOSPlatform.

Scenario: Linux filename

  • WHEN the running platform reports OSPlatform.Linux
  • THEN the engine SHALL load lib<library>.so from runtimes/<rid>/native/.

Scenario: Windows filename

  • WHEN the running platform reports OSPlatform.Windows
  • THEN the engine SHALL load <library>.dll from runtimes/<rid>/native/.

Scenario: macOS filename

  • WHEN the running platform reports OSPlatform.OSX
  • THEN the engine SHALL load lib<library>.dylib from runtimes/<rid>/native/.

Scenario: Library name validation

  • WHEN the manifest's library field is empty or contains characters outside [A-Za-z0-9_.-]
  • THEN the engine SHALL refuse the worker and throw a clear error naming the offending value.
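
The naming rules above reduce to a lookup plus a validation step. In this sketch the function name, the OS keys, and the sample RIDs for Windows and macOS are assumptions; the prefixes, suffixes, directory layout, and the character class come from the scenarios.

```python
import re

LIBRARY_PATTERN = re.compile(r"[A-Za-z0-9_.-]+")

# OS name -> (prefix, suffix), per the three filename scenarios.
NAMING = {
    "linux": ("lib", ".so"),
    "windows": ("", ".dll"),
    "osx": ("lib", ".dylib"),
}


def native_entry_path(library, os_name, rid):
    """Return the zip entry path for a worker library on the given platform."""
    if not LIBRARY_PATTERN.fullmatch(library):
        raise ValueError(f"invalid library name: {library!r}")
    prefix, suffix = NAMING[os_name]
    return f"runtimes/{rid}/native/{prefix}{library}{suffix}"


linux_path = native_entry_path("pricing", "linux", "linux-x64")
```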