Skip to content

Workflow.ts

Defines typed durable workflows.

A Workflow has a stable tag, schemas for payload, success, and failure, and an idempotency key used to derive execution ids. Workflow definitions can be executed, discarded, polled, interrupted, resumed, and registered with a handler layer. This module also includes workflow result types, compensation and cleanup helpers, suspension support, and settings for defect capture or failure suspension.

Since v4.0.0



Adds compensation logic to an effect inside a Workflow.

When to use

Use when a top-level workflow step needs compensating cleanup if the overall workflow later fails after the step succeeds.

Details

The compensation finalizer is called if the entire workflow fails, allowing you to perform cleanup or other actions based on the success value and the cause of the workflow failure.

Gotchas

Compensation finalizers are only registered for top-level effects in the workflow and do not work for nested activities.

Signature

declare const withCompensation: {
<A, R2>(
compensation: (value: A, cause: Cause.Cause<unknown>) => Effect.Effect<void, never, R2>
): <E, R>(effect: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R | R2 | WorkflowInstance | Scope.Scope>
<A, E, R, R2>(
effect: Effect.Effect<A, E, R>,
compensation: (value: A, cause: Cause.Cause<unknown>) => Effect.Effect<void, never, R2>
): Effect.Effect<A, E, R | R2 | WorkflowInstance | Scope.Scope>
}

Source

Since v4.0.0

Captures defects for a workflow and includes them in the result of the workflow or its activities.

Details

By default, this annotation is set to true, meaning defects are captured.

Signature

declare const CaptureDefects: Context.Reference<boolean>

Source

Since v4.0.0

Marks a workflow to suspend when it encounters any error.

Details

The suspended execution can later be resumed with the workflow’s resume method, for example MyWorkflow.resume(executionId).

Signature

declare const SuspendOnFailure: Context.Reference<boolean>

Source

Since v4.0.0

Creates a durable workflow definition with schemas, annotations, and deterministic execution IDs derived from the workflow tag and idempotency key.

Signature

declare const make: <
const Tag extends string,
Payload extends Schema.Struct.Fields | AnyStructSchema,
Success extends Schema.Top = Schema.Void,
Error extends Schema.Top = Schema.Never
>(
tag: Tag,
options: {
readonly payload: Payload
readonly idempotencyKey: (
payload: Payload extends Schema.Struct.Fields ? Schema.Struct.Type<Payload> : Payload["Type"]
) => string
readonly success?: Success
readonly error?: Error
readonly suspendedRetrySchedule?: Schedule.Schedule<any, unknown> | undefined
readonly annotations?: Context.Context<never>
}
) => Workflow<Tag, Payload extends Schema.Struct.Fields ? Schema.Struct<Payload> : Payload, Success, Error>

Source

Since v4.0.0

Type-erased workflow shape for APIs that operate on workflows without preserving their specific payload, success, or error types.

Signature

export interface Any {
new (_: never): {}
readonly [TypeId]: typeof TypeId
readonly _tag: string
readonly executionId: (payload: any) => Effect.Effect<string>
readonly payloadSchema: AnyStructSchema
readonly successSchema: Schema.Top
readonly errorSchema: Schema.Top
readonly annotations: Context.Context<never>
readonly idempotencyKey: (payload: any) => string
readonly suspendedRetrySchedule?: Schedule.Schedule<any, unknown> | undefined
}

Source

Since v4.0.0

Type-erased workflow shape that also exposes executable operations needed by workflow proxy and engine helpers.

Signature

export interface AnyWithProps extends Any {
readonly payloadSchema: AnyStructSchema
readonly successSchema: Schema.Top
readonly errorSchema: Schema.Top
readonly execute: (payload: any, options?: { readonly discard?: boolean }) => Effect.Effect<any, any, any>
readonly resume: (executionId: string) => Effect.Effect<void, never, WorkflowEngine>
}

Source

Since v4.0.0

Type-level marker for services associated with a specific workflow execution tag.

Signature

export interface Execution<Tag extends string> {
readonly _: unique symbol
readonly _tag: Tag
}

Source

Since v4.0.0

Extracts the payload schema from a Workflow.

Signature

type PayloadSchema<W> = W extends Workflow<infer _Name, infer _Payload, infer _Success, infer _Error> ? _Payload : never

Source

Since v4.0.0

Computes the schema services required by clients that execute or poll workflows.

Signature

type RequirementsClient<Workflows> =
Workflows extends Workflow<infer _Name, infer _Payload, infer _Success, infer _Error>
? _Payload["EncodingServices"] | _Success["DecodingServices"] | _Error["DecodingServices"]
: never

Source

Since v4.0.0

Computes the schema services required by handlers that decode workflow payloads and encode workflow results.

Signature

type RequirementsHandler<Workflows> =
Workflows extends Workflow<infer _Name, infer _Payload, infer _Success, infer _Error>
?
| _Payload["DecodingServices"]
| _Payload["EncodingServices"]
| _Success["DecodingServices"]
| _Success["EncodingServices"]
| _Error["DecodingServices"]
| _Error["EncodingServices"]
: never

Source

Since v4.0.0

Durable workflow definition with typed payload, success, and error schemas plus operations for execution, polling, interruption, resumption, and registration.

Signature

export interface Workflow<
Tag extends string,
Payload extends AnyStructSchema,
Success extends Schema.Top,
Error extends Schema.Top
> {
new (_: never): {}
readonly [TypeId]: typeof TypeId
readonly _tag: Tag
readonly payloadSchema: Payload
readonly successSchema: Success
readonly errorSchema: Error
readonly annotations: Context.Context<never>
readonly idempotencyKey: (payload: Payload["Type"]) => string
readonly suspendedRetrySchedule?: Schedule.Schedule<any, unknown> | undefined
/**
* Add an annotation to the workflow.
*/
annotate<I, S>(key: Context.Key<I, S>, value: S): Workflow<Tag, Payload, Success, Error>
/**
* Merge multiple annotations into the workflow.
*/
annotateMerge<I>(annotations: Context.Context<I>): Workflow<Tag, Payload, Success, Error>
/**
* Execute the workflow with the given payload.
*/
readonly execute: <const Discard extends boolean = false>(
payload: Payload["~type.make.in"],
options?: {
readonly discard?: Discard
}
) => Effect.Effect<
Discard extends true ? string : Success["Type"],
Discard extends true ? never : Error["Type"],
WorkflowEngine | Payload["EncodingServices"] | Success["DecodingServices"] | Error["DecodingServices"]
>
/**
* Poll the current status of a workflow execution.
*/
readonly poll: (
executionId: string
) => Effect.Effect<
Option.Option<Result<Success["Type"], Error["Type"]>>,
never,
WorkflowEngine | Success["DecodingServices"] | Error["DecodingServices"]
>
/**
* Interrupt a workflow execution for the given execution ID.
*/
readonly interrupt: (executionId: string) => Effect.Effect<void, never, WorkflowEngine>
/**
* Manually resume a workflow execution for the given execution ID.
*/
readonly resume: (executionId: string) => Effect.Effect<void, never, WorkflowEngine>
/**
* Create a layer that registers the workflow and provides an effect to
* execute it.
*/
readonly toLayer: <R>(
execute: (payload: Payload["Type"], executionId: string) => Effect.Effect<Success["Type"], Error["Type"], R>
) => Layer.Layer<
never,
never,
| WorkflowEngine
| Exclude<R, WorkflowEngine | WorkflowInstance | Execution<Tag> | Scope.Scope>
| Payload["DecodingServices"]
| Payload["EncodingServices"]
| Success["DecodingServices"]
| Success["EncodingServices"]
| Error["DecodingServices"]
| Error["EncodingServices"]
>
/**
* For the given payload, compute the deterministic execution ID.
*/
readonly executionId: (payload: Payload["~type.make.in"]) => Effect.Effect<string>
/**
* Add compensation logic to an effect inside a Workflow.
*
* **Details**
*
* The compensation finalizer is called if the entire workflow fails, allowing you to perform cleanup or other actions based on the success value and the cause of the workflow failure.
*
* **Gotchas**
*
* Compensation finalizers are only registered for top-level effects in the workflow and do not work for nested activities.
*/
readonly withCompensation: {
<A, R2>(
compensation: (value: A, cause: Cause.Cause<Error["Type"]>) => Effect.Effect<void, never, R2>
): <E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, R | R2 | WorkflowInstance | Execution<Tag> | Scope.Scope>
<A, E, R, R2>(
effect: Effect.Effect<A, E, R>,
compensation: (value: A, cause: Cause.Cause<Error["Type"]>) => Effect.Effect<void, never, R2>
): Effect.Effect<A, E, R | R2 | WorkflowInstance | Execution<Tag> | Scope.Scope>
}
}

Source

Since v4.0.0

Adds an exit finalizer to the current workflow scope, preserving the services available when the finalizer is registered.

Signature

declare const addFinalizer: <R>(
f: (exit: Exit.Exit<unknown, unknown>) => Effect.Effect<void, never, R>
) => Effect.Effect<void, never, WorkflowInstance | R>

Source

Since v4.0.0

Provides the workflow scope to the given effect, and closes the scope only when the workflow execution fully completes.

Signature

declare const provideScope: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<A, E, Exclude<R, Scope.Scope> | WorkflowInstance>

Source

Since v4.0.0

Accesses the workflow scope, which is only closed when the workflow execution fully completes.

Signature

declare const scope: Effect.Effect<Scope.Scope, never, WorkflowInstance>

Source

Since v4.0.0

Represents a completed workflow execution with its success or failure Exit.

Signature

declare class Complete<A, E>

Source

Since v4.0.0

Builds the schema for completed workflow results from success and error schemas.

Signature

declare const Schema: <Success extends Schema.Constraint, Error extends Schema.Constraint>(options: {
readonly success: Success
readonly error: Error
}) => CompleteSchema<Success, Error>

Source

Since v4.0.0

Marks this value as a workflow result for runtime guards.

Signature

readonly [ResultTypeId]: "~effect/workflow/Workflow/Result"

Source

Since v4.0.0

Encoded representation of a completed workflow result containing an encoded Exit.

Signature

export interface CompleteEncoded<A, E> {
readonly _tag: "Complete"
readonly exit: ExitEncoded<A, E>
}

Source

Since v4.0.0

Creates a schema for workflow results using the supplied success and error schemas.

Signature

declare const Result: <Success extends Schema.Constraint, Error extends Schema.Constraint>(options: {
readonly success: Success
readonly error: Error
}) => Schema.Union<readonly [CompleteSchema<Success, Error>, typeof Suspended]>

Source

Since v4.0.0

Result of a workflow execution, either a completed exit or a suspended workflow state.

Signature

type Result<A, E> = Complete<A, E> | Suspended

Source

Since v4.0.0

Schema for encoded workflow results with generic success and error payloads.

Signature

declare const ResultEncoded: Schema.Codec<ResultEncoded<any, any>, ResultEncoded<any, any>, never, never>

Source

Since v4.0.0

Encoded representation of a workflow Result.

Signature

type ResultEncoded<A, E> = CompleteEncoded<A, E> | typeof Suspended.Encoded

Source

Since v4.0.0

Represents a suspended workflow execution, optionally carrying the cause that triggered suspension.

Signature

declare class Suspended

Source

Since v4.0.0

Marks this value as a workflow result for runtime guards.

Signature

readonly [ResultTypeId]: "~effect/workflow/Workflow/Result"

Source

Since v4.0.0

Runs an effect as a workflow execution and converts its outcome into a Result, handling suspension, defect capture, interruption, and workflow scope finalization.

Signature

declare const intoResult: <A, E, R>(
effect: Effect.Effect<A, E, R>
) => Effect.Effect<Result<A, E>, never, Exclude<R, Scope.Scope> | WorkflowInstance>

Source

Since v4.0.0

Returns true when a value is a workflow Result.

Signature

declare const isResult: <A = unknown, E = unknown>(u: unknown) => u is Result<A, E>

Source

Since v4.0.0

Marks a workflow instance as suspended and interrupts the current fiber to stop execution until it is resumed.

Signature

declare const suspend: (instance: WorkflowInstance["Service"]) => Effect.Effect<never>

Source

Since v4.0.0

Wraps an activity-like effect so workflow suspension waits for currently running activities to finish or suspend.

Signature

declare const wrapActivityResult: <A, E, R>(
effect: Effect.Effect<A, E, R>,
isSuspend: (value: A) => boolean
) => Effect.Effect<A, E, R | WorkflowInstance>

Source

Since v4.0.0

Schema constraint for workflow payload schemas that expose struct fields.

Signature

export interface AnyStructSchema extends Schema.Top {
readonly fields: Schema.Struct.Fields
}

Source

Since v4.0.0

Schema constructor for Complete workflow results using the supplied success and error schemas.

Signature

export interface CompleteSchema<
Success extends Schema.Constraint,
Error extends Schema.Constraint
> extends Schema.declareConstructor<
Complete<Success["Type"], Error["Type"]>,
Complete<Success["Encoded"], Error["Encoded"]>,
readonly [Schema.Exit<Success, Error, Schema.Defect>]
> {
readonly success: Success
readonly error: Error
}

Source

Since v4.0.0