PersistedQueue.ts
PersistedQueue.ts overview
Stores schema-encoded queue work in persistent storage.
A PersistedQueue<A> keeps JSON-encoded values in a named queue and lets
workers take one value at a time inside a scoped processing window. It is
useful for durable handoffs, background jobs, outbox-style integrations, and
work that should retry across fibers, process restarts, or multiple workers.
This module includes a queue factory, store service, id-based de-duplication,
retry handling, and in-memory, Redis, and SQL-backed store layers.
Since v4.0.0
Exports Grouped by Category
accessors
make
Accesses PersistedQueueFactory to create a named persisted queue for a
schema.
Signature
declare const make: <S extends Schema.Constraint>(options: {
  readonly name: string
  readonly schema: S
}) => Effect.Effect<
  PersistedQueue<S["Type"], S["EncodingServices"] | S["DecodingServices"]>,
  never,
  PersistedQueueFactory
>
Since v4.0.0
constructors
makeFactory
Creates a PersistedQueueFactory from the current PersistedQueueStore.
Details
Values are encoded and decoded with the supplied schema, automatically
assigned an id when needed, and acknowledged or retried according to the
take handler’s exit.
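The encoding and id-assignment step can be sketched in plain TypeScript. This is only an illustration under assumptions: `prepareOffer`, `StoredOffer`, and `encodeDate` are hypothetical names, the `encode` callback stands in for schema encoding, and `randomUUID` is an assumed id generator. The field names follow the store-level `offer` options shown in the makeStoreRedis signature.

```typescript
import { randomUUID } from "node:crypto"

// Hypothetical shape for illustration; the fields mirror the store-level
// `offer` options (name, id, element, isCustomId).
interface StoredOffer {
  readonly name: string
  readonly id: string
  readonly element: unknown
  readonly isCustomId: boolean
}

// `encode` is a stand-in for schema encoding. Real schema encoding may fail
// or require services; this sketch ignores both.
function prepareOffer<A>(
  name: string,
  value: A,
  encode: (value: A) => unknown,
  id?: string
): StoredOffer {
  // An id supplied by the caller is kept; otherwise one is generated.
  const isCustomId = id !== undefined
  return {
    name,
    id: id ?? randomUUID(), // assumption: any unique string would do here
    element: encode(value),
    isCustomId
  }
}

// Encode a Date as an ISO string, once with a caller-supplied id and once without.
const encodeDate = (d: Date) => d.toISOString()
const custom = prepareOffer("emails", new Date(0), encodeDate, "job-1")
const generated = prepareOffer("emails", new Date(0), encodeDate)
```

In this sketch `custom.isCustomId` is true and `generated.isCustomId` is false, which is the kind of distinction a store could use when deciding how to de-duplicate.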
Signature
declare const makeFactory: Effect.Effect<
  {
    readonly make: <S extends Schema.Constraint>(options: {
      readonly name: string
      readonly schema: S
    }) => Effect.Effect<PersistedQueue<S["Type"], S["EncodingServices"] | S["DecodingServices"]>>
  },
  never,
  PersistedQueueStore
>
Since v4.0.0
errors
PersistedQueueError (class)
Error raised by persisted queue store operations.
Signature
declare class PersistedQueueError
Since v4.0.0
[ErrorTypeId] (property)
Marks this value as a persisted queue error for runtime guards.
Signature
readonly [ErrorTypeId]: "~@effect/experimental/PersistedQueue/PersistedQueueError"
Since v4.0.0
layers
layer
Provides PersistedQueueFactory using the current PersistedQueueStore.
Signature
declare const layer: Layer.Layer<PersistedQueueFactory, never, PersistedQueueStore>
Since v4.0.0
models
PersistedQueue (interface)
Persistent queue of schema-encoded values.
Details
offer enqueues values by id, and take processes one value at a time,
marking it complete on success or retrying it until the maximum number of
attempts is reached.
Signature
export interface PersistedQueue<in out A, out R = never> {
  readonly [TypeId]: TypeId

  /**
   * Adds an element to the queue and returns the id of the enqueued element.
   *
   * **Details**
   *
   * If an element with the same id already exists in the queue, it will not be
   * added again.
   */
  readonly offer: (
    value: A,
    options?: { readonly id: string | undefined }
  ) => Effect.Effect<string, PersistedQueueError | Schema.SchemaError, R>

  /**
   * Takes an element from the queue, waiting until one is available when the
   * queue is empty.
   *
   * **Details**
   *
   * If the returned effect succeeds, the element is marked as processed;
   * otherwise it will be retried according to the provided options. By default,
   * max attempts is set to 10.
   */
  readonly take: <XA, XE, XR>(
    f: (
      value: A,
      metadata: {
        readonly id: string
        readonly attempts: number
      }
    ) => Effect.Effect<XA, XE, XR>,
    options?: { readonly maxAttempts?: number | undefined }
  ) => Effect.Effect<XA, XE | PersistedQueueError | Schema.SchemaError, R | XR>
}
Since v4.0.0
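The documented offer and take behavior can be modeled with a small synchronous class. This is a simplified sketch, not the library's implementation: `ModelQueue` and `Entry` are hypothetical names, nothing is persisted, no Effect is involved, and `take` returns `undefined` on an empty queue instead of waiting. Two further assumptions are that `attempts` starts at 0 on first delivery and that de-duplication only checks elements still pending.

```typescript
// One queued element plus its retry bookkeeping (hypothetical shape).
interface Entry<A> {
  readonly id: string
  readonly value: A
  attempts: number
}

class ModelQueue<A> {
  private readonly pending: Array<Entry<A>> = []
  private nextId = 0

  // Returns the element's id. An id that is already pending is not added again.
  offer(value: A, options?: { readonly id: string | undefined }): string {
    const id = options?.id ?? `auto-${this.nextId++}`
    if (!this.pending.some((entry) => entry.id === id)) {
      this.pending.push({ id, value, attempts: 0 })
    }
    return id
  }

  // Runs `f` on the next element. Success drops the element; a thrown error
  // requeues it until `maxAttempts` is reached, then rethrows to the caller.
  take<X>(
    f: (value: A, metadata: { readonly id: string; readonly attempts: number }) => X,
    options?: { readonly maxAttempts?: number | undefined }
  ): X | undefined {
    const maxAttempts = options?.maxAttempts ?? 10 // documented default
    const entry = this.pending.shift()
    if (entry === undefined) return undefined
    try {
      return f(entry.value, { id: entry.id, attempts: entry.attempts })
    } catch (error) {
      entry.attempts += 1
      if (entry.attempts < maxAttempts) this.pending.push(entry)
      throw error
    }
  }

  get size(): number {
    return this.pending.length
  }
}

const queue = new ModelQueue<string>()
queue.offer("send welcome email", { id: "user-1" })
queue.offer("send welcome email", { id: "user-1" }) // ignored: same id is pending
const sizeAfterOffers = queue.size

// A handler that always fails, with at most two attempts allowed.
const seenAttempts: Array<number> = []
for (let i = 0; i < 3; i++) {
  try {
    queue.take((_value, meta) => {
      seenAttempts.push(meta.attempts)
      throw new Error("handler failed")
    }, { maxAttempts: 2 })
  } catch {
    // The failure reaches the caller after the retry bookkeeping has run.
  }
}
```

In this run the handler is invoked twice, the element is dropped after the second failure, and the third `take` finds the queue empty.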
services
PersistedQueueFactory (class)
Service for constructing named PersistedQueue instances from schemas.
Signature
declare class PersistedQueueFactory
Since v4.0.0
PersistedQueueStore (class)
Defines the low-level backing store service used by PersistedQueue.
When to use
Use it to provide the persistence backend that stores queued elements, scoped takes, retry attempts, and acknowledgements.
Details
The store persists offered elements and returns taken elements in a scope so the finalizer can complete or retry them based on the processing exit.
Signature
declare class PersistedQueueStore
Since v4.0.0
layerStoreMemory
Provides an in-memory PersistedQueueStore.
Details
The store is process-local and volatile; failed takes are requeued until the configured maximum number of attempts is reached.
Signature
declare const layerStoreMemory: Layer.Layer<PersistedQueueStore, never, never>
Since v4.0.0
layerStoreRedis
Provides a Redis-backed PersistedQueueStore using makeStoreRedis.
Signature
declare const layerStoreRedis: (
  options?:
    | {
        readonly prefix?: string | undefined
        readonly pollInterval?: Duration.Input | undefined
        readonly lockRefreshInterval?: Duration.Input | undefined
        readonly lockExpiration?: Duration.Input | undefined
      }
    | undefined
) => Layer.Layer<PersistedQueueStore, never, Redis.Redis>
Since v4.0.0
layerStoreSql
Provides a SQL-backed PersistedQueueStore using makeStoreSql.
Signature
declare const layerStoreSql: (
  options?:
    | {
        readonly tableName?: string | undefined
        readonly pollInterval?: Duration.Input | undefined
        readonly lockRefreshInterval?: Duration.Input | undefined
        readonly lockExpiration?: Duration.Input | undefined
      }
    | undefined
) => Layer.Layer<PersistedQueueStore, SqlError, SqlClient.SqlClient>
Since v4.0.0
makeStoreRedis
Creates a Redis-backed PersistedQueueStore.
Details
The store uses Redis lists and hashes with worker locks, periodically refreshes locks while items are being processed, and moves exhausted items to a failed queue.
Signature
declare const makeStoreRedis: (
  options?:
    | {
        readonly prefix?: string | undefined
        readonly pollInterval?: Duration.Input | undefined
        readonly lockRefreshInterval?: Duration.Input | undefined
        readonly lockExpiration?: Duration.Input | undefined
      }
    | undefined
) => Effect.Effect<
  {
    readonly offer: (options: {
      readonly name: string
      readonly id: string
      readonly element: unknown
      readonly isCustomId: boolean
    }) => Effect.Effect<void, PersistedQueueError>
    readonly take: (options: {
      readonly name: string
      readonly maxAttempts: number
    }) => Effect.Effect<
      { readonly id: string; readonly attempts: number; readonly element: unknown },
      PersistedQueueError,
      Scope.Scope
    >
  },
  never,
  Scope.Scope | Redis.Redis
>
Since v4.0.0
makeStoreSql
Creates a SQL-backed PersistedQueueStore.
Details
The store creates the queue table and indexes, acquires rows with per-worker locks, refreshes active locks while scoped takes are running, and retries or completes rows according to the processing exit.
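The interplay of per-worker locks, lock refresh, and lock expiration can be sketched with plain functions over an in-memory row. This is a hedged illustration only: the `Row` shape and the `tryAcquire` and `refresh` helpers are hypothetical, time is passed in explicitly as epoch milliseconds, and the real table layout and queries are not shown in this reference.

```typescript
// Hypothetical row shape, for illustration only.
interface Row {
  readonly id: string
  lockedBy: string | undefined
  lockedUntil: number // epoch milliseconds
}

// A worker may acquire a row that is unlocked or whose lock has expired.
function tryAcquire(row: Row, workerId: string, now: number, lockExpirationMs: number): boolean {
  const free = row.lockedBy === undefined || row.lockedUntil <= now
  if (!free) return false
  row.lockedBy = workerId
  row.lockedUntil = now + lockExpirationMs
  return true
}

// Intended to run periodically while a take is in progress. It only extends
// a lock that the same worker still holds and that has not yet expired.
function refresh(row: Row, workerId: string, now: number, lockExpirationMs: number): boolean {
  if (row.lockedBy !== workerId || row.lockedUntil <= now) return false
  row.lockedUntil = now + lockExpirationMs
  return true
}

const row: Row = { id: "job-1", lockedBy: undefined, lockedUntil: 0 }
const a1 = tryAcquire(row, "worker-a", 0, 1000)    // acquired, held until 1000
const b1 = tryAcquire(row, "worker-b", 500, 1000)  // rejected, lock still held
const a2 = refresh(row, "worker-a", 900, 1000)     // extended, held until 1900
const b2 = tryAcquire(row, "worker-b", 1500, 1000) // rejected, refresh kept it alive
const b3 = tryAcquire(row, "worker-b", 2000, 1000) // acquired, the lock expired at 1900
const a3 = refresh(row, "worker-a", 2100, 1000)    // rejected, worker-b owns the row now
```

The sketch suggests why the refresh interval should be shorter than the lock expiration: a worker that stops refreshing, for example after a crash, loses the row once the expiration passes, and another worker can pick it up.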
Signature
declare const makeStoreSql: (
  options?:
    | {
        readonly tableName?: string | undefined
        readonly pollInterval?: Duration.Input | undefined
        readonly lockRefreshInterval?: Duration.Input | undefined
        readonly lockExpiration?: Duration.Input | undefined
      }
    | undefined
) => Effect.Effect<PersistedQueueStore["Service"], SqlError, SqlClient.SqlClient | Scope.Scope>
Since v4.0.0
type IDs
ErrorTypeId
Runtime type identifier for PersistedQueueError.
Signature
declare const ErrorTypeId: "~@effect/experimental/PersistedQueue/PersistedQueueError"
Since v4.0.0
ErrorTypeId (type alias)
Type-level identifier used to brand PersistedQueueError values.
Signature
type ErrorTypeId = "~@effect/experimental/PersistedQueue/PersistedQueueError"
Since v4.0.0
TypeId
Runtime type identifier for PersistedQueue values.
Signature
declare const TypeId: "~effect/persistence/PersistedQueue"
Since v4.0.0
TypeId (type alias)
Type-level identifier used to brand PersistedQueue values.
Signature
type TypeId = "~effect/persistence/PersistedQueue"
Since v4.0.0