Skip to content

DurableQueue.ts

Durable workflow queues delegate work to persisted background workers and resume the waiting workflow with the worker result.

A workflow calls process to encode a payload, offer it to a named PersistedQueue, attach a DurableDeferred token, and suspend. A worker created with makeWorker or worker takes the item, runs the handler, and records the handler’s Exit through that token so the original workflow can continue with the typed success or error.

Since v4.0.0



Adds an item to the queue and wait for a worker to process it.

Signature

declare const process: <Payload extends Schema.Top, Success extends Schema.Top, Error extends Schema.Top>(
self: DurableQueue<Payload, Success, Error>,
payload: Payload["~type.make.in"],
options?: { readonly retrySchedule?: Schedule.Schedule<any, PersistedQueue.PersistedQueueError> | undefined }
) => Effect.Effect<
Success["Type"],
Error["Type"],
| WorkflowEngine
| WorkflowInstance
| PersistedQueue.PersistedQueueFactory
| Payload["EncodingServices"]
| Payload["DecodingServices"]
| Success["DecodingServices"]
| Error["DecodingServices"]
>

Source

Since v4.0.0

Create a worker effect that processes items from the durable queue.

Signature

declare const makeWorker: <Payload extends Schema.Top, Success extends Schema.Top, Error extends Schema.Top, R>(
self: DurableQueue<Payload, Success, Error>,
f: (payload: Payload["Type"]) => Effect.Effect<Success["Type"], Error["Type"], R>,
options?: { readonly concurrency?: number | undefined } | undefined
) => Effect.Effect<
never,
never,
| WorkflowEngine
| PersistedQueue.PersistedQueueFactory
| R
| Payload["EncodingServices"]
| Payload["DecodingServices"]
| Success["EncodingServices"]
| Error["EncodingServices"]
>

Source

Since v4.0.0

Create a layer that runs workers for the durable queue.

Signature

declare const worker: <Payload extends Schema.Top, Success extends Schema.Top, Error extends Schema.Top, R>(
self: DurableQueue<Payload, Success, Error>,
f: (payload: Payload["Type"]) => Effect.Effect<Success["Type"], Error["Type"], R>,
options?: { readonly concurrency?: number | undefined } | undefined
) => Layer.Layer<
never,
never,
| WorkflowEngine
| PersistedQueue.PersistedQueueFactory
| R
| Payload["EncodingServices"]
| Payload["DecodingServices"]
| Success["EncodingServices"]
| Error["EncodingServices"]
>

Source

Since v4.0.0

Creates a DurableQueue that waits for persisted items to finish processing using a DurableDeferred.

Example (Defining a durable queue with workers)

import { Effect, Schema } from "effect"
import { DurableQueue, Workflow } from "effect/unstable/workflow"
// Define a DurableQueue that can be used to derive workers and offer items for
// processing.
const ApiQueue = DurableQueue.make({
name: "ApiQueue",
payload: {
id: Schema.String
},
success: Schema.Void,
error: Schema.Never,
idempotencyKey(payload) {
return payload.id
}
})
const MyWorkflow = Workflow.make("MyWorkflow", {
payload: {
id: Schema.String
},
idempotencyKey: ({ id }) => id
})
const MyWorkflowLayer = MyWorkflow.toLayer(
Effect.fnUntraced(function* () {
// Add an item to the DurableQueue defined above.
//
// When the worker has finished processing the item, the workflow will
// resume.
//
yield* DurableQueue.process(ApiQueue, { id: "api-call-1" })
yield* Effect.log("Workflow succeeded!")
})
)
// Define a worker layer that can process items from the DurableQueue.
const ApiWorker = DurableQueue.worker(
ApiQueue,
Effect.fnUntraced(function* ({ id }) {
yield* Effect.log(`Worker processing API call with id: ${id}`)
}),
{ concurrency: 5 } // Process up to 5 items concurrently
)

Signature

declare const make: <
Payload extends Schema.Top | Schema.Struct.Fields,
Success extends Schema.Top = Schema.Void,
Error extends Schema.Top = Schema.Never
>(options: {
readonly name: string
readonly payload: Payload
readonly idempotencyKey: (
payload: Payload extends Schema.Struct.Fields ? Schema.Struct.Type<Payload> : Payload["Type"]
) => string
readonly success?: Success | undefined
readonly error?: Error | undefined
}) => DurableQueue<Payload extends Schema.Struct.Fields ? Schema.Struct<Payload> : Payload, Success, Error>

Source

Since v4.0.0

Durable workflow queue definition containing a payload schema, idempotency key, and deferred used to await worker results.

Signature

export interface DurableQueue<
Payload extends Schema.Top,
Success extends Schema.Top = Schema.Void,
Error extends Schema.Top = Schema.Never
> {
readonly [TypeId]: TypeId
readonly name: string
readonly payloadSchema: Payload
readonly idempotencyKey: (payload: Payload["Type"]) => string
readonly deferred: DurableDeferred.DurableDeferred<Success, Error>
}

Source

Since v4.0.0

Runtime identifier attached to DurableQueue values.

Signature

declare const TypeId: "~effect/workflow/DurableQueue"

Source

Since v4.0.0

Type-level identifier used to recognize DurableQueue values.

Signature

type TypeId = "~effect/workflow/DurableQueue"

Source

Since v4.0.0