Queue.ts
Queue.ts overview
Passes values asynchronously between fibers.
A Queue<A, E> accepts values, hands each value to one consumer in offer
order, and can complete, fail, interrupt, or shut down. Queues can be bounded
or unbounded, and bounded queues can suspend, drop, or slide values when
producers are faster than consumers.
Since v3.8.0
Exports Grouped by Category
Offering
offer
Adds a message to the queue. Returns false if the queue is done.
Details
For bounded queues, this operation may suspend if the queue is at capacity, depending on the backpressure strategy. For dropping/sliding queues, it may return false or succeed immediately by dropping/sliding existing messages.
Example (Offering a value)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(3)

  // Successfully add messages to queue
  const success1 = yield* Queue.offer(queue, 1)
  const success2 = yield* Queue.offer(queue, 2)
  console.log(success1, success2) // true, true

  // Queue state
  const size = yield* Queue.size(queue)
  console.log(size) // 2
})
Signature
declare const offer: <A, E>(self: Enqueue<A, E>, message: Types.NoInfer<A>) => Effect<boolean>
Since v2.0.0
offerAll
Adds multiple messages to the queue. Returns the remaining messages that were not added.
When to use
Use when producers can submit a batch at once and need to know which messages did not fit under the queue’s capacity strategy.
Details
For bounded queues, this operation may suspend if the queue doesn’t have enough capacity. The operation returns an array of messages that couldn’t be added (empty array means all messages were successfully added).
Example (Offering multiple values)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.dropping<number>(3)

  // Try to add more messages than capacity without suspending
  const remaining1 = yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  console.log(remaining1) // [4, 5] - couldn't fit the last 2
})
Signature
declare const offerAll: <A, E>(self: Enqueue<A, E>, messages: Iterable<A>) => Effect<Array<A>>
Since v2.0.0
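The relationship between capacity and the returned array can be modeled in plain TypeScript. The sketch below is a simplified, synchronous stand-in for a dropping queue. `DroppingBuffer` and `offerAllDropping` are hypothetical names used only for illustration; they are not part of the Queue API and do not reflect how the library is implemented.

```typescript
// Hypothetical model of a dropping buffer; not the library's implementation.
interface DroppingBuffer<A> {
  readonly capacity: number
  readonly messages: Array<A>
}

// Accepts messages while there is room and returns the ones that did not fit.
function offerAllDropping<A>(buffer: DroppingBuffer<A>, incoming: Iterable<A>): Array<A> {
  const remaining: Array<A> = []
  for (const message of incoming) {
    if (buffer.messages.length < buffer.capacity) {
      buffer.messages.push(message)
    } else {
      remaining.push(message)
    }
  }
  return remaining
}

const buffer: DroppingBuffer<number> = { capacity: 3, messages: [] }
const remaining = offerAllDropping(buffer, [1, 2, 3, 4, 5])
console.log(buffer.messages) // [1, 2, 3]
console.log(remaining) // [4, 5]
```

An empty result means every message was accepted, which matches the convention described above for offerAll.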
offerAllUnsafe
Adds multiple messages to the queue synchronously. Returns the remaining messages that were not added.
When to use
Use when queue internals or a performance boundary need a synchronous batch offer and can handle any messages that do not fit.
Gotchas
This is an unsafe operation that directly modifies the queue without Effect wrapping.
Example (Offering multiple values synchronously)
import { Cause, Effect, Queue } from "effect"

// Create a bounded queue and use unsafe API
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(3)

  // Try to add 5 messages to capacity-3 queue using unsafe API
  const remaining = Queue.offerAllUnsafe(queue, [1, 2, 3, 4, 5])
  console.log(remaining) // [4, 5] - couldn't fit the last 2

  // Check what's in the queue
  const size = Queue.sizeUnsafe(queue)
  console.log(size) // 3
})
Signature
declare const offerAllUnsafe: <A, E>(self: Enqueue<A, E>, messages: Iterable<A>) => Array<A>
Since v4.0.0
offerUnsafe
Adds a message to the queue synchronously. Returns false if the queue is done.
When to use
Use when you are already in synchronous queue internals or a performance
boundary where wrapping the mutation in Effect is intentionally avoided.
Gotchas
This is an unsafe operation that directly modifies the queue without Effect wrapping. Use this only when you’re certain about the synchronous nature of the operation.
Example (Offering a value synchronously)
import { Cause, Effect, Queue } from "effect"

// Create a queue effect and extract the queue for unsafe operations
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(3)

  // Add messages synchronously using unsafe API
  const success1 = Queue.offerUnsafe(queue, 1)
  const success2 = Queue.offerUnsafe(queue, 2)
  console.log(success1, success2) // true, true

  // Check current size
  const size = Queue.sizeUnsafe(queue)
  console.log(size) // 2
})
Signature
declare const offerUnsafe: <A, E>(self: Enqueue<A, E>, message: Types.NoInfer<A>) => boolean
Since v4.0.0
completion
await
Waits until a queue reaches the Done state.
When to use
Use to suspend a fiber until no further values can be taken from the queue and its terminal outcome is known.
Details
The effect succeeds with void for normal Done completion. Other
terminal causes are preserved, so failures and interruptions complete this
effect with the same terminal outcome.
Gotchas
A queue can be closing before it is done. await resumes at Done, not at
the first completion signal, so buffered messages may need to be drained
first.
See
end for signaling normal completion while preserving buffered messages for consumers
fail for signaling an error while preserving buffered messages for consumers
interrupt for graceful interruption after buffered messages are drained
shutdown for immediately discarding buffered messages and resuming pending operations
Signature
declare const await: <A, E>(self: Dequeue<A, E>) => Effect<void, Exclude<E, Done>>
Since v4.0.0
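The difference between a closing queue and a done queue is easier to see in a small state model. The sketch below is a plain TypeScript approximation, not the library's implementation: `LifecycleModel` and its methods are hypothetical, and the choice to move an empty queue straight to Done on `end` is an assumption of the model.

```typescript
// Hypothetical model of the queue lifecycle; names are illustrative only.
type State = "Open" | "Closing" | "Done"

class LifecycleModel<A> {
  state: State = "Open"
  private readonly messages: Array<A> = []

  // Offers are accepted only while the queue is open.
  offer(message: A): boolean {
    if (this.state !== "Open") return false
    this.messages.push(message)
    return true
  }

  // Graceful completion: buffered messages stay available to consumers.
  // Assumption: an empty queue has nothing to drain, so it is Done at once.
  end(): void {
    if (this.state !== "Open") return
    this.state = this.messages.length > 0 ? "Closing" : "Done"
  }

  // Taking the last buffered message of a closing queue moves it to Done.
  take(): A | undefined {
    const message = this.messages.shift()
    if (this.state === "Closing" && this.messages.length === 0) this.state = "Done"
    return message
  }

  // Immediate termination: buffered messages are discarded.
  shutdown(): void {
    this.messages.length = 0
    this.state = "Done"
  }
}

const graceful = new LifecycleModel<number>()
graceful.offer(1)
graceful.offer(2)
graceful.end()
const afterEnd = graceful.state // "Closing": two messages are still buffered
const rejected = graceful.offer(3) // false: no new offers after end
graceful.take()
graceful.take()
const afterDrain = graceful.state // "Done"

const abrupt = new LifecycleModel<number>()
abrupt.offer(1)
abrupt.shutdown()
const afterShutdown = abrupt.state // "Done", and the buffered 1 is gone
console.log(afterEnd, rejected, afterDrain, afterShutdown)
```

In this model, a fiber waiting on completion would resume at the point where `afterDrain` is read, not when `end` is called.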
end
Signals queue completion.
When to use
Use to stop accepting new offers while allowing already queued messages to be consumed.
Details
Returns false if the queue is already done.
Example (Ending queues)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(10)

  // Add some messages
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)

  // Signal completion - no more messages will be accepted
  const ended = yield* Queue.end(queue)
  console.log(ended) // true

  // Trying to offer more messages will return false
  const offerResult = yield* Queue.offer(queue, 3)
  console.log(offerResult) // false

  // But we can still take existing messages
  const message = yield* Queue.take(queue)
  console.log(message) // 1
})
Signature
declare const end: <A, E>(self: Enqueue<A, E | Done>) => Effect<boolean>
Since v4.0.0
endUnsafe
Signals queue completion synchronously.
When to use
Use when implementing low-level queue integrations that must complete a queue
without wrapping the operation in Effect.
Details
Returns false if the queue is already done.
Gotchas
This is an unsafe operation that directly modifies the queue without Effect wrapping.
Example (Ending queues synchronously)
import { Cause, Effect, Queue } from "effect"

// Create a queue and use unsafe operations
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(10)

  // Add some messages
  Queue.offerUnsafe(queue, 1)
  Queue.offerUnsafe(queue, 2)

  // End the queue synchronously
  const ended = Queue.endUnsafe(queue)
  console.log(ended) // true

  // Existing messages can still be consumed while the queue is closing
  console.log(queue.state._tag) // "Closing"

  Queue.takeUnsafe(queue)
  Queue.takeUnsafe(queue)

  // After buffered messages are consumed, the queue is done
  console.log(queue.state._tag) // "Done"
})
Signature
declare const endUnsafe: <A, E>(self: Enqueue<A, E | Done>) => boolean
Since v4.0.0
fail
Fails the queue with an error. If the queue is already done, false is
returned.
Example (Failing queues with an error)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, string>(10)

  // Fail the queue with an error
  const failed = yield* Queue.fail(queue, "Something went wrong")
  console.log(failed) // true

  // Taking from the failed queue fails with the error
  const error = yield* Effect.flip(Queue.take(queue))
  console.log(error) // "Something went wrong"
})
Signature
declare const fail: <A, E>(self: Enqueue<A, E>, error: E) => Effect<boolean, never, never>
Since v4.0.0
failCause
Fails the queue with a cause. If the queue is already done, false is
returned.
Example (Failing queues with a cause)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, string>(10)

  // Create a cause and fail the queue
  const cause = Cause.fail("Queue processing failed")
  const failed = yield* Queue.failCause(queue, cause)
  console.log(failed) // true

  // The queue is now done with the specified failure cause
  console.log(queue.state._tag) // "Done"
})
Signature
declare const failCause: {
  <E>(cause: Cause<E>): <A>(self: Enqueue<A, E>) => Effect<boolean>
  <A, E>(self: Enqueue<A, E>, cause: Cause<E>): Effect<boolean>
}
Since v4.0.0
failCauseUnsafe
Fails the queue with a cause synchronously. If the queue is already done, false is
returned.
When to use
Use when queue completion must be driven from synchronous internals while
preserving the full failure Cause.
Gotchas
This is an unsafe operation that directly modifies the queue without Effect wrapping.
Example (Failing queues with a cause synchronously)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, string>(10)

  // Create a cause and fail the queue synchronously
  const cause = Cause.fail("Processing error")
  const failed = Queue.failCauseUnsafe(queue, cause)
  console.log(failed) // true

  // The queue is now done with the specified failure cause
  console.log(queue.state._tag) // "Done"
})
Signature
declare const failCauseUnsafe: <A, E>(self: Enqueue<A, E>, cause: Cause<E>) => boolean
Since v4.0.0
interrupt
Interrupts the queue gracefully, transitioning it to a closing state.
Details
This operation stops accepting new offers but allows existing messages to be consumed. Once all messages are drained, the queue transitions to the Done state with an interrupt cause.
Example (Interrupting queues gracefully)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)

  // Add some messages
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)

  // Interrupt gracefully - no more offers accepted, but messages can be consumed
  const interrupted = yield* Queue.interrupt(queue)
  console.log(interrupted) // true

  // Trying to offer more messages will return false
  const offerResult = yield* Queue.offer(queue, 3)
  console.log(offerResult) // false

  // But we can still take existing messages
  const message1 = yield* Queue.take(queue)
  console.log(message1) // 1

  const message2 = yield* Queue.take(queue)
  console.log(message2) // 2

  // After all messages are consumed, queue is done
  const isDone = queue.state._tag === "Done"
  console.log(isDone) // true
})
Signature
declare const interrupt: <A, E>(self: Enqueue<A, E>) => Effect<boolean>
Since v4.0.0
into
Runs an Effect into a Queue, where success ends the queue and failure
fails the queue.
Example (Running effects into queues)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(10)

  // Create an effect that succeeds
  const dataProcessing = Effect.gen(function* () {
    yield* Effect.sleep("100 millis")
    return "Processing completed successfully"
  })

  // Pipe the effect into the queue
  // If dataProcessing succeeds, queue ends successfully
  // If dataProcessing fails, queue fails with the error
  const effectIntoQueue = Queue.into(queue)(dataProcessing)

  const wasCompleted = yield* effectIntoQueue
  console.log("Queue operation completed:", wasCompleted) // true

  // Queue state now reflects the effect's outcome
  console.log("Queue state:", queue.state._tag) // "Done"
})
Signature
declare const into: {
  <A, E>(self: Enqueue<A, E | Done>): <AX, EX extends E, RX>(effect: Effect<AX, EX, RX>) => Effect<boolean, never, RX>
  <AX, E, EX extends E, RX, A>(effect: Effect<AX, EX, RX>, self: Enqueue<A, E | Done>): Effect<boolean, never, RX>
}
Since v4.0.0
shutdown
Shuts down the queue immediately, discarding buffered messages and resuming pending operations.
Details
The operation is idempotent and returns true, including when the queue has
already been shut down or completed.
Example (Shutting down queues)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(2)

  // Add messages
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)

  // Shutdown clears buffered messages and prevents further offers
  const wasShutdown = yield* Queue.shutdown(queue)
  console.log(wasShutdown) // true

  // Queue is now done and cleared
  const size = yield* Queue.size(queue)
  console.log(size) // 0
})
Signature
declare const shutdown: <A, E>(self: Enqueue<A, E>) => Effect<boolean>
Since v2.0.0
constructors
bounded
Creates a bounded queue with the specified capacity that uses the backpressure strategy.
Details
When the queue reaches capacity, producers will be suspended until space becomes available. This ensures all messages are processed but may slow down producers.
Example (Creating bounded queues)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string>(5)

  // This will succeed as queue has capacity
  yield* Queue.offer(queue, "first")
  yield* Queue.offer(queue, "second")

  const size = yield* Queue.size(queue)
  console.log(size) // 2
})
Signature
declare const bounded: <A, E = never>(capacity: number) => Effect<Queue<A, E>>
Since v2.0.0
dropping
Creates a bounded queue with the dropping strategy. When the queue reaches capacity, new elements are dropped and the offer operation returns false.
When to use
Use when you need producer offers not to block while preserving existing queued messages, even if new messages may be dropped when the queue is full.
Example (Creating dropping queues)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.dropping<number>(2)

  // Fill the queue to capacity
  const success1 = yield* Queue.offer(queue, 1)
  const success2 = yield* Queue.offer(queue, 2)
  console.log(success1, success2) // true, true

  // This will be dropped
  const success3 = yield* Queue.offer(queue, 3)
  console.log(success3) // false

  const all = yield* Queue.takeAll(queue)
  console.log(all) // [1, 2] - element 3 was dropped
})
Signature
declare const dropping: <A, E = never>(capacity: number) => Effect<Queue<A, E>>
Since v2.0.0
make
Creates a Queue with optional capacity and overflow strategy.
Details
By default the queue is unbounded and uses the "suspend" strategy. Provide
capacity for a bounded queue and choose "suspend", "dropping", or
"sliding" to control what happens when the queue is full. The returned
queue can be offered to, taken from, failed, ended, interrupted, or shut down.
Example (Creating queues)
import { Cause, Effect, Queue } from "effect"

Effect.gen(function* () {
  const queue = yield* Queue.make<number, string | Cause.Done>()

  // add messages to the queue
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  yield* Queue.offerAll(queue, [3, 4, 5])

  // take messages from the queue
  const messages = yield* Queue.takeAll(queue)
  console.log(messages) // [1, 2, 3, 4, 5]

  // signal that the queue is done
  yield* Queue.end(queue)
  const done = yield* Effect.flip(Queue.take(queue))
  console.log(Cause.isDone(done)) // true

  // signal that another queue has failed
  const failedQueue = yield* Queue.make<number, string>()
  const failed = yield* Queue.fail(failedQueue, "boom")
  console.log(failed) // true
})
Signature
declare const make: <A, E = never>(
  options?:
    | {
        readonly capacity?: number | undefined
        readonly strategy?: "suspend" | "dropping" | "sliding" | undefined
      }
    | undefined
) => Effect<Queue<A, E>>
Since v4.0.0
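To compare the three strategies side by side, the sketch below models what a single offer does to a buffer that is already full. It is a plain TypeScript approximation written for this page: `offerWithStrategy`, `OfferResult`, and the "would-suspend" marker are hypothetical, and a real queue suspends the producer rather than returning a marker.

```typescript
// Hypothetical model of the three overflow strategies; not the library's code.
type Strategy = "suspend" | "dropping" | "sliding"
type OfferResult = "accepted" | "dropped" | "would-suspend"

function offerWithStrategy<A>(
  messages: Array<A>,
  capacity: number,
  strategy: Strategy,
  message: A
): OfferResult {
  if (messages.length < capacity) {
    messages.push(message)
    return "accepted"
  }
  switch (strategy) {
    case "dropping":
      // The new message is discarded; buffered messages are untouched.
      return "dropped"
    case "sliding":
      // The oldest message is evicted to make room for the new one.
      messages.shift()
      messages.push(message)
      return "accepted"
    case "suspend":
      // A real queue would suspend the producer here until space frees up.
      return "would-suspend"
  }
}

const dropping = [1, 2]
const sliding = [1, 2]
const suspend = [1, 2]
const results = [
  offerWithStrategy(dropping, 2, "dropping", 3),
  offerWithStrategy(sliding, 2, "sliding", 3),
  offerWithStrategy(suspend, 2, "suspend", 3)
]
console.log(results) // "dropped", "accepted", "would-suspend"
console.log(dropping, sliding, suspend) // [1, 2], [2, 3], [1, 2]
```

Only the sliding buffer changes: it keeps the newest values at the cost of the oldest, while dropping keeps the oldest and rejects the newcomer.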
sliding
Creates a bounded queue with the sliding strategy. When the queue reaches capacity, new elements are added and the oldest elements are dropped.
When to use
Use when you need producer offers not to block and can accept dropping the oldest messages, such as when maintaining a rolling window of recent values.
Example (Creating sliding queues)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.sliding<number>(3)

  // Fill the queue to capacity
  yield* Queue.offer(queue, 1)
  yield* Queue.offer(queue, 2)
  yield* Queue.offer(queue, 3)

  // This will succeed, dropping the oldest element (1)
  yield* Queue.offer(queue, 4)

  const all = yield* Queue.takeAll(queue)
  console.log(all) // [2, 3, 4] - oldest element (1) was dropped
})
Signature
declare const sliding: <A, E = never>(capacity: number) => Effect<Queue<A, E>>
Since v2.0.0
unbounded
Creates an unbounded queue that can grow to any size without blocking producers.
When to use
Use when you need producers to add messages without backpressure and accept unbounded memory growth.
Example (Creating unbounded queues)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.unbounded<string>()

  // Producers can always add messages without blocking
  yield* Queue.offer(queue, "message1")
  yield* Queue.offer(queue, "message2")
  yield* Queue.offerAll(queue, ["message3", "message4", "message5"])

  // Check current size
  const size = yield* Queue.size(queue)
  console.log(size) // 5

  // Take all messages
  const messages = yield* Queue.takeAll(queue)
  console.log(messages) // ["message1", "message2", "message3", "message4", "message5"]
})
Signature
declare const unbounded: <A, E = never>() => Effect<Queue<A, E>>
Since v2.0.0
converting
asDequeue
Narrows a Queue to a Dequeue, exposing the consumer side of the queue.
When to use
Use to pass a queue to code that should consume values while keeping producer-side operations out of that code’s TypeScript type.
Gotchas
This is a type-level narrowing operation. It returns the same queue object and does not create a runtime wrapper.
See
asEnqueue for narrowing a queue to its producer side
Dequeue for the consumer-side queue handle returned by this function
Signature
declare const asDequeue: <A, E>(self: Queue<A, E>) => Dequeue<A, E>
Since v4.0.0
asEnqueue
Converts a Queue to its write-only Enqueue interface.
When to use
Use to expose only the producer side of a Queue to code that should offer
values or signal queue lifecycle.
Gotchas
This is a type-level capability restriction. It returns the same queue object, so it does not hide read operations at runtime.
See
asDequeue for exposing only the read side of a Queue
Enqueue for the write-only queue handle returned by this conversion
Signature
declare const asEnqueue: <A, E>(self: Queue<A, E>) => Enqueue<A, E>
Since v4.0.0
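The gotcha shared by asDequeue and asEnqueue, that narrowing is type-level only, can be demonstrated without the library. In the sketch below, `WriteSide`, `ReadSide`, `Both`, `makeBoth`, and `asWriteSide` are hypothetical stand-ins for Enqueue, Dequeue, Queue, a constructor, and asEnqueue; they are assumptions made for illustration.

```typescript
// Hypothetical stand-ins for the write and read sides of a queue.
interface WriteSide<A> {
  offer(message: A): boolean
}
interface ReadSide<A> {
  take(): A | undefined
}
interface Both<A> extends WriteSide<A>, ReadSide<A> {}

function makeBoth<A>(): Both<A> {
  const messages: Array<A> = []
  return {
    offer: (message) => {
      messages.push(message)
      return true
    },
    take: () => messages.shift()
  }
}

// Identity at runtime; only the static type gets narrower.
const asWriteSide = <A>(self: Both<A>): WriteSide<A> => self

const queue = makeBoth<string>()
const producerHandle = asWriteSide(queue)
producerHandle.offer("hello")
// producerHandle.take() // compile-time error: 'take' does not exist on WriteSide<string>

const sameObject = Object.is(producerHandle, queue) // true: no wrapper was created
const stillReadable = "take" in producerHandle // true: the method still exists at runtime
const taken = queue.take() // "hello"
console.log(sameObject, stillReadable, taken)
```

Code that must not read from the queue at runtime therefore needs more than a narrowed type, since a cast or an `in` check can recover the hidden side.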
guards
isDequeue
Type guard to check if a value is a Dequeue.
When to use
Use to narrow an unknown value before passing it to read-side queue operations.
See
Dequeue for the read-side queue handle checked by this guard
isQueue for checking for a full read-write queue handle
isEnqueue for checking for the write side of a queue
asDequeue for narrowing an existing Queue to its read-only interface
Signature
declare const isDequeue: <A = unknown, E = unknown>(u: unknown) => u is Dequeue<A, E>
Since v2.0.0
isEnqueue
Type guard to check if a value is an Enqueue.
When to use
Use to narrow an unknown value before calling queue operations that require write-side access.
Gotchas
A full Queue also satisfies this guard because every queue includes the
enqueue side.
See
isQueue for checking for a full read-write queue handle
isDequeue for checking for the read side of a queue
asEnqueue for narrowing an existing Queue to its write-only interface
Signature
declare const isEnqueue: <A = unknown, E = unknown>(u: unknown) => u is Enqueue<A, E>
Since v2.0.0
isQueue
Type guard to check if a value is a Queue.
When to use
Use to narrow an unknown value to a full Queue before passing it to APIs
that need both offering and taking capabilities.
See
isEnqueue for checking values that only need write access
isDequeue for checking values that only need read access
Signature
declare const isQueue: <A = unknown, E = unknown>(u: unknown) => u is Queue<A, E>
Since v2.0.0
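How the three guards relate to each other can be illustrated with a brand-based sketch. This page does not show how the library implements its guards, so everything below is an assumption: the brand keys, the `*Like` interfaces, and the `hasBrand` helper are hypothetical and exist only to show why a full queue passes the enqueue guard while a write-only value fails the queue guard.

```typescript
// Hypothetical brand keys; the real TypeId values are not shown on this page.
const EnqueueBrand = "~sketch/Enqueue"
const DequeueBrand = "~sketch/Dequeue"

interface EnqueueLike {
  readonly [EnqueueBrand]: true
}
interface DequeueLike {
  readonly [DequeueBrand]: true
}
interface QueueLike extends EnqueueLike, DequeueLike {}

// True when `u` is a non-null object that carries the given brand key.
const hasBrand = (u: unknown, brand: string): boolean =>
  typeof u === "object" && u !== null && brand in u

const isEnqueueLike = (u: unknown): u is EnqueueLike => hasBrand(u, EnqueueBrand)
const isDequeueLike = (u: unknown): u is DequeueLike => hasBrand(u, DequeueBrand)
const isQueueLike = (u: unknown): u is QueueLike => isEnqueueLike(u) && isDequeueLike(u)

const full: QueueLike = { [EnqueueBrand]: true, [DequeueBrand]: true }
const writeOnly: EnqueueLike = { [EnqueueBrand]: true }

const fullChecks = [isEnqueueLike(full), isDequeueLike(full), isQueueLike(full)]
const writeOnlyChecks = [isEnqueueLike(writeOnly), isDequeueLike(writeOnly), isQueueLike(writeOnly)]
console.log(fullChecks) // true, true, true
console.log(writeOnlyChecks) // true, false, false
console.log(isQueueLike("not a queue")) // false
```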
models
Dequeue (interface)
A Dequeue is a queue that can be taken from.
Details
This interface represents the read-only part of a Queue, allowing you to take elements from the queue but not offer elements to it.
Example (Taking through dequeue handles)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string, never>(10)

  // A Dequeue can only take elements
  const dequeue: Queue.Dequeue<string> = queue

  // Pre-populate the queue
  yield* Queue.offerAll(queue, ["a", "b", "c"])

  // Take elements using dequeue interface
  const item = yield* Queue.take(dequeue)
  console.log(item) // "a"
})
Signature
export interface Dequeue<out A, out E = never> extends Inspectable {
  readonly [DequeueTypeId]: Dequeue.Variance<A, E>
  readonly strategy: "suspend" | "dropping" | "sliding"
  readonly dispatcher: SchedulerDispatcher
  capacity: number
  messages: MutableList.MutableList<any>
  state: Queue.State<any, any>
  scheduleRunning: boolean
}
Since v2.0.0
Enqueue (interface)
An Enqueue is a queue that can be offered to.
Details
This interface represents the write-only part of a Queue, allowing you to offer elements to the queue but not take elements from it.
Example (Offering through enqueue handles)
import { Effect, Queue } from "effect"

// Function that only needs write access to a queue
const producer = (enqueue: Queue.Enqueue<string>) =>
  Effect.gen(function* () {
    yield* Queue.offer(enqueue, "hello")
    yield* Queue.offerAll(enqueue, ["world", "!"])
  })

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string>(10)
  yield* producer(queue)
})
Signature
export interface Enqueue<in A, in E = never> extends Inspectable {
  readonly [EnqueueTypeId]: Enqueue.Variance<A, E>
  readonly strategy: "suspend" | "dropping" | "sliding"
  readonly dispatcher: SchedulerDispatcher
  capacity: number
  messages: MutableList.MutableList<any>
  state: Queue.State<any, any>
  scheduleRunning: boolean
}
Since v2.0.0
Queue (interface)
A Queue is an asynchronous queue that can be offered to and taken from.
Details
It also supports signaling that it is done or failed.
Example (Offering and taking queue values)
import { Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  // Create a bounded queue
  const queue = yield* Queue.bounded<string>(10)

  // Producer: offer items to the queue
  yield* Queue.offer(queue, "hello")
  yield* Queue.offerAll(queue, ["world", "!"])

  // Consumer: take items from the queue
  const item1 = yield* Queue.take(queue)
  const item2 = yield* Queue.take(queue)
  const item3 = yield* Queue.take(queue)

  console.log([item1, item2, item3]) // ["hello", "world", "!"]
})
Signature
export interface Queue<in out A, in out E = never> extends Enqueue<A, E>, Dequeue<A, E> {
  readonly [TypeId]: Queue.Variance<A, E>
}
Since v2.0.0
isFull
Checks whether the queue is full.
Example (Checking if queues are full)
import { Cause, Effect, Option, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(3)

  console.log(yield* Queue.isFull(queue)) // false

  // Add some messages
  yield* Queue.offerAll(queue, [1, 2, 3])

  console.log(yield* Queue.isFull(queue)) // true
})
Signature
declare const isFull: <A, E>(self: Dequeue<A, E>) => Effect<boolean>
Since v2.0.0
isFullUnsafe
Checks whether the queue is full synchronously.
When to use
Use when an immediate Queue capacity snapshot is needed outside effectful
code and racing queue changes are acceptable.
Example (Checking fullness synchronously)
import { Cause, Effect, Option, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(3)

  console.log(Queue.isFullUnsafe(queue)) // false

  // Add some messages
  yield* Queue.offerAll(queue, [1, 2, 3])

  console.log(Queue.isFullUnsafe(queue)) // true
})
Signature
declare const isFullUnsafe: <A, E>(self: Dequeue<A, E>) => boolean
Since v4.0.0
size
Returns the current number of buffered messages in the queue.
Details
Completed queues report a size of 0.
Example (Checking queue size)
import { Cause, Effect, Option, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(10)

  // Check size of empty queue
  const size1 = yield* Queue.size(queue)
  console.log(size1) // 0

  // Add some messages
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])

  // Check size after adding messages
  const size2 = yield* Queue.size(queue)
  console.log(size2) // 5

  // End the queue
  yield* Queue.end(queue)

  // Size of ended queue is 0
  const size3 = yield* Queue.size(queue)
  console.log(size3) // 0
})
Signature
declare const size: <A, E>(self: Dequeue<A, E>) => Effect<number>
Since v2.0.0
sizeUnsafe
Returns the current number of buffered messages in the queue synchronously.
When to use
Use when you need an immediate Queue size snapshot for diagnostics or
internals and do not need the read wrapped in Effect.
Details
Completed queues report a size of 0. This unsafe operation reads the queue
state directly without Effect wrapping.
Example (Checking queue size synchronously)
import { Cause, Effect, Option, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(10)

  // Check size of empty queue
  const size1 = Queue.sizeUnsafe(queue)
  console.log(size1) // 0

  // Add some messages
  Queue.offerUnsafe(queue, 1)
  Queue.offerUnsafe(queue, 2)
  Queue.offerUnsafe(queue, 3)

  // Check size after adding messages
  const size2 = Queue.sizeUnsafe(queue)
  console.log(size2) // 3

  // End the queue
  Queue.endUnsafe(queue)

  // Size of ended queue is 0
  const size3 = Queue.sizeUnsafe(queue)
  console.log(size3) // 0
})
Signature
declare const sizeUnsafe: <A, E>(self: Dequeue<A, E>) => number
Since v4.0.0
taking
clear
Takes and returns all currently buffered messages without waiting for more.
Details
Returns an empty array when the queue is empty or has completed normally. If the queue has failed, the effect fails with the queue’s error.
Example (Clearing queued values)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)

  // Add several messages
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])

  // Clear all messages from the queue
  const messages = yield* Queue.clear(queue)
  console.log(messages) // [1, 2, 3, 4, 5]

  // Queue is now empty
  const size = yield* Queue.size(queue)
  console.log(size) // 0

  // Clearing empty queue returns empty array
  const empty = yield* Queue.clear(queue)
  console.log(empty) // []
})
Signature
declare const clear: <A, E>(self: Dequeue<A, E>) => Effect<Array<A>, Pull.ExcludeDone<E>>
Since v4.0.0
collect
Takes all messages from the queue until the queue has failed or is done.
Example (Collecting values until completion)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(5)

  // Add several messages
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])
  // Some time later, end the queue
  yield* Effect.forkChild(Queue.end(queue))

  // Collect all available messages
  const messages = yield* Queue.collect(queue)
  console.log(messages) // [1, 2, 3, 4, 5]
})
Signature
declare const collect: <A, E>(self: Dequeue<A, E | Done>) => Effect<Array<A>, Pull.ExcludeDone<E>>
Since v4.0.0
peek
Peeks at the next item without removing it.
Details
Blocks until an item is available. If the queue is done or fails, the error is propagated.
Example (Peeking at the next value)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)
  yield* Queue.offer(queue, 42)

  // Peek at the next item without removing it
  const item = yield* Queue.peek(queue)
  console.log(item) // 42
})
Signature
declare const peek: <A, E>(self: Dequeue<A, E>) => Effect<A, E>
Since v4.0.0
poll
Attempts to take one item from the queue without waiting.
Details
Returns Option.some when an item is immediately available. Returns
Option.none when no item is available, when the queue is done, or when the
immediate take observes a queue failure.
Example (Polling without blocking)
import { Effect, Option, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)

  // Poll returns Option.none if empty
  const maybe1 = yield* Queue.poll(queue)
  console.log(Option.isNone(maybe1)) // true

  // Add an item
  yield* Queue.offer(queue, 42)

  // Poll returns Option.some with the item
  const maybe2 = yield* Queue.poll(queue)
  console.log(Option.getOrNull(maybe2)) // 42
})
Signature
declare const poll: <A, E>(self: Dequeue<A, E>) => Effect<Option.Option<A>>
Since v2.0.0
take
Takes a single message from the queue, waiting until a message is available if the queue is empty.
Details
If the queue is done, it will fail with Done. If the
queue fails, the Effect will fail with the error.
Example (Taking one value)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<string, Cause.Done>(3)

  // Add some messages
  yield* Queue.offer(queue, "first")
  yield* Queue.offer(queue, "second")

  // Take messages one by one
  const msg1 = yield* Queue.take(queue)
  const msg2 = yield* Queue.take(queue)
  console.log(msg1, msg2) // "first", "second"

  // End the queue
  yield* Queue.end(queue)

  // Taking from an ended queue fails with Done
  const result = yield* Effect.match(Queue.take(queue), {
    onFailure: (error: Cause.Done) => true,
    onSuccess: (value: string) => false
  })
  console.log("Queue ended:", result) // true
})
Signature
declare const take: <A, E>(self: Dequeue<A, E>) => Effect<A, E>
Since v2.0.0
takeAll
Takes all currently available messages, waiting until at least one message is available when the queue is empty.
When to use
Use when consumers should process the next non-empty batch of buffered messages instead of repeatedly taking one message at a time.
Details
Returns a non-empty array. If the queue completes or fails before a message can be taken, the effect fails with the queue’s terminal error.
Example (Taking all available values)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(5)

  // Add several messages
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5])

  // Take all available messages
  const messages1 = yield* Queue.takeAll(queue)
  console.log(messages1) // [1, 2, 3, 4, 5]
})
Signature
declare const takeAll: <A, E>(self: Dequeue<A, E>) => Effect<Arr.NonEmptyArray<A>, E>
Since v2.0.0
takeBetween
Takes between min and max messages from the queue.
Details
The operation waits when fewer than the required minimum messages are
available. It returns at most max messages. If the queue completes or fails
before the minimum can be satisfied, the effect fails with the queue’s
terminal error.
Example (Taking a bounded batch of values)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)

  // Add several messages
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5, 6, 7, 8])

  // Take between 2 and 5 messages
  const batch1 = yield* Queue.takeBetween(queue, 2, 5)
  console.log(batch1) // [1, 2, 3, 4, 5] - took 5 (up to max)

  // Take between 1 and 10 messages (but only 3 remain)
  const batch2 = yield* Queue.takeBetween(queue, 1, 10)
  console.log(batch2) // [6, 7, 8] - took 3 (all remaining)

  // No more messages available, will wait or return done
  // const batch3 = yield* Queue.takeBetween(queue, 1, 3)
})

Signature
declare const takeBetween: <A, E>(self: Dequeue<A, E>, min: number, max: number) => Effect<Array<A>, E>
Since v2.0.0
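The min/max rule described above can be modeled on a plain array. The sketch below is a simplified, synchronous stand-in and not the library's implementation: `takeBetweenSync` is a hypothetical helper, and its `undefined` result stands in for the point where the real operation would wait.

```typescript
// Hypothetical synchronous model of the min/max batching rule.
// Returns undefined where the real operation would wait for more messages.
function takeBetweenSync<A>(
  pending: Array<A>,
  min: number,
  max: number
): Array<A> | undefined {
  if (pending.length < min) {
    return undefined // fewer than `min` buffered: the real operation would wait
  }
  // Remove and return at most `max` messages from the front of the buffer
  return pending.splice(0, Math.max(0, max))
}

const pending = [1, 2, 3, 4, 5, 6, 7, 8]
const batch1 = takeBetweenSync(pending, 2, 5) // [1, 2, 3, 4, 5]
const batch2 = takeBetweenSync(pending, 1, 10) // [6, 7, 8]
const batch3 = takeBetweenSync(pending, 1, 3) // undefined (nothing buffered)
console.log(batch1, batch2, batch3)
```

The model leaves out suspension and terminal errors entirely; it only shows how `min` gates the take and `max` caps the batch size.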
takeN
Takes up to n messages from the queue.
Details
The operation may wait until enough messages are available to satisfy the
queue’s batching rules. If n is less than or equal to zero, it succeeds
with an empty array. If the queue completes or fails before messages can be
taken, the effect fails with the queue’s terminal error.
Example (Taking a fixed number of values)
import { Cause, Effect, Queue } from "effect"

const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number, Cause.Done>(10)

  // Add several messages
  yield* Queue.offerAll(queue, [1, 2, 3, 4, 5, 6, 7])

  // Take exactly 3 messages
  const first3 = yield* Queue.takeN(queue, 3)
  console.log(first3) // [1, 2, 3]

  // Take exactly 2 more messages
  const next2 = yield* Queue.takeN(queue, 2)
  console.log(next2) // [4, 5]

  // Take remaining messages
  const remaining = yield* Queue.takeN(queue, 2)
  console.log(remaining) // [6, 7]
})

Signature
declare const takeN: <A, E>(self: Dequeue<A, E>, n: number) => Effect<Array<A>, E>
Since v2.0.0
takeUnsafe
Attempts to take one message from the queue synchronously.
When to use
Use when the caller must poll the queue without suspending or registering a
waiting taker, and undefined is an acceptable result for an empty queue.
Details
Returns an Exit for an immediately available message or for the queue’s
terminal state. Returns undefined when no message is immediately available.
This operation does not wait or register a taker.
Example (Taking one value synchronously)
import { Effect, Queue } from "effect"

// Create a queue and use unsafe operations
const program = Effect.gen(function* () {
  const queue = yield* Queue.bounded<number>(10)

  // Add some messages
  Queue.offerUnsafe(queue, 1)
  Queue.offerUnsafe(queue, 2)

  // Take a message synchronously
  const result1 = Queue.takeUnsafe(queue)
  console.log(result1) // Success(1) or Exit containing value 1

  const result2 = Queue.takeUnsafe(queue)
  console.log(result2) // Success(2)

  // No more messages - returns undefined
  const result3 = Queue.takeUnsafe(queue)
  console.log(result3) // undefined
})

Signature
declare const takeUnsafe: <A, E>(self: Dequeue<A, E>) => Exit<A, E> | undefined
Since v4.0.0
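The three possible outcomes (a successful Exit, a failed Exit for a terminal queue, or undefined) suggest a polling loop like the one sketched below. To stay self-contained it uses local stand-ins rather than the library: `ExitLike` is a simplified assumption about the Exit shape, `drain` is a hypothetical helper, and `poll` plays the role of a call such as `() => Queue.takeUnsafe(queue)`.

```typescript
// Simplified local stand-in for the library's Exit type (an assumption)
type ExitLike<A, E> =
  | { readonly _tag: "Success"; readonly value: A }
  | { readonly _tag: "Failure"; readonly cause: E }

// Drains every immediately available message without waiting.
// `poll` stands in for a call such as `() => Queue.takeUnsafe(queue)`.
function drain<A, E>(
  poll: () => ExitLike<A, E> | undefined
): { values: Array<A>; ended: boolean } {
  const values: Array<A> = []
  while (true) {
    const exit = poll()
    if (exit === undefined) return { values, ended: false } // empty but not terminal
    if (exit._tag === "Failure") return { values, ended: true } // terminal state
    values.push(exit.value)
  }
}

// Usage with an array-backed poll function
const backlog = [1, 2]
const drained = drain<number, never>(() =>
  backlog.length > 0 ? { _tag: "Success", value: backlog.shift()! } : undefined
)
console.log(drained) // { values: [ 1, 2 ], ended: false }
```

Because the loop never registers a taker, it returns as soon as nothing is immediately available instead of suspending.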
Dequeue (namespace)
Companion namespace containing type-level metadata for the Dequeue
read-only queue interface.
Since v2.0.0
Variance (interface)
Type-level variance marker for Dequeue.
Details
Dequeue is covariant in both the taken value type A and failure type
E, because values and failures are observed through this handle.
Signature
export interface Variance<A, E> {
  _A: Types.Covariant<A>
  _E: Types.Covariant<E>
}
Since v4.0.0
Enqueue (namespace)
Companion namespace containing type-level metadata for the Enqueue
write-only queue interface.
Since v2.0.0
Variance (interface)
Type-level variance marker for Enqueue.
Details
Enqueue is contravariant in both its offered value type A and failure
type E, because values and failures flow into the queue through this
handle.
Signature
export interface Variance<A, E> {
  _A: Types.Contravariant<A>
  _E: Types.Contravariant<E>
}
Since v4.0.0
Queue (namespace)
Companion namespace containing type-level metadata and low-level state types
for Queue.
Since v2.0.0
Variance (interface)
Type-level variance marker for Queue.
Details
A full Queue is invariant in both A and E because the same handle can
both produce and consume values and failures.
Signature
export interface Variance<A, E> {
  _A: Types.Invariant<A>
  _E: Types.Invariant<E>
}
Since v4.0.0
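The three variance markers (covariant for Dequeue, contravariant for Enqueue, invariant for Queue) can be illustrated with TypeScript's own `in`/`out` variance annotations. This is a minimal sketch using hypothetical stand-in interfaces (`Source`, `Sink`, `Both`) and a hypothetical `makeBoth` factory, not the library's types, and it models only the value type `A`.

```typescript
interface Animal { readonly name: string }
interface Dog extends Animal { readonly breed: string }

// Read-only handle: covariant in A, like Dequeue
interface Source<out A> { readonly take: () => A | undefined }
// Write-only handle: contravariant in A, like Enqueue
interface Sink<in A> { readonly offer: (value: A) => boolean }
// Full handle: both reads and writes A, so it is invariant, like Queue
interface Both<in out A> extends Source<A>, Sink<A> {}

// Array-backed handle used only to exercise the types at runtime
function makeBoth<A>(): Both<A> {
  const items: Array<A> = []
  return {
    offer: (value) => {
      items.push(value)
      return true
    },
    take: () => items.shift()
  }
}

const dogs = makeBoth<Dog>()
const animals = makeBoth<Animal>()

// Covariance: a source of dogs can be read as a source of animals
const animalSource: Source<Animal> = dogs
// Contravariance: a sink for animals can accept dogs
const dogSink: Sink<Dog> = animals
// Invariance: neither assignment compiles for the full handle
// const wrong1: Both<Animal> = dogs
// const wrong2: Both<Dog> = animals

dogs.offer({ name: "Rex", breed: "Collie" })
dogSink.offer({ name: "Fido", breed: "Beagle" })
console.log(animalSource.take()?.name, animals.take()?.name) // Rex Fido
```

Narrowing a full handle to its read-only or write-only view is what makes the wider assignments safe: each view exposes only one direction of data flow.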
State (type alias)
Tagged state of a Queue.
Details
Open queues can accept offers and takers, Closing queues are
completing with a stored failure exit, and Done queues have finished.
This is low-level metadata exposed by the queue model; most users should
inspect queues through the public operations.
Signature
type State<A, E> =
  | {
      readonly _tag: "Open"
      readonly takers: Set<(_: Effect<void, E>) => void>
      readonly offers: Set<OfferEntry<A>>
      readonly awaiters: Set<(_: Effect<void, E>) => void>
    }
  | {
      readonly _tag: "Closing"
      readonly takers: Set<(_: Effect<void, E>) => void>
      readonly offers: Set<OfferEntry<A>>
      readonly awaiters: Set<(_: Effect<void, E>) => void>
      readonly exit: Failure<never, E>
    }
  | {
      readonly _tag: "Done"
      readonly exit: Failure<never, E>
    }
Since v4.0.0
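Because each variant carries a `_tag`, the state can be inspected with an exhaustive switch. The sketch below uses a simplified local stand-in (`StateLike`, with a count and a string in place of the real sets and failure exit) and a hypothetical `describeState` helper; neither is part of the library.

```typescript
// Simplified stand-in for the State union (an assumption for illustration)
type StateLike =
  | { readonly _tag: "Open"; readonly pendingOffers: number }
  | { readonly _tag: "Closing"; readonly pendingOffers: number; readonly exit: string }
  | { readonly _tag: "Done"; readonly exit: string }

// Switching on `_tag` narrows `state` to the matching variant in each case
function describeState(state: StateLike): string {
  switch (state._tag) {
    case "Open":
      return `open, ${state.pendingOffers} suspended offer(s)`
    case "Closing":
      return `closing with ${state.exit}, ${state.pendingOffers} suspended offer(s)`
    case "Done":
      return `done with ${state.exit}`
  }
}

console.log(describeState({ _tag: "Open", pendingOffers: 0 })) // open, 0 suspended offer(s)
console.log(describeState({ _tag: "Done", exit: "Done" })) // done with Done
```

Only the Closing and Done variants expose `exit`, so the compiler rejects any attempt to read it while the state is Open.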
OfferEntry (type alias)
Represents a suspended offer waiting to be admitted to a bounded queue.
Details
An entry is either a single message or a batch with an offset into its remaining messages, plus a resume callback that completes the suspended offer when the queue can accept more input.
Signature
type OfferEntry<A> =
  | {
      readonly _tag: "Array"
      readonly remaining: Array<A>
      offset: number
      readonly resume: (_: Effect<Array<A>>) => void
    }
  | {
      readonly _tag: "Single"
      readonly message: A
      readonly resume: (_: Effect<boolean>) => void
    }
Since v4.0.0
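One way to picture how such entries could be admitted as capacity frees up is sketched below. It is a simplified model under stated assumptions, not the library's implementation: `Entry` mirrors the shape above but resumes with plain values instead of Effects, and `admit` is a hypothetical helper.

```typescript
// Simplified stand-in for OfferEntry: resume receives plain values, not Effects
type Entry<A> =
  | {
      readonly _tag: "Array"
      readonly remaining: Array<A>
      offset: number
      readonly resume: (leftover: Array<A>) => void
    }
  | {
      readonly _tag: "Single"
      readonly message: A
      readonly resume: (accepted: boolean) => void
    }

// Moves as much of `entry` into `buffer` as `capacity` allows.
// Returns true once the entry is fully admitted and its offer has been resumed.
function admit<A>(buffer: Array<A>, capacity: number, entry: Entry<A>): boolean {
  if (entry._tag === "Single") {
    if (buffer.length >= capacity) return false
    buffer.push(entry.message)
    entry.resume(true)
    return true
  }
  // Batch: continue from the saved offset so no message is admitted twice
  while (entry.offset < entry.remaining.length && buffer.length < capacity) {
    buffer.push(entry.remaining[entry.offset]!)
    entry.offset++
  }
  if (entry.offset < entry.remaining.length) return false
  entry.resume([]) // nothing left over
  return true
}

const slots: Array<number> = [1, 2]
const resumeLog: Array<Array<number>> = []
const batch: Entry<number> = {
  _tag: "Array",
  remaining: [3, 4, 5],
  offset: 0,
  resume: (leftover) => {
    resumeLog.push(leftover)
  }
}

const firstTry = admit(slots, 3, batch) // false: only 3 fits, offset is now 1
slots.splice(0, 2) // a consumer takes two messages
const secondTry = admit(slots, 3, batch) // true: 4 and 5 now fit
console.log(firstTry, secondTry, slots) // false true [ 3, 4, 5 ]
```

The mutable `offset` is what lets a partially admitted batch stay suspended and pick up where it left off, which matches why `offset` is the only non-readonly field in the type above.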