TxQueue.ts
TxQueue.ts overview
Section titled “TxQueue.ts overview”Transactional queues whose state changes participate in Effect transactions.
A TxQueue<A, E> stores values of type A, exposes write-only TxEnqueue
and read-only TxDequeue handles, and can complete, fail, or shut down with
causes observed by consumers. Queue operations can retry transactionally when
they cannot proceed, such as taking from an empty open queue or offering to a
full bounded queue. This makes the queue useful for coordinating producers
and consumers alongside other transactional state changes.
Since v4.0.0
Exports Grouped by Category
Section titled “Exports Grouped by Category”combinators
Section titled “combinators”awaitCompletion
Section titled “awaitCompletion”Waits for the queue to complete (either successfully or with failure).
Example (Awaiting queue completion)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, string>(10)
// In another fiber, end the queue yield* Effect.forkChild(Effect.delay(TxQueue.interrupt(queue), "100 millis"))
// Wait for completion - succeeds when queue ends yield* TxQueue.awaitCompletion(queue) console.log("Queue completed successfully")})Signature
declare const awaitCompletion: (self: TxQueueState) => Effect.Effect<void>Since v4.0.0
Removes and returns all currently buffered elements without changing the queue state.
Details
If the queue is already done with a Cause.Done error, returns an empty array. If the queue is done for any other cause, including interruption or failure, that cause is propagated.
Example (Clearing queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10) yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5])
const sizeBefore = yield* TxQueue.size(queue) console.log(sizeBefore) // 5
const cleared = yield* TxQueue.clear(queue) console.log(cleared) // [1, 2, 3, 4, 5]
const sizeAfter = yield* TxQueue.size(queue) console.log(sizeAfter) // 0})Signature
declare const clear: <A, E>(self: TxEnqueue<A, E>) => Effect.Effect<Array<A>, ExcludeDone<E>>Since v4.0.0
Ends a queue by signaling completion with a Cause.Done error.
Details
This is a convenience wrapper around failCause for queues whose error channel can contain Cause.Done. If buffered items remain, the queue enters the closing state and those items may still be consumed before later take or peek operations fail with Cause.Done.
Example (Ending queues)
import { Cause, Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, Cause.Done>(10)
// Signal the end of the queue const result = yield* TxQueue.end(queue) console.log(result) // true
// All operations will now fail with Done const takeResult = yield* Effect.flip(TxQueue.take(queue)) console.log(Cause.isDone(takeResult)) // true
const peekResult = yield* Effect.flip(TxQueue.peek(queue)) console.log(Cause.isDone(peekResult)) // true})Signature
declare const end: <A, E>(self: TxEnqueue<A, E | Cause.Done>) => Effect.Effect<boolean>Since v4.0.0
Fails the queue with the specified error, discarding any buffered items.
Details
The queue transitions directly to done with Cause.fail(error). Returns false if the queue was already closing or done.
Example (Failing queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, string>(10)
// Fail the queue with an error const result = yield* TxQueue.fail(queue, "connection lost") console.log(result) // true})Signature
declare const fail: { <E>(error: E): <A>(self: TxEnqueue<A, E>) => Effect.Effect<boolean> <A, E>(self: TxEnqueue<A, E>, error: E): Effect.Effect<boolean>}Since v4.0.0
failCause
Section titled “failCause”Completes the queue with the specified cause.
Details
If the queue is empty, it transitions directly to done. If it still contains items, it enters the closing state so buffered items can be drained before the cause is observed. Returns false if the queue was already closing or done.
Example (Failing queues with causes)
import { Cause, Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
// Complete with specific cause const cause = Cause.interrupt() const result = yield* TxQueue.failCause(queue, cause) console.log(result) // true})Signature
declare const failCause: { <E>(cause: Cause.Cause<E>): <A>(self: TxEnqueue<A, E>) => Effect.Effect<boolean> <A, E>(self: TxEnqueue<A, E>, cause: Cause.Cause<E>): Effect.Effect<boolean>}Since v4.0.0
interrupt
Section titled “interrupt”Interrupts the queue gracefully with the current fiber’s interruption cause.
Details
If the queue still contains items, it enters the closing state so buffered items can be drained before consumers observe the interruption. If it is empty, it transitions directly to done. Returns false if the queue was already closing or done.
Example (Interrupting queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10) yield* TxQueue.offer(queue, 42)
// Interrupt gracefully - allows remaining items to be consumed const result = yield* TxQueue.interrupt(queue) console.log(result) // true})Signature
declare const interrupt: <A, E>(self: TxEnqueue<A, E>) => Effect.Effect<boolean>Since v4.0.0
isClosing
Section titled “isClosing”Checks whether the queue is in the closing state.
Example (Checking closing state)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10) yield* TxQueue.offer(queue, 42)
const closing = yield* TxQueue.isClosing(queue) console.log(closing) // false
yield* TxQueue.interrupt(queue) const nowClosing = yield* TxQueue.isClosing(queue) console.log(nowClosing) // true})Signature
declare const isClosing: (self: TxQueueState) => Effect.Effect<boolean>Since v4.0.0
isDone
Section titled “isDone”Checks whether the queue is done (completed or failed).
Example (Checking done state)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
const done = yield* TxQueue.isDone(queue) console.log(done) // false
yield* TxQueue.interrupt(queue) const nowDone = yield* TxQueue.isDone(queue) console.log(nowDone) // true})Signature
declare const isDone: (self: TxQueueState) => Effect.Effect<boolean>Since v4.0.0
isEmpty
Section titled “isEmpty”Checks whether the queue is empty.
Example (Checking whether a queue is empty)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
const empty = yield* TxQueue.isEmpty(queue) console.log(empty) // true
yield* TxQueue.offer(queue, 42) const stillEmpty = yield* TxQueue.isEmpty(queue) console.log(stillEmpty) // false})Signature
declare const isEmpty: (self: TxQueueState) => Effect.Effect<boolean>Since v2.0.0
isFull
Section titled “isFull”Checks whether the queue is at capacity.
Example (Checking whether a queue is full)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(2)
const full = yield* TxQueue.isFull(queue) console.log(full) // false
yield* TxQueue.offerAll(queue, [1, 2]) const nowFull = yield* TxQueue.isFull(queue) console.log(nowFull) // true})Signature
declare const isFull: (self: TxQueueState) => Effect.Effect<boolean>Since v2.0.0
isOpen
Section titled “isOpen”Checks whether the queue is in the open state.
Example (Checking open state)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
const open = yield* TxQueue.isOpen(queue) console.log(open) // true
yield* TxQueue.interrupt(queue) const stillOpen = yield* TxQueue.isOpen(queue) console.log(stillOpen) // false})Signature
declare const isOpen: (self: TxQueueState) => Effect.Effect<boolean>Since v4.0.0
isShutdown
Section titled “isShutdown”Checks whether the queue is shutdown (legacy compatibility).
Example (Checking shutdown state)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
const isShutdown = yield* TxQueue.isShutdown(queue) console.log(isShutdown) // false
yield* TxQueue.shutdown(queue) const nowShutdown = yield* TxQueue.isShutdown(queue) console.log(nowShutdown) // true})Signature
declare const isShutdown: (self: TxQueueState) => Effect.Effect<boolean>Since v2.0.0
Offers an item to the queue and returns whether it was accepted.
Details
Open unbounded queues always accept; open bounded queues retry while full; dropping queues return false when full; sliding queues evict the oldest item when full. Closing or done queues return false. This function mutates the original TxQueue by adding the item according to the queue’s strategy. It does not return a new TxQueue reference.
Example (Offering a value)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
// Offer an item - returns true if accepted const accepted = yield* TxQueue.offer(queue, 42) console.log(accepted) // true})Signature
declare const offer: { <A, E>(value: A): (self: TxEnqueue<A, E>) => Effect.Effect<boolean> <A, E>(self: TxEnqueue<A, E>, value: A): Effect.Effect<boolean>}Since v2.0.0
offerAll
Section titled “offerAll”Offers multiple items to the queue, returning the items that were not accepted.
Details
Each item follows offer semantics: bounded queues retry while full, dropping queues reject new items when full, sliding queues evict old items to accept new items, and closing or done queues reject all items. This function mutates the original TxQueue by adding items according to the queue’s strategy. It does not return a new TxQueue reference.
Example (Offering multiple values)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
// Offer multiple items - returns rejected items as array const rejected = yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5]) console.log(rejected) // [] if all accepted console.log(rejected.length) // 0})Signature
declare const offerAll: { <A, E>(values: Iterable<A>): (self: TxEnqueue<A, E>) => Effect.Effect<Array<A>> <A, E>(self: TxEnqueue<A, E>, values: Iterable<A>): Effect.Effect<Array<A>>}Since v2.0.0
Waits transactionally for the next item and returns it without removing it.
Details
If the queue is open but empty, the transaction retries until an item is available or the queue completes. If the queue is done, the queue’s completion cause is propagated through the error channel.
Example (Peeking without removing values)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, string>(10) yield* TxQueue.offer(queue, 42)
// Peek at the next item without removing it const item = yield* TxQueue.peek(queue) console.log(item) // 42
// Item is still in the queue const size = yield* TxQueue.size(queue) console.log(size) // 1})
// Error handling exampleconst errorExample = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, string>(5) yield* TxQueue.fail(queue, "queue failed")
// peek() propagates the queue error through E-channel const result = yield* Effect.flip(TxQueue.peek(queue)) console.log(result) // "queue failed"})Signature
declare const peek: <A, E>(self: TxDequeue<A, E>) => Effect.Effect<A, E>Since v2.0.0
Tries to take an item from the queue without blocking.
Example (Polling without blocking)
import { Effect, Option, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10)
// Poll returns Option.none if empty const maybe = yield* TxQueue.poll(queue) console.log(Option.isNone(maybe)) // true
yield* TxQueue.offer(queue, 42) const item = yield* TxQueue.poll(queue) console.log(Option.getOrNull(item)) // 42})Signature
declare const poll: <A, E>(self: TxDequeue<A, E>) => Effect.Effect<Option.Option<A>>Since v2.0.0
shutdown
Section titled “shutdown”Shuts down the queue immediately by clearing all items and interrupting it (legacy compatibility).
Details
This operation clears all items from the queue using clear, then interrupts the queue using interrupt. This function mutates the original TxQueue by clearing its contents and marking it as shutdown. It does not return a new TxQueue reference.
Example (Shutting down queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10) yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5])
const sizeBefore = yield* TxQueue.size(queue) console.log(sizeBefore) // 5
yield* TxQueue.shutdown(queue)
const sizeAfter = yield* TxQueue.size(queue) console.log(sizeAfter) // 0 (cleared)
const isShutdown = yield* TxQueue.isShutdown(queue) console.log(isShutdown) // true (interrupted)})Signature
declare const shutdown: <A, E>(self: TxEnqueue<A, E>) => Effect.Effect<boolean>Since v2.0.0
Gets the current size of the queue.
Example (Reading queue size)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10) yield* TxQueue.offerAll(queue, [1, 2, 3])
const size = yield* TxQueue.size(queue) console.log(size) // 3})Signature
declare const size: (self: TxQueueState) => Effect.Effect<number>Since v2.0.0
Takes the next item from the queue, retrying the transaction while the queue is empty.
Details
If the queue is done, the effect fails with the queue’s completion cause. This function mutates the original TxQueue by removing the first item. It does not return a new TxQueue reference.
Example (Taking a value)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, string>(10) yield* TxQueue.offer(queue, 42)
// Take an item - blocks if empty const item = yield* TxQueue.take(queue) console.log(item) // 42
// When queue fails, take fails with the same error yield* TxQueue.fail(queue, "queue error") const result = yield* Effect.flip(TxQueue.take(queue)) console.log(result) // "queue error"})Signature
declare const take: <A, E>(self: TxDequeue<A, E>) => Effect.Effect<A, E>Since v2.0.0
takeAll
Section titled “takeAll”Takes all items from the queue. Blocks if the queue is empty.
Details
If the queue is already in a failed state, the error is propagated through the E-channel. This follows the same patterns as take and waits when there are no elements. It returns a non-empty array because it blocks until at least one item is available. This function mutates the original TxQueue by removing all items. It does not return a new TxQueue reference.
Example (Taking all queued values)
import { Array, Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, string>(10) yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5])
// Take all items atomically - returns NonEmptyArray const items = yield* TxQueue.takeAll(queue) console.log(items) // [1, 2, 3, 4, 5] console.log(Array.isArrayNonEmpty(items)) // true})
// Error propagation exampleconst errorExample = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number, string>(5) yield* TxQueue.offerAll(queue, [1, 2]) yield* TxQueue.fail(queue, "processing error")
// takeAll() propagates the queue error through E-channel const result = yield* Effect.flip(TxQueue.takeAll(queue)) console.log(result) // "processing error"})Signature
declare const takeAll: <A, E>(self: TxDequeue<A, E>) => Effect.Effect<Arr.NonEmptyArray<A>, E>Since v2.0.0
Takes up to n items from the queue in a single transaction.
Details
For an open queue, waits until min(n, capacity) items are available, then removes that many items. If n is less than or equal to zero, returns an empty array without modifying the queue. If the queue is closing, drains the currently available items and transitions to Done. If the queue is already done, the effect fails with the queue’s completion cause. This function mutates the original TxQueue by removing the taken items. It does not return a new TxQueue reference.
Example (Taking a fixed number of values)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(5) yield* TxQueue.offerAll(queue, [1, 2, 3, 4])
const items = yield* TxQueue.takeN(queue, 4) console.log(items) // [1, 2, 3, 4]
// This requests more than capacity (5), so takes all available (up to 5) yield* TxQueue.offerAll(queue, [5, 6, 7, 8, 9]) const all = yield* TxQueue.takeN(queue, 10) console.log(all) // [5, 6, 7, 8, 9]})Signature
declare const takeN: { (n: number): <A, E>(self: TxDequeue<A, E>) => Effect.Effect<Array<A>, E> <A, E>(self: TxDequeue<A, E>, n: number): Effect.Effect<Array<A>, E>}Since v2.0.0
constructors
Section titled “constructors”bounded
Section titled “bounded”Creates a new bounded TxQueue with the specified capacity.
Details
This function returns a new TxQueue reference with the specified capacity. No existing TxQueue instances are modified.
Example (Creating bounded queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { // Create a bounded queue (E defaults to never) const queue = yield* TxQueue.bounded<number>(10)
// Create a bounded queue with error channel const faultTolerantQueue = yield* TxQueue.bounded<number, string>(10)
// Offer items - will succeed until capacity is reached yield* TxQueue.offer(queue, 1) yield* TxQueue.offer(queue, 2)
const item = yield* TxQueue.take(queue) console.log(item) // 1})Signature
declare const bounded: <A = never, E = never>(capacity: number) => Effect.Effect<TxQueue<A, E>>Since v2.0.0
dropping
Section titled “dropping”Creates a new dropping TxQueue with the specified capacity that drops new items when full.
Details
This function returns a new TxQueue reference with dropping strategy. No existing TxQueue instances are modified.
Example (Creating dropping queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { // Create a dropping queue with capacity 2 const queue = yield* TxQueue.dropping<number>(2)
// Fill to capacity yield* TxQueue.offer(queue, 1) yield* TxQueue.offer(queue, 2)
// This will be dropped (returns false) const accepted = yield* TxQueue.offer(queue, 3) console.log(accepted) // false})Signature
declare const dropping: <A = never, E = never>(capacity: number) => Effect.Effect<TxQueue<A, E>>Since v2.0.0
sliding
Section titled “sliding”Creates a new sliding TxQueue with the specified capacity that evicts old items when full.
Details
This function returns a new TxQueue reference with sliding strategy. No existing TxQueue instances are modified.
Example (Creating sliding queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { // Create a sliding queue with capacity 2 const queue = yield* TxQueue.sliding<number>(2)
// Fill to capacity yield* TxQueue.offer(queue, 1) yield* TxQueue.offer(queue, 2)
// This will evict item 1 and add 3 yield* TxQueue.offer(queue, 3)
const item = yield* TxQueue.take(queue) console.log(item) // 2 (item 1 was evicted)})Signature
declare const sliding: <A = never, E = never>(capacity: number) => Effect.Effect<TxQueue<A, E>>Since v2.0.0
unbounded
Section titled “unbounded”Creates a new unbounded TxQueue with unlimited capacity.
Details
This function returns a new TxQueue reference with unlimited capacity. No existing TxQueue instances are modified.
Example (Creating unbounded queues)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { // Create an unbounded queue (E defaults to never) const queue = yield* TxQueue.unbounded<string>()
// Create an unbounded queue with error channel const faultTolerantQueue = yield* TxQueue.unbounded<string, Error>()
// Can offer unlimited items yield* TxQueue.offer(queue, "hello") yield* TxQueue.offer(queue, "world")
const size = yield* TxQueue.size(queue) console.log(size) // 2})Signature
declare const unbounded: <A = never, E = never>() => Effect.Effect<TxQueue<A, E>>Since v2.0.0
guards
Section titled “guards”isTxDequeue
Section titled “isTxDequeue”Checks whether the given value is a TxDequeue.
Example (Checking dequeue handles)
import { TxQueue } from "effect"
declare const someValue: unknown
if (TxQueue.isTxDequeue(someValue)) { // someValue is now typed as TxDequeue<unknown, unknown> console.log("This is a TxDequeue")}Signature
declare const isTxDequeue: <A = unknown, E = unknown>(u: unknown) => u is TxDequeue<A, E>Since v4.0.0
isTxEnqueue
Section titled “isTxEnqueue”Checks whether the given value is a TxEnqueue.
Example (Checking enqueue handles)
import { TxQueue } from "effect"
declare const someValue: unknown
if (TxQueue.isTxEnqueue(someValue)) { // someValue is now typed as TxEnqueue<unknown, unknown> console.log("This is a TxEnqueue")}Signature
declare const isTxEnqueue: <A = unknown, E = unknown>(u: unknown) => u is TxEnqueue<A, E>Since v4.0.0
isTxQueue
Section titled “isTxQueue”Checks whether the given value is a TxQueue.
Example (Checking queue handles)
import { TxQueue } from "effect"
declare const someValue: unknown
if (TxQueue.isTxQueue(someValue)) { // someValue is now typed as TxQueue<unknown, unknown> console.log("This is a TxQueue")}Signature
declare const isTxQueue: <A = unknown, E = unknown>(u: unknown) => u is TxQueue<A, E>Since v4.0.0
models
Section titled “models”State (type alias)
Section titled “State (type alias)”Represents the state of a transactional queue with sophisticated lifecycle management.
Details
The queue progresses through three states:
- Open: Accepting offers and serving takes normally
- Closing: No new offers accepted, serving remaining items until empty
- Done: Terminal state with completion cause, no further operations possible
Example (Inspecting queue lifecycle states)
import type { TxQueue } from "effect"
// State progression exampledeclare const state: TxQueue.State<string, Error>
if (state._tag === "Open") { console.log("Queue is accepting new items")} else if (state._tag === "Closing") { console.log("Queue is draining, cause:", state.cause)} else { console.log("Queue is done, cause:", state.cause)}Signature
type State<_A, E> = | { readonly _tag: "Open" } | { readonly _tag: "Closing" readonly cause: Cause.Cause<E> } | { readonly _tag: "Done" readonly cause: Cause.Cause<E> }Since v4.0.0
TxDequeue (interface)
Section titled “TxDequeue (interface)”A TxDequeue represents the read-only interface of a transactional queue, providing operations for consuming elements (dequeue operations) and inspecting queue state.
Example (Taking values through dequeue handles)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { // Queue without error channel const queue = yield* TxQueue.bounded<number>(10) yield* TxQueue.offer(queue, 42) const item = yield* TxQueue.take(queue) console.log(item) // 42
// Queue with error channel - errors propagate through E-channel const faultTolerantQueue = yield* TxQueue.bounded<number, string>(10) yield* TxQueue.fail(faultTolerantQueue, "processing failed")
// All dequeue operations now fail with the error directly const takeResult = yield* Effect.flip(TxQueue.take(faultTolerantQueue)) // "processing failed" const peekResult = yield* Effect.flip(TxQueue.peek(faultTolerantQueue)) // "processing failed"})Signature
export interface TxDequeue<out A, out E = never> extends TxQueueState { readonly [DequeueTypeId]: TxDequeue.Variance<A, E>}Since v4.0.0
TxEnqueue (interface)
Section titled “TxEnqueue (interface)”A TxEnqueue represents the write-only interface of a transactional queue, providing operations for adding elements (enqueue operations) and inspecting queue state.
Example (Offering values through enqueue handles)
import { Effect, TxQueue } from "effect"import type { Cause } from "effect"
const program = Effect.gen(function* () { // Queue without error channel const queue = yield* TxQueue.bounded<number>(10) const accepted = yield* TxQueue.offer(queue, 42)
// Queue with error channel for completion signaling const faultTolerantQueue = yield* TxQueue.bounded<number, string>(10) yield* TxQueue.offerAll(faultTolerantQueue, [1, 2, 3]) yield* TxQueue.fail(faultTolerantQueue, "processing complete")
// Works with Done for clean completion const completableQueue = yield* TxQueue.bounded<string, Cause.Done>(5) yield* TxQueue.offer(completableQueue, "task") yield* TxQueue.end(completableQueue)})Signature
export interface TxEnqueue<in A, in E = never> extends TxQueueState { readonly [EnqueueTypeId]: TxEnqueue.Variance<A, E>}Since v4.0.0
TxQueue (interface)
Section titled “TxQueue (interface)”A TxQueue represents a transactional queue data structure that provides both enqueue and dequeue operations with Software Transactional Memory (STM) semantics.
Example (Combining enqueue and dequeue operations)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { // Create a bounded transactional queue (E defaults to never) const queue = yield* TxQueue.bounded<number>(10)
// Single operations - automatically transactional const accepted = yield* TxQueue.offer(queue, 42) const item = yield* TxQueue.take(queue) // Effect<number, never> console.log(item) // 42
// Queue with error channel const faultTolerantQueue = yield* TxQueue.bounded<number, string>(10)
// Operations can handle queue-level failures yield* TxQueue.fail(faultTolerantQueue, "queue failed") const result = yield* Effect.flip(TxQueue.take(faultTolerantQueue)) console.log(result) // "queue failed"})Signature
export interface TxQueue<in out A, in out E = never> extends TxEnqueue<A, E>, TxDequeue<A, E> { readonly [TypeId]: TxQueue.Variance<A, E>}Since v4.0.0
TxQueueState (interface)
Section titled “TxQueueState (interface)”Represents the shared state of a transactional queue that can be inspected. This interface contains the core properties needed for queue state inspection operations like size, capacity, and completion status.
Signature
export interface TxQueueState extends Inspectable { readonly strategy: "bounded" | "unbounded" | "dropping" | "sliding" readonly capacity: number readonly items: TxChunk.TxChunk<any> readonly stateRef: TxRef.TxRef<State<any, any>>}Since v4.0.0
taking
Section titled “taking”takeBetween
Section titled “takeBetween”Takes between min and max currently available items, waiting for min on
an open queue.
Details
If the queue is closing, drains the currently available items even when fewer than min are available and transitions to Done. Invalid ranges (min <= 0, max <= 0, or min > max) return an empty array. If the queue is already done, the effect fails with the queue’s completion cause.
Example (Taking batches within bounds)
import { Effect, TxQueue } from "effect"
const program = Effect.gen(function* () { const queue = yield* TxQueue.bounded<number>(10) yield* TxQueue.offerAll(queue, [1, 2, 3, 4, 5, 6, 7, 8])
// Take between 2 and 5 items const batch1 = yield* TxQueue.takeBetween(queue, 2, 5) console.log(batch1) // [1, 2, 3, 4, 5] - took 5 (up to max)
// Take between 1 and 10 items (but only 3 remain) const batch2 = yield* TxQueue.takeBetween(queue, 1, 10) console.log(batch2) // [6, 7, 8] - took 3 (all remaining)
// Would wait for at least 1 item to be available // const batch3 = yield* TxQueue.takeBetween(queue, 1, 3)})Signature
declare const takeBetween: { (min: number, max: number): <A, E>(self: TxDequeue<A, E>) => Effect.Effect<Array<A>, E> <A, E>(self: TxDequeue<A, E>, min: number, max: number): Effect.Effect<Array<A>, E>}Since v2.0.0
TxDequeue (namespace)
Section titled “TxDequeue (namespace)”Namespace containing type definitions for TxDequeue variance annotations.
Since v4.0.0
Variance (interface)
Section titled “Variance (interface)”Variance annotation interface for TxDequeue covariance.
Signature
export interface Variance<out A, out E> { readonly _A: Types.Covariant<A> readonly _E: Types.Covariant<E>}Since v4.0.0
TxEnqueue (namespace)
Section titled “TxEnqueue (namespace)”Namespace containing type definitions for TxEnqueue variance annotations.
Since v4.0.0
Variance (interface)
Section titled “Variance (interface)”Variance annotation interface for TxEnqueue contravariance.
Signature
export interface Variance<in A, in E> { readonly _A: Types.Contravariant<A> readonly _E: Types.Contravariant<E>}Since v4.0.0
TxQueue (namespace)
Section titled “TxQueue (namespace)”Namespace containing type definitions for TxQueue variance annotations.
Since v4.0.0
Variance (interface)
Section titled “Variance (interface)”Variance annotation interface for TxQueue invariance.
Signature
export interface Variance<in out A, in out E> { readonly _A: Types.Invariant<A> readonly _E: Types.Invariant<E>}Since v4.0.0