Skip to content

PersistedQueue.ts

Stores schema-encoded queue work in persistent storage.

A PersistedQueue<A> keeps JSON-encoded values in a named queue and lets workers take one value at a time inside a scoped processing window. It is useful for durable handoffs, background jobs, outbox-style integrations, and work that should retry across fibers, process restarts, or multiple workers. This module includes a queue factory, store service, id-based de-duplication, retry handling, and in-memory, Redis, and SQL-backed store layers.

Since v4.0.0



Accesses PersistedQueueFactory to create a named persisted queue for a schema.

Signature

declare const make: <S extends Schema.Constraint>(options: {
readonly name: string
readonly schema: S
}) => Effect.Effect<
PersistedQueue<S["Type"], S["EncodingServices"] | S["DecodingServices"]>,
never,
PersistedQueueFactory
>

Source

Since v4.0.0

Creates a PersistedQueueFactory from the current PersistedQueueStore.

Details

Values are encoded and decoded with the supplied schema, automatically assigned an id when needed, and acknowledged or retried according to the take handler’s exit.

Signature

declare const makeFactory: Effect.Effect<
{
readonly make: <S extends Schema.Constraint>(options: {
readonly name: string
readonly schema: S
}) => Effect.Effect<PersistedQueue<S["Type"], S["EncodingServices"] | S["DecodingServices"]>>
},
never,
PersistedQueueStore
>

Source

Since v4.0.0

Error raised by persisted queue store operations.

Signature

declare class PersistedQueueError

Source

Since v4.0.0

Marks this value as a persisted queue error for runtime guards.

Signature

readonly [ErrorTypeId]: "~@effect/experimental/PersistedQueue/PersistedQueueError"

Source

Since v4.0.0

Provides PersistedQueueFactory using the current PersistedQueueStore.

Signature

declare const layer: Layer.Layer<PersistedQueueFactory, never, PersistedQueueStore>

Source

Since v4.0.0

Persistent queue of schema-encoded values.

Details

offer enqueues values by id, and take processes one value at a time, marking it complete on success or retrying it until the maximum attempts is reached.

Signature

export interface PersistedQueue<in out A, out R = never> {
readonly [TypeId]: TypeId
/**
* Adds an element to the queue and returns the id of the enqueued element.
*
* **Details**
*
* If an element with the same id already exists in the queue, it will not be
* added again.
*/
readonly offer: (
value: A,
options?: {
readonly id: string | undefined
}
) => Effect.Effect<string, PersistedQueueError | Schema.SchemaError, R>
/**
* Takes an element from the queue, waiting until one is available when the
* queue is empty.
*
* **Details**
*
* If the returned effect succeeds, the element is marked as processed;
* otherwise it will be retried according to the provided options. By default,
* max attempts is set to 10.
*/
readonly take: <XA, XE, XR>(
f: (
value: A,
metadata: {
readonly id: string
readonly attempts: number
}
) => Effect.Effect<XA, XE, XR>,
options?: {
readonly maxAttempts?: number | undefined
}
) => Effect.Effect<XA, XE | PersistedQueueError | Schema.SchemaError, R | XR>
}

Source

Since v4.0.0

Service for constructing named PersistedQueue instances from schemas.

Signature

declare class PersistedQueueFactory

Source

Since v4.0.0

Defines the low-level backing store service used by PersistedQueue.

When to use

Use to provide the persistence backend that stores queued elements, scoped takes, retry attempts, and acknowledgements.

Details

The store persists offered elements and returns taken elements in a scope so the finalizer can complete or retry them based on the processing exit.

Signature

declare class PersistedQueueStore

Source

Since v4.0.0

Provides an in-memory PersistedQueueStore.

Details

The store is process-local and volatile; failed takes are requeued until the configured maximum attempts is reached.

Signature

declare const layerStoreMemory: Layer.Layer<PersistedQueueStore, never, never>

Source

Since v4.0.0

Provides a Redis-backed PersistedQueueStore using makeStoreRedis.

Signature

declare const layerStoreRedis: (
options?:
| {
readonly prefix?: string | undefined
readonly pollInterval?: Duration.Input | undefined
readonly lockRefreshInterval?: Duration.Input | undefined
readonly lockExpiration?: Duration.Input | undefined
}
| undefined
) => Layer.Layer<PersistedQueueStore, never, Redis.Redis>

Source

Since v4.0.0

Provides a SQL-backed PersistedQueueStore using makeStoreSql.

Signature

declare const layerStoreSql: (
options?:
| {
readonly tableName?: string | undefined
readonly pollInterval?: Duration.Input | undefined
readonly lockRefreshInterval?: Duration.Input | undefined
readonly lockExpiration?: Duration.Input | undefined
}
| undefined
) => Layer.Layer<PersistedQueueStore, SqlError, SqlClient.SqlClient>

Source

Since v4.0.0

Creates a Redis-backed PersistedQueueStore.

Details

The store uses Redis lists and hashes with worker locks, periodically refreshes locks while items are being processed, and moves exhausted items to a failed queue.

Signature

declare const makeStoreRedis: (
options?:
| {
readonly prefix?: string | undefined
readonly pollInterval?: Duration.Input | undefined
readonly lockRefreshInterval?: Duration.Input | undefined
readonly lockExpiration?: Duration.Input | undefined
}
| undefined
) => Effect.Effect<
{
readonly offer: (options: {
readonly name: string
readonly id: string
readonly element: unknown
readonly isCustomId: boolean
}) => Effect.Effect<void, PersistedQueueError>
readonly take: (options: {
readonly name: string
readonly maxAttempts: number
}) => Effect.Effect<
{ readonly id: string; readonly attempts: number; readonly element: unknown },
PersistedQueueError,
Scope.Scope
>
},
never,
Scope.Scope | Redis.Redis
>

Source

Since v4.0.0

Creates a SQL-backed PersistedQueueStore.

Details

The store creates the queue table and indexes, acquires rows with per-worker locks, refreshes active locks while scoped takes are running, and retries or completes rows according to the processing exit.

Signature

declare const makeStoreSql: (
options?:
| {
readonly tableName?: string | undefined
readonly pollInterval?: Duration.Input | undefined
readonly lockRefreshInterval?: Duration.Input | undefined
readonly lockExpiration?: Duration.Input | undefined
}
| undefined
) => Effect.Effect<PersistedQueueStore["Service"], SqlError, SqlClient.SqlClient | Scope.Scope>

Source

Since v4.0.0

Runtime type identifier for PersistedQueueError.

Signature

declare const ErrorTypeId: "~@effect/experimental/PersistedQueue/PersistedQueueError"

Source

Since v4.0.0

Type-level identifier used to brand PersistedQueueError values.

Signature

type ErrorTypeId = "~@effect/experimental/PersistedQueue/PersistedQueueError"

Source

Since v4.0.0

Runtime type identifier for PersistedQueue values.

Signature

declare const TypeId: "~effect/persistence/PersistedQueue"

Source

Since v4.0.0

Type-level identifier used to brand PersistedQueue values.

Signature

type TypeId = "~effect/persistence/PersistedQueue"

Source

Since v4.0.0