Worker Management
Purpose
Event-driven worker management system enabling deployment and execution of workers written in multiple languages (Python, C# source, pre-compiled .NET DLLs, and native shared libraries) that subscribe to Dapr pub/sub topics, process CloudEvents, and coordinate via distributed locks.
Topic and state-key naming for worker lifecycle events is defined in pubsub-topics/spec.md. The lifecycle topic is workmanager.lifecycle; finer-grained infra events (heartbeats, grouplock changes) live under workmanager.infra.*.
Requirements
Requirement: Worker Lifecycle Management
The WorkManager SHALL support creating, starting, stopping, and deleting workers. Workers SHALL persist their configuration and SHALL be restored automatically when the service restarts.
Scenario: Create and start a worker
- WHEN a client creates a worker with a topic subscription and code payload
- THEN the worker SHALL be persisted to the state store, subscribed to the topic, and set to Running status
Scenario: Hot-reload worker code
- WHEN a client updates a running worker's code
- THEN the worker SHALL switch to the new code without restarting the service
Scenario: Auto-recovery on restart
- WHEN the WorkManager service restarts
- THEN all previously persisted workers SHALL be restored to their saved state before the health check passes
Scenario: Worker version history
- WHEN a worker's code is updated
- THEN the previous code version SHALL be preserved in a versioned history
Requirement: Service Architecture
The WorkManager SHALL use the virtufin-api gRPC services for all state operations. The WorkManager SHALL NOT call DaprClient.GetStateAsync, SaveStateAsync, DeleteStateAsync, or any other Dapr state API directly. The Dapr sidecar SHALL remain in use for service-invocation mTLS, distributed tracing, and metrics.
Scenario: Service persists worker state
- WHEN the WorkManager saves or retrieves worker configuration
- THEN it SHALL call the API's State gRPC service
- AND it SHALL NOT call any Dapr state API directly
Scenario: Service subscribes to a worker topic
- WHEN the WorkManager subscribes to a worker's input topic
- THEN it SHALL call Pubsub.Subscribe on the virtufin-api (the regular variant, not the system variant)
- AND it SHALL NOT call DaprPublishSubscribeClient.SubscribeAsync directly
Requirement: Worker Lifecycle Events
The WorkManager SHALL publish worker lifecycle events to the workmanager.lifecycle topic on every state transition. Topic, ce-type, and state-key conventions are defined in pubsub-topics/spec.md. Events SHALL be CloudEvents v1.0 envelopes with ce-type = com.virtufin.workmanager.lifecycle.<state>.
Scenario: Worker created
- WHEN a worker is created
- THEN the WorkManager SHALL publish a worker.created event with worker_id, group, and topic in the data payload
Scenario: Worker started
- WHEN a worker is started
- THEN the WorkManager SHALL publish a worker.started event
Scenario: Worker stopped
- WHEN a worker is stopped
- THEN the WorkManager SHALL publish a worker.stopped event
Scenario: Worker error
- WHEN a worker process fails or throws
- THEN the WorkManager SHALL publish a worker.error event with error_type and error_message in the data payload
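The Python function below sketches the envelope shape: it builds a lifecycle event as a CloudEvents v1.0 structured-mode dict. It makes two assumptions. First, the <state> suffix is taken to be the bare transition name (created, started, stopped, error). Second, it uses a hypothetical source URI of /workmanager. pubsub-topics/spec.md remains authoritative for both.

```python
import uuid
from datetime import datetime, timezone

LIFECYCLE_TOPIC = "workmanager.lifecycle"
CE_TYPE_PREFIX = "com.virtufin.workmanager.lifecycle."

def lifecycle_event(state: str, data: dict, source: str = "/workmanager") -> dict:
    """Build a CloudEvents v1.0 envelope for one worker state transition.

    `source` is a hypothetical default; the spec does not fix its value.
    """
    return {
        "specversion": "1.0",                      # required by CloudEvents v1.0
        "id": str(uuid.uuid4()),                   # unique per event
        "source": source,
        "type": CE_TYPE_PREFIX + state,            # e.g. ...lifecycle.created
        "time": datetime.now(timezone.utc).isoformat(),
        "datacontenttype": "application/json",
        "data": data,                              # e.g. worker_id, group, topic
    }
```

Publishing the returned dict on LIFECYCLE_TOPIC is left to whichever pub/sub client the service uses.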
Requirement: Polyglot Worker Engines
The WorkManager SHALL support executing worker code in multiple languages via a pluggable engine architecture. Built-in engines SHALL include Python (subprocess), C# (Roslyn runtime compilation), and pre-compiled .NET DLL (in-process via hostfxr and AssemblyLoadContext; DotNetDllEngine is the default engine for application/x-dotnet-dll).
Scenario: Python worker execution
- WHEN a worker with a Python engine processes a message
- THEN the worker code SHALL execute in a dedicated Python subprocess with a configurable timeout
Scenario: C# source worker execution
- WHEN a worker with a C# source engine processes a message
- THEN the code SHALL be compiled at runtime via Roslyn and executed in-process
Scenario: DLL worker execution (in-process, default)
- WHEN a worker with MIME type application/x-dotnet-dll processes a message
- THEN the worker DLL SHALL be loaded into an isolated AssemblyLoadContext and executed in-process in the WorkManager, with sub-microsecond per-call latency
Scenario: Engine extension
- WHEN a new engine implementation conforms to the IEngine interface
- THEN it SHALL be registrable in the engine registry without modifying core WorkManager code
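The registry requirement can be pictured as a map from worker MIME type to engine factory. The Python sketch below is a model, not the C# IEngine contract. The method names load_code and process are hypothetical stand-ins for LoadCodeAsync and ProcessAsync. The rule that re-registering a MIME type replaces the previous engine is also an assumption.

```python
from typing import Callable, Dict, Optional, Protocol

class IEngine(Protocol):
    """Python stand-in for the IEngine contract (method names are hypothetical)."""

    async def load_code(self, code: bytes) -> None: ...
    async def process(self, event: dict) -> Optional[dict]: ...

EngineFactory = Callable[[], IEngine]

class EngineRegistry:
    """Maps a worker MIME type to a factory for the engine that runs it."""

    def __init__(self) -> None:
        self._factories: Dict[str, EngineFactory] = {}

    def register(self, mime_type: str, factory: EngineFactory) -> None:
        # Re-registering a MIME type replaces the previous engine (an assumption).
        self._factories[mime_type] = factory

    def create(self, mime_type: str) -> IEngine:
        factory = self._factories.get(mime_type)
        if factory is None:
            raise LookupError(f"no engine registered for {mime_type}")
        return factory()
```

Because the registry only depends on the protocol, a new engine plugs in with a single register call and no change to the dispatch code.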
Requirement: Event-Driven Message Processing
Workers SHALL subscribe to Dapr pub/sub topics and process incoming CloudEvents. The type attribute of each output CloudEvent SHALL determine the topic it is published to.
Scenario: Message processing
- WHEN a CloudEvent is published to a topic a worker subscribes to
- THEN the worker's code SHALL be invoked with the event data, and any output events SHALL be published to the topic matching their type field
Scenario: Concurrent message handling
- WHEN multiple messages arrive for the same worker
- THEN they SHALL be processed concurrently, subject to group locking constraints
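A minimal Python model of the routing rule follows. It assumes the publish topic is the output type string verbatim and represents CloudEvents as plain dicts. Group locking is deliberately left out so that the concurrency of ungrouped messages stays visible.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List

CloudEvent = dict  # attributes plus "data"; a stand-in for a real CloudEvent type
WorkerCode = Callable[[object], Awaitable[List[CloudEvent]]]
Publish = Callable[[str, CloudEvent], Awaitable[None]]

async def handle_message(event: CloudEvent, code: WorkerCode, publish: Publish) -> None:
    outputs = await code(event["data"])
    for out in outputs:
        # Routing rule: the output's `type` attribute names the publish topic.
        await publish(out["type"], out)

async def handle_batch(events: Iterable[CloudEvent], code: WorkerCode,
                       publish: Publish) -> None:
    # Ungrouped workers: messages run concurrently. For grouped workers,
    # handle_message would first acquire the group's distributed lock.
    await asyncio.gather(*(handle_message(e, code, publish) for e in events))
```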
Requirement: Distributed Coordination
Workers assigned to the same group SHALL process messages with mutually exclusive execution using distributed locks with fencing tokens. Messages that exhaust retries SHALL go to a dead-letter topic.
Scenario: Lock acquisition
- WHEN a worker in a group receives a message
- THEN it SHALL acquire a distributed lock with a fencing token before processing
Scenario: Lock contention
- WHEN a lock cannot be acquired because another instance holds it
- THEN the message SHALL be retried with exponential backoff, up to the Dapr max redelivery count
Scenario: Dead letter
- WHEN a message exhausts all redelivery attempts due to lock contention
- THEN the message SHALL be routed to the dead-letter topic (<topic>-dead)
Scenario: Stale lock theft
- WHEN a lock holder's fencing token is stale
- THEN another instance SHALL be able to steal the lock
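The coordination rules above can be modeled in memory as follows. This Python sketch assumes that a "stale" lock means an expired lease, because the spec does not define staleness. The backoff base and cap are hypothetical values. In the real system the store is distributed, and Dapr's redelivery count bounds the retries. The FencedResource class shows why fencing tokens matter: once a newer token has been seen, writes from the previous holder are rejected.

```python
import itertools
from dataclasses import dataclass
from typing import Dict, Optional

@dataclass
class Lease:
    owner: str
    token: int
    expires_at: float

class FencedLockStore:
    """In-memory stand-in for the distributed lock store (hypothetical)."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self._next_token = itertools.count(1)  # strictly increasing fencing tokens
        self._leases: Dict[str, Lease] = {}

    def acquire(self, group: str, owner: str, now: float) -> Optional[int]:
        lease = self._leases.get(group)
        if lease is not None and lease.expires_at > now:
            return None  # contention: the caller backs off and retries
        # Free or stale (expired): take it over and issue a larger token.
        token = next(self._next_token)
        self._leases[group] = Lease(owner, token, now + self.ttl)
        return token

class FencedResource:
    """Side effects guarded by the token: stale holders are rejected."""

    def __init__(self) -> None:
        self.highest_token = 0

    def write(self, token: int) -> bool:
        if token < self.highest_token:
            return False
        self.highest_token = token
        return True

def backoff_delay(attempt: int, base: float = 0.1, cap: float = 10.0) -> float:
    """Exponential backoff, base * 2**attempt, capped (values are hypothetical)."""
    return min(cap, base * (2 ** attempt))

def dead_letter_topic(topic: str) -> str:
    return f"{topic}-dead"
```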
Requirement: Worker Code Sources
Workers SHALL accept code from inline Base64-encoded content or from HTTP(S) URLs. URL-based sources SHALL be validated against SSRF and host allowlist constraints as defined in the cross-cutting security requirements.
Scenario: Inline code worker
- WHEN a worker is created with a Base64-encoded code string
- THEN the decoded code SHALL be passed to the engine for execution
Scenario: URL code worker
- WHEN a worker is created with a URL pointing to code
- THEN the code SHALL be fetched, SHA-256 verified, and passed to the engine
Scenario: Blocked URL
- WHEN a worker's code URL targets a private IP or a host not in the allowlist
- THEN the worker creation SHALL fail with a security rejection
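The checks above could look like the following Python sketch, which makes three assumptions:
- the allowlist is a set of hostnames;
- any address that ipaddress does not classify as global counts as private;
- the resolver is injected so the logic can be tested without DNS.

A production fetcher would also connect to the address it validated, to avoid DNS rebinding between the check and the download. The cross-cutting security requirements remain authoritative.

```python
import base64
import binascii
import hashlib
import ipaddress
import socket
from typing import Callable, Iterable, List
from urllib.parse import urlsplit

class SecurityRejection(Exception):
    """Raised when a code source fails the SSRF or integrity checks."""

def resolve_host(host: str) -> List[str]:
    # Every address the host resolves to must be public.
    return [info[4][0] for info in socket.getaddrinfo(host, None)]

def check_code_url(url: str, allowlist: Iterable[str],
                   resolve: Callable[[str], List[str]] = resolve_host) -> None:
    parts = urlsplit(url)
    host = (parts.hostname or "").lower()
    if parts.scheme not in ("http", "https") or not host:
        raise SecurityRejection(f"unsupported code URL: {url}")
    if host not in {h.lower() for h in allowlist}:
        raise SecurityRejection(f"host not in allowlist: {host}")
    for addr in resolve(host):
        ip = ipaddress.ip_address(addr.split("%", 1)[0])  # drop any IPv6 zone id
        if not ip.is_global:
            raise SecurityRejection(f"{host} resolves to non-public address {ip}")

def verify_sha256(payload: bytes, expected_hex: str) -> bytes:
    if hashlib.sha256(payload).hexdigest() != expected_hex.lower():
        raise SecurityRejection("SHA-256 digest mismatch")
    return payload

def decode_inline(code_b64: str) -> bytes:
    try:
        return base64.b64decode(code_b64, validate=True)
    except binascii.Error as exc:
        raise ValueError("inline code is not valid Base64") from exc
```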
Requirement: Python Sandboxing
Python worker subprocesses SHALL run with restricted interpreter flags and a package allowlist. Imports of any package not on the allowlist SHALL be blocked.
Scenario: Allowed import
- WHEN a Python worker imports a package listed in PYTHON_ALLOWED_PACKAGES
- THEN the import SHALL succeed
Scenario: Blocked import
- WHEN a Python worker imports a package not in the allowlist
- THEN the import SHALL fail with an error
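One way to enforce the allowlist inside the worker subprocess is a meta path finder, sketched below in Python. The sketch assumes two things:
- PYTHON_ALLOWED_PACKAGES is a comma-separated list of top-level package names;
- the hook is installed before any worker code runs.

The finder has real gaps. Modules already present in sys.modules bypass meta path finders. Worker code could remove the hook. A real sandbox would also have to permit the transitive imports of allowed packages, which this sketch does not. It therefore complements the restricted interpreter flags rather than replacing them.

```python
import importlib.abc
import os
import sys

class AllowlistFinder(importlib.abc.MetaPathFinder):
    """Rejects imports whose top-level package is not allowlisted."""

    def __init__(self, allowed):
        self.allowed = frozenset(allowed)

    def find_spec(self, fullname, path, target=None):
        top = fullname.partition(".")[0]
        if top not in self.allowed:
            raise ImportError(f"import of '{fullname}' is blocked by the sandbox")
        return None  # allowed: defer to the normal finders

def install_sandbox(env_var: str = "PYTHON_ALLOWED_PACKAGES") -> AllowlistFinder:
    raw = os.environ.get(env_var, "")
    allowed = {name.strip() for name in raw.split(",") if name.strip()}
    finder = AllowlistFinder(allowed)
    sys.meta_path.insert(0, finder)  # consulted before the default finders
    return finder
```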
Requirement: Worker Contract (IWorker)
The IWorker interface SHALL define an asynchronous Process method returning Task<CloudEvent?>. WorkerBase SHALL accept constructor parameters (responseType, source) instead of abstract property overrides for CloudEvent routing.
Scenario: Async worker process
- WHEN the WorkManager invokes a worker's Process method
- THEN the call SHALL be awaited asynchronously without blocking threads
Scenario: Constructor-based configuration
- WHEN a worker extends WorkerBase or CommandWorker
- THEN it SHALL pass the response CloudEvent type and source URI via the constructor, not via abstract property overrides
Scenario: ApiWorker with pre-configured ApiClient
- WHEN a worker extends ApiWorker with apiHost, apiPort, responseEventType, and source
- THEN the base class SHALL create an ApiClient at the configured host/port and provide it to ProcessAsync(CloudEvent, ApiClient)
Scenario: ApiCommandWorker with pre-configured ApiClient
- WHEN a worker extends ApiCommandWorker with apiHost, apiPort, responseEventType, and source
- THEN the base class SHALL create an ApiClient at the configured host/port and make it available during HandleCommandAsync
Requirement: CommandWorker Base Class
A CommandWorker abstract class SHALL extend WorkerBase to simplify command-based workers. It SHALL parse the CloudEvent data as JSON, extract a "command" field, and dispatch to the abstract HandleCommandAsync method. Unknown commands SHALL produce automatic error responses.
Scenario: Command dispatch
- WHEN a CloudEvent arrives with {"command": "hello", ...} as data
- THEN CommandWorker SHALL parse the JSON, extract "hello", and call HandleCommandAsync(input, "hello", node)
Scenario: Unknown command
- WHEN CommandWorker cannot match the command to any handler
- THEN it SHALL return an error response with the message "Unknown command: <command>"
Scenario: Correlation ID propagation
- WHEN CommandWorker creates a response via Response(input, ...) or Error(input, ...)
- THEN the correlationid from the input CloudEvent SHALL be propagated to the output CloudEvent via the WithCorrelationId extension method
Requirement: Correlation ID Propagation
Worker responses SHALL propagate the correlationid extension attribute from the input CloudEvent to the output CloudEvent. The CloudEvent.WithCorrelationId(input) extension method SHALL perform this propagation and SHALL be a no-op when the input has no correlationid.
Scenario: Response with correlation ID
- WHEN a worker creates a Response() or Error() via WorkerBase helpers
- THEN the output CloudEvent SHALL carry the same correlationid as the input CloudEvent, if present
Scenario: Input without correlation ID
- WHEN a worker processes an input CloudEvent without a correlationid extension
- THEN the output CloudEvent SHALL NOT have a correlationid extension
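The propagation rule reduces to a conditional copy. The Python function below models CloudEvent.WithCorrelationId(input) over dict-based events. As in the CloudEvents JSON format, extension attributes are top-level keys. Returning a new dict instead of mutating the output is a design choice of this sketch.

```python
from typing import Optional

CloudEvent = dict  # attribute name -> value; extensions are top-level keys

def with_correlation_id(output: CloudEvent, input_event: CloudEvent) -> CloudEvent:
    """Copy `correlationid` from the input to the output; no-op when absent."""
    cid: Optional[str] = input_event.get("correlationid")
    if cid is None:
        return output
    return {**output, "correlationid": cid}
```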
Requirement: Generic CommandWorker Base Class
The Virtufin.Worker.DevKit package SHALL provide a generic CommandWorker<T> where T : struct, Enum and a generic ApiCommandWorker<T> where T : struct, Enum as the canonical base classes for command-based workers. The wire value of the CloudEvent data.command field SHALL be the lowercase identifier of the enum value (e.g. the enum value WebSocketManagerCommand.create corresponds to the wire string "create"). The base class SHALL parse the wire string with Enum.TryParse<T>(command, ignoreCase: true, out _) and dispatch to an abstract HandleAsync(CloudEvent, T, JsonNode) method. An unknown wire command SHALL produce an error response. The non-generic CommandWorker / ApiCommandWorker types are removed (alpha, no backward compatibility).
Scenario: Generic dispatch on valid command
- WHEN a CloudEvent arrives with {"command": "create", ...} and the worker is CommandWorker<MyEnum> where MyEnum.create exists
- THEN the base SHALL parse "create" into MyEnum.create and invoke HandleAsync(input, MyEnum.create, node)
Scenario: Wire name == identifier (lowercase)
- WHEN a worker defines an enum value WebSocketManagerCommand.create
- THEN the corresponding wire command string SHALL be the lowercase identifier "create"
Scenario: Unknown command
- WHEN the wire command string does not match any enum value
- THEN the base SHALL return an error response with message "Unknown command: <command>"
Scenario: Missing command field
- WHEN the CloudEvent data has no "command" field
- THEN the base SHALL return an error response with message "Missing 'command' field"
Scenario: Malformed JSON
- WHEN the CloudEvent data cannot be parsed as JSON or parses to null
- THEN the base SHALL return an error response with message "Failed to parse command JSON" (or wrap the parser exception in "Internal error: …")
Scenario: Generic ApiCommandWorker
- WHEN a worker extends ApiCommandWorker<T> and the api_host field is present in the command JSON
- THEN the base SHALL create or reconfigure the ApiClient for the parsed host/port (default port 5002) and make it available via the Api property during HandleAsync
Scenario: Backend invocation via dynamic gateway
- WHEN a worker invokes a backend service via dynamic gateway = Api.Gateway; await gateway.<service>.<method>(requestData);
- THEN the ServiceClient returned by <service> SHALL resolve to the named service, and <method> SHALL invoke the corresponding method via the API Gateway
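The dynamic gateway pattern resolves the service and method from member access. It can be sketched in Python with __getattr__. The invoke callback is a hypothetical stand-in for the ApiClient transport. Because each call returns whatever invoke returns, the sketch also works with an async callback, mirroring the await in the C# call.

```python
from typing import Any, Callable

Invoke = Callable[[str, str, dict], Any]  # (service, method, request) -> response

class ServiceClient:
    """Resolves attribute access to a method call on one named service."""

    def __init__(self, service: str, invoke: Invoke) -> None:
        self._service = service
        self._invoke = invoke

    def __getattr__(self, method: str) -> Callable[[dict], Any]:
        if method.startswith("_"):
            raise AttributeError(method)  # keep dunder/private lookups normal
        return lambda request: self._invoke(self._service, method, request)

class Gateway:
    """Resolves attribute access to a ServiceClient for that service name."""

    def __init__(self, invoke: Invoke) -> None:
        self._invoke = invoke

    def __getattr__(self, service: str) -> ServiceClient:
        if service.startswith("_"):
            raise AttributeError(service)
        return ServiceClient(service, self._invoke)
```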
Scenario: New enum value triggers compiler warning on the switch
- WHEN a new value is added to the command enum and the worker's HandleAsync switch expression does not handle it
- THEN the C# compiler SHALL emit a warning (CS8509 or equivalent) for the unhandled enum value
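The dispatch and error cases in this requirement can be modeled in Python, with enum.Enum standing in for T. The response shape returned by error_response is hypothetical; the three error strings come from the scenarios.

```python
import json
from enum import Enum
from typing import Any, Callable, Dict, Type, TypeVar

E = TypeVar("E", bound=Enum)
Response = Dict[str, Any]  # hypothetical response shape

def error_response(message: str) -> Response:
    return {"ok": False, "error": message}

def dispatch(data: str, enum_type: Type[E],
             handle: Callable[[E, Dict[str, Any]], Response]) -> Response:
    """Parse data.command and dispatch on the matching enum member."""
    try:
        node = json.loads(data)
    except json.JSONDecodeError:
        return error_response("Failed to parse command JSON")
    if node is None:
        return error_response("Failed to parse command JSON")
    # Assumption: a non-object JSON value is treated as having no command field.
    command = node.get("command") if isinstance(node, dict) else None
    if command is None:
        return error_response("Missing 'command' field")
    # Case-insensitive match on the member name, mirroring ignoreCase: true.
    wanted = str(command).lower()
    member = next((m for m in enum_type if m.name.lower() == wanted), None)
    if member is None:
        return error_response(f"Unknown command: {command}")
    return handle(member, node)
```

The C# base differs from this model in one respect. Enum.TryParse also accepts numeric strings such as "7" and returns true even when no member has that value. A C# implementation that wants the wire protocol to accept identifiers only would therefore also check Enum.IsDefined. The model above matches names only.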
Requirement: In-Process DotNet DLL Engine
The WorkManager SHALL support an in-process DotNetDllEngine (default for MIME type application/x-dotnet-dll) that loads pre-compiled .NET worker DLLs directly into the AOT-compiled WorkManager process. The engine SHALL embed the .NET runtime (CoreCLR) into the host process on first use via the hostfxr C API and SHALL load the worker DLL into a per-worker, collectible AssemblyLoadContext. The engine SHALL be AOT-compatible: the engine code itself is compiled with <IsAotCompatible>true</IsAotCompatible> and SHALL NOT use reflection on its own types; the only reflection SHALL be on the JIT-loaded worker assembly.
Scenario: First LoadCodeAsync initializes CoreCLR
- WHEN the first LoadCodeAsync is called for MIME type application/x-dotnet-dll
- THEN the engine SHALL load libhostfxr via NativeLibrary.Load and call hostfxr_initialize_for_runtime_config to initialize CoreCLR in the host process; this cost SHALL be paid once per WorkManager process
Scenario: Subsequent LoadCodeAsync in the same process
- WHEN a second LoadCodeAsync is called after CoreCLR is already initialized
- THEN the engine SHALL skip the hostfxr initialization and load the new worker DLL directly
Scenario: Per-worker AssemblyLoadContext
- WHEN LoadCodeAsync is called with a worker nupkg
- THEN the engine SHALL extract the worker DLL (and its sibling dependencies) from the nupkg's lib/<tfm>/ entries, create a new collectible AssemblyLoadContext, and load the worker assembly into it
Scenario: DevKit resolution
- WHEN the JIT-loaded worker assembly references Virtufin.Worker.DevKit
- THEN the engine SHALL resolve the reference to the AOT-compiled Virtufin.Worker.DevKit instance already loaded in the WorkManager process (so the worker and the engine see the same type identity)
Scenario: Direct in-process dispatch
- WHEN ProcessAsync is called on the engine with a CloudEvent
- THEN the engine SHALL call the worker's IWorker.ProcessAsync directly, with no socket, no JSON serialization, and no subprocess scheduling; the per-call latency SHALL be sub-microsecond
Scenario: Runtime discovery on target
- WHEN the WorkManager runs on a target that does not have a compatible .NET runtime installed
- THEN the first LoadCodeAsync SHALL fail with a clear error message that names the missing runtime and the discovery paths (DOTNET_ROOT, system install, side-by-side host/fxr/<version>/)
Scenario: Roll-forward to a newer runtime
- WHEN the WorkManager is built for .NET 10 and the target has a .NET 11+ runtime but no .NET 10 runtime installed
- THEN the engine SHALL use the newer runtime; the generated runtimeconfig.json SHALL declare "rollForward": "Major"
Requirement: Native DLL Engine¶
The WorkManager SHALL support a NativeDllEngine for executing workers shipped as a per-architecture native shared library (.so / .dll / .dylib). The engine SHALL run in-process inside the (NativeAOT-compiled) WorkManager and SHALL NOT require a separate worker-host subprocess or a JIT build stage. The engine SHALL be registered for MIME type application/x-native-dll. Per-architecture entries SHALL live under runtimes/<rid>/native/ inside the worker zip, matching the NuGet convention for native packages.
Scenario: Native worker execution¶
- WHEN a worker with MIME type application/x-native-dll processes a CloudEvent
- THEN the engine SHALL load the matching runtimes/<rid>/native/<file> entry from the worker zip with NativeLibrary.Load, resolve the entry_point and free_result exports declared in manifest.json, marshal the input CloudEvent to a FlatBuffer, invoke the native Process function, and deserialize the returned FlatBuffer into a WorkerResponse.
Scenario: Per-architecture entry selection¶
- WHEN a worker zip contains entries under runtimes/<rid>/native/ for at least one supported RID (linux-x64, linux-arm64)
- THEN the engine SHALL select the entry whose <rid> directory matches RuntimeInformation.RuntimeIdentifier of the running process and SHALL load the corresponding lib<library>.so, <library>.dll, or lib<library>.dylib for the platform (see Native Worker Library Naming).
Scenario: Unsupported architecture¶
- WHEN the worker zip does not contain a runtimes/<rid>/native/ entry for the running architecture
- THEN the engine SHALL throw NotSupportedException listing the supported architectures (linux-x64, linux-arm64).
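Entry selection for the running architecture might be sketched in Python as follows. The engine takes the running RID from RuntimeInformation.RuntimeIdentifier; here it is passed in explicitly. The entry names would come from zipfile.ZipFile.namelist(). UnsupportedArchitecture is a hypothetical stand-in for NotSupportedException.

```python
from typing import Iterable

SUPPORTED_RIDS = ("linux-x64", "linux-arm64")

class UnsupportedArchitecture(Exception):
    """Stands in for .NET's NotSupportedException in this sketch."""

def select_native_entry(names: Iterable[str], running_rid: str, filename: str) -> str:
    """Return the zip entry runtimes/<running_rid>/native/<filename>."""
    wanted = f"runtimes/{running_rid}/native/{filename}"
    if running_rid in SUPPORTED_RIDS and wanted in set(names):
        return wanted
    raise UnsupportedArchitecture(
        f"no native entry for {running_rid}; supported architectures: "
        + ", ".join(SUPPORTED_RIDS)
    )
```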
Scenario: Missing manifest¶
- WHEN the worker zip does not contain manifest.json at the root
- THEN the engine SHALL refuse to load the worker and throw a clear error naming the missing file.
Scenario: ABI version mismatch¶
- WHEN manifest.json declares an abi_version that is not in the engine's supported set
- THEN the engine SHALL refuse to load the worker and throw a clear error naming the supported versions.
Requirement: Native Worker ABI¶
The NativeDllEngine SHALL define and enforce a stable C ABI between the managed engine and the native worker. The ABI SHALL consist of: (a) a FlatBuffers schema for the CloudEvent wire format; (b) two C function exports, Process and FreeResult; (c) a VirtufinHost struct carrying host callbacks; and (d) a manifest.json declaring the ABI version, the library basename, and the export names. The engine SHALL vendor a virtufin_worker_api.h header for distribution to worker authors.
Scenario: ABI struct layout¶
- WHEN a native worker is loaded
- THEN the engine SHALL pass a VirtufinHost* whose first field is an opaque engine_ptr (a GCHandle to the engine instance), whose second field is abi_version (currently 1), and whose remaining fields are the C function pointers log, gateway_call, and free_response.
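To make the layout concrete, here is a Python ctypes mirror of VirtufinHost. The field names and order follow the scenario above. The callback parameter types are assumptions: a 32-bit log level, char* strings, and uint8 buffers with size_t lengths. The vendored virtufin_worker_api.h remains the authoritative definition.

```python
import ctypes

u8_p = ctypes.POINTER(ctypes.c_uint8)

# Callback signatures; parameter types are assumptions (see the vendored header).
LogFn = ctypes.CFUNCTYPE(None, ctypes.c_int32, ctypes.c_char_p)   # level, message
GatewayCallFn = ctypes.CFUNCTYPE(
    ctypes.c_int32,                                         # 0 = success
    ctypes.c_char_p, ctypes.c_char_p,                       # service, method
    u8_p, ctypes.c_size_t,                                  # request, request_len
    ctypes.POINTER(u8_p), ctypes.POINTER(ctypes.c_size_t),  # &response, &response_len
)
FreeResponseFn = ctypes.CFUNCTYPE(None, u8_p)

class VirtufinHost(ctypes.Structure):
    _fields_ = [
        ("engine_ptr", ctypes.c_void_p),   # opaque GCHandle to the engine instance
        ("abi_version", ctypes.c_int32),   # currently 1
        ("log", LogFn),
        ("gateway_call", GatewayCallFn),
        ("free_response", FreeResponseFn),
    ]
```

A native worker reads abi_version before touching the callbacks, so that a future layout change can be detected rather than misinterpreted.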
Scenario: CloudEvent FlatBuffer input¶
- WHEN the engine invokes Process(host, in_buf, in_len, &out_buf, &out_len)
- THEN in_buf SHALL point to a FlatBuffer-encoded CloudEvent matching the vendored worker_api.fbs schema. The extensions[correlationid] field SHALL carry the correlation id from the input CloudEvent when present.
Scenario: WorkerResponse FlatBuffer output¶
- WHEN Process returns 0
- THEN *out_buf SHALL point to a FlatBuffer-encoded WorkerResponse in which exactly one of result_event or error_message is set. If result_event is set, the engine SHALL publish it on the reply topic derived from the input CloudEvent. If error_message is set, the engine SHALL surface a worker.error lifecycle event and publish an error response if the worker did not specify one.
Scenario: FreeResult pairing¶
- WHEN Process sets *out_buf to a non-null pointer
- THEN the engine SHALL call FreeResult(out_buf) exactly once, including on the error path. The worker SHALL allocate *out_buf with the same allocator that FreeResult releases.
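The pairing rule is the classic acquire/release pattern. The Python model below treats the native exports as plain callables, with process returning a status code and a buffer handle; this is an assumption made for testability. What matters is the try/finally, which releases the buffer exactly once whether the call succeeds, fails, or the copy-out raises.

```python
from typing import Callable, Optional, Tuple, TypeVar

Buf = TypeVar("Buf")

def invoke_native(process: Callable[[bytes], Tuple[int, Optional[Buf]]],
                  free_result: Callable[[Buf], None],
                  read: Callable[[Buf], bytes],
                  in_buf: bytes) -> bytes:
    """Call a native Process export and release its output buffer exactly once."""
    status, out_buf = process(in_buf)
    try:
        if status != 0:
            raise RuntimeError(f"native Process returned {status}")
        if out_buf is None:
            raise RuntimeError("native Process returned 0 without an output buffer")
        return read(out_buf)  # copy the bytes out before the buffer is freed
    finally:
        if out_buf is not None:
            free_result(out_buf)  # success and error paths alike
```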
Scenario: Logging callback¶
- WHEN the native worker calls host->log(level, message)
- THEN the engine SHALL map level (0=trace, 1=debug, 2=info, 3=warn, 4=error) to the corresponding Microsoft.Extensions.Logging.LogLevel and forward message to the engine's structured logger. The message pointer SHALL be valid only for the duration of the call, and the engine SHALL NOT retain it after returning.
Scenario: Gateway call indirection¶
- WHEN the native worker calls host->gateway_call(service, method, request, request_len, &response, &response_len)
- THEN the engine SHALL deserialize request as JSON into a Dictionary<string, object?>, invoke ApiClient.InvokeAsync(service, method, dict), JSON-serialize the returned dictionary into *response (allocated with Marshal.AllocCoTaskMem), and return 0 on success or non-zero on internal failure. The worker SHALL free *response via host->free_response(response).
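The engine side of gateway_call can be sketched in Python as a function that never lets an exception escape, because an exception escaping a native callback would crash the host process. The invoke callback stands in for ApiClient.InvokeAsync. The status value 1 for internal failure is an arbitrary choice. The returned bytes model the *response buffer that the engine would allocate with Marshal.AllocCoTaskMem.

```python
import json
from typing import Any, Callable, Dict, Optional, Tuple

Invoke = Callable[[str, str, Dict[str, Any]], Dict[str, Any]]
INTERNAL_FAILURE = 1  # arbitrary non-zero status for this sketch

def gateway_call(service: str, method: str, request: bytes,
                 invoke: Invoke) -> Tuple[int, Optional[bytes]]:
    """Engine side of host->gateway_call: JSON in, JSON out, status code back."""
    try:
        payload = json.loads(request.decode("utf-8"))
        if not isinstance(payload, dict):
            return INTERNAL_FAILURE, None  # request must be a JSON object
        result = invoke(service, method, payload)
        return 0, json.dumps(result).encode("utf-8")
    except Exception:
        # Never let an exception reach the native caller.
        return INTERNAL_FAILURE, None
```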
Scenario: Manifest format¶
- WHEN the engine parses the worker's manifest.json
- THEN the JSON object SHALL contain abi_version (integer) and library (non-empty string matching [A-Za-z0-9_.-]+), and MAY contain entry_point (string, default "Process") and free_result (string, default "FreeResult"). Additional fields SHALL be ignored by v1. A missing or empty library field SHALL cause the engine to refuse the worker with a clear error.
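A validating parser for the manifest might look like the Python sketch below. The supported ABI set {1} reflects the current version named in this spec, and the ManifestError type is hypothetical. The parser covers three refusals: a missing library, an invalid library name, and an ABI mismatch. Unknown fields are ignored, as v1 requires.

```python
import json
import re
from dataclasses import dataclass

SUPPORTED_ABI_VERSIONS = frozenset({1})
LIBRARY_NAME = re.compile(r"[A-Za-z0-9_.-]+")

class ManifestError(ValueError):
    """Raised when manifest.json cannot be accepted."""

@dataclass(frozen=True)
class Manifest:
    abi_version: int
    library: str
    entry_point: str = "Process"
    free_result: str = "FreeResult"

def parse_manifest(text: str) -> Manifest:
    obj = json.loads(text)
    if not isinstance(obj, dict):
        raise ManifestError("manifest.json must be a JSON object")
    abi = obj.get("abi_version")
    if not isinstance(abi, int) or isinstance(abi, bool):
        raise ManifestError("abi_version must be an integer")
    if abi not in SUPPORTED_ABI_VERSIONS:
        raise ManifestError(
            f"unsupported abi_version {abi}; supported: {sorted(SUPPORTED_ABI_VERSIONS)}")
    library = obj.get("library")
    if not isinstance(library, str) or not LIBRARY_NAME.fullmatch(library):
        raise ManifestError(f"invalid library value: {library!r}")
    # Unknown fields are ignored, as v1 requires.
    return Manifest(abi, library,
                    obj.get("entry_point", "Process"),
                    obj.get("free_result", "FreeResult"))
```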
Requirement: Native Worker Identity and Lifecycle¶
The NativeDllEngine SHALL communicate worker identity to the native worker via process-level environment variables set at LoadCodeAsync time, and SHALL release the loaded library and all engine-owned resources on worker stop.
Scenario: Worker identity env vars¶
- WHEN the engine loads a native worker
- THEN it SHALL set VIRTUFIN_WORKER_ID to the worker GUID, VIRTUFIN_WORKER_GROUP to the worker group if non-null, and VIRTUFIN_WORKER_TOPIC to the worker's input topic. The worker SHALL read these once at initialization.
Scenario: Correlation id from input¶
- WHEN the engine marshals the input CloudEvent for a Process call
- THEN the FlatBuffer's extensions[correlationid] attribute SHALL carry the input CloudEvent's correlationid extension attribute if present. Workers SHALL read it from the FlatBuffer, not from environment variables.
Scenario: Concurrent Process calls¶
- WHEN multiple Process invocations are made against the same loaded library
- THEN the engine SHALL allow them to proceed concurrently. The native worker SHALL be reentrant; any per-worker mutable state SHALL be protected by the worker's own synchronization primitives.
Scenario: Reload under load¶
- WHEN LoadCodeAsync is called while one or more ProcessAsync calls are in flight
- THEN the engine SHALL block the reload until all in-flight calls complete. The new code SHALL take effect for subsequent calls only.
Scenario: Cleanup on stop¶
- WHEN the engine is disposed or the worker is stopped
- THEN the engine SHALL call NativeLibrary.Free on the loaded library, free the GCHandle for the engine pointer, delete the temp file holding the DLL, and restore the prior values of VIRTUFIN_WORKER_ID, VIRTUFIN_WORKER_GROUP, and VIRTUFIN_WORKER_TOPIC if they were set.
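The set-on-load, restore-on-stop behavior can be sketched in Python around os.environ. The spec is silent on two cases, so the sketch makes assumptions for both: VIRTUFIN_WORKER_GROUP is cleared when the group is null, and variables that were absent before load are unset on restore.

```python
import os
from typing import Dict, Optional

class WorkerEnvironment:
    """Sets a native worker's identity variables on load; restores them on stop."""

    def __init__(self, worker_id: str, group: Optional[str], topic: str) -> None:
        self._values: Dict[str, Optional[str]] = {
            "VIRTUFIN_WORKER_ID": worker_id,
            "VIRTUFIN_WORKER_GROUP": group,
            "VIRTUFIN_WORKER_TOPIC": topic,
        }
        self._saved: Dict[str, Optional[str]] = {}

    def apply(self) -> None:
        for key, value in self._values.items():
            self._saved[key] = os.environ.get(key)  # remember the prior value
            if value is not None:
                os.environ[key] = value
            else:
                os.environ.pop(key, None)  # assumption: clear a stale group

    def restore(self) -> None:
        for key, previous in self._saved.items():
            if previous is None:
                os.environ.pop(key, None)  # assumption: unset if absent before load
            else:
                os.environ[key] = previous
        self._saved.clear()
```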
Requirement: Native Worker Caveats¶
The NativeDllEngine SHALL document its isolation and timeout characteristics in the engine README and the worker author guide.
Scenario: In-process crash isolation caveat¶
- WHEN a native worker raises an unrecoverable error (segmentation fault, illegal instruction, stack overflow from native code)
- THEN the WorkManager process SHALL terminate, because the error cannot be caught in a way that leaves the .NET runtime in a defined state. Workers SHALL be authored under the assumption that a crash terminates the entire WorkManager. This caveat is unique to the NativeDllEngine: the Python engine isolates its workers in a subprocess, and the in-process C# and DotNetDll engines run managed code whose ordinary failures surface as catchable .NET exceptions.
Scenario: Unmanaged timeout caveat¶
- WHEN the MessageHandlingTimeoutSeconds elapses while a native Process call is in flight
- THEN the CancellationToken on the managed side SHALL fire and the engine SHALL surface a worker.error lifecycle event, but the native call SHALL continue to run until it returns or the process crashes. Workers SHALL either honor the timeout cooperatively (e.g. by periodically checking a flag set from a watchdog thread) or be guaranteed to return within the timeout.
Scenario: ABI version policy¶
- WHEN the WorkManager ships a new major ABI version
- THEN the engine SHALL continue to load workers declaring the previous ABI version for at least one release cycle, and SHALL refuse workers declaring ABIs older than that.
Requirement: Native Worker Library Naming¶
The NativeDllEngine SHALL derive the per-platform shared library filename from the manifest's library field by adding an OS-specific prefix and suffix, matching the convention used by NuGet-distributed native packages (runtimes/<rid>/native/). The engine SHALL pick the filename based on RuntimeInformation.IsOSPlatform.
Scenario: Linux filename¶
- WHEN the running platform reports OSPlatform.Linux
- THEN the engine SHALL load lib<library>.so from runtimes/<rid>/native/.
Scenario: Windows filename¶
- WHEN the running platform reports OSPlatform.Windows
- THEN the engine SHALL load <library>.dll from runtimes/<rid>/native/.
Scenario: macOS filename¶
- WHEN the running platform reports OSPlatform.OSX
- THEN the engine SHALL load lib<library>.dylib from runtimes/<rid>/native/.
Scenario: Library name validation¶
- WHEN the manifest's library field is empty or contains characters outside [A-Za-z0-9_.-]
- THEN the engine SHALL refuse the worker and throw a clear error naming the offending value.
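The naming rules and the validation step fit in one small function. This Python sketch uses sys.platform values (linux, win32, darwin) as a stand-in for RuntimeInformation.IsOSPlatform.

```python
import re
import sys

_VALID_LIBRARY = re.compile(r"[A-Za-z0-9_.-]+")

def native_library_filename(library: str, platform: str = sys.platform) -> str:
    """Map a manifest `library` basename to the per-OS shared library filename."""
    if not _VALID_LIBRARY.fullmatch(library or ""):
        raise ValueError(f"invalid library name: {library!r}")
    if platform.startswith("linux"):
        return f"lib{library}.so"
    if platform.startswith("win"):
        return f"{library}.dll"
    if platform == "darwin":
        return f"lib{library}.dylib"
    raise NotImplementedError(f"unsupported platform: {platform}")

def native_entry_path(rid: str, library: str, platform: str = sys.platform) -> str:
    # NuGet-style location of the native asset inside the worker zip.
    return f"runtimes/{rid}/native/{native_library_filename(library, platform)}"
```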