TxPubSub.ts
TxPubSub.ts overview
Broadcasts values to subscribers inside Effect transactions.
A TxPubSub<A> is a transactional publish/subscribe hub. Each subscriber
owns a TxQueue, and each published value is offered to the subscriber
queues that are registered at the time of publication. This module includes
bounded, dropping, sliding, and unbounded hubs, publishing helpers, scoped
subscriptions, shutdown operations, and a guard.
Since v4.0.0
Exports Grouped by Category
constructors
bounded
Creates a bounded TxPubSub with the specified capacity. When a subscriber’s queue is full, the publisher will retry the transaction until space is available.
Example (Creating a bounded pub/sub)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.bounded<number>(16)

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 42)
      const value = yield* TxQueue.take(sub)
      console.log(value) // 42
    })
  )
})
Signature
declare const bounded: <A = never>(capacity: number) => Effect.Effect<TxPubSub<A>>
Since v2.0.0
dropping
Creates a dropping TxPubSub with the specified capacity. When a subscriber’s queue is full, the message is dropped for that subscriber.
Example (Creating a dropping pub/sub)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.dropping<number>(2)

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 1)
      yield* TxPubSub.publish(hub, 2)
      yield* TxPubSub.publish(hub, 3) // dropped
      const v1 = yield* TxQueue.take(sub)
      const v2 = yield* TxQueue.take(sub)
      console.log(v1, v2) // 1 2
    })
  )
})
Signature
declare const dropping: <A = never>(capacity: number) => Effect.Effect<TxPubSub<A>>
Since v2.0.0
sliding
Creates a sliding TxPubSub with the specified capacity. When a subscriber’s queue is full, the oldest message in that subscriber’s queue is dropped.
Example (Creating a sliding pub/sub)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.sliding<number>(2)

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 1)
      yield* TxPubSub.publish(hub, 2)
      yield* TxPubSub.publish(hub, 3) // evicts 1
      const v1 = yield* TxQueue.take(sub)
      console.log(v1) // 2
    })
  )
})
Signature
declare const sliding: <A = never>(capacity: number) => Effect.Effect<TxPubSub<A>>
Since v2.0.0
unbounded
Creates an unbounded TxPubSub with unlimited capacity. Messages are always accepted.
Example (Creating an unbounded pub/sub)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<string>()

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, "msg")
      const msg = yield* TxQueue.take(sub)
      console.log(msg) // "msg"
    })
  )
})
Signature
declare const unbounded: <A = never>() => Effect.Effect<TxPubSub<A>>
Since v2.0.0
getters
capacity
Returns the capacity of the TxPubSub.
Example (Reading pub/sub capacity)
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.bounded<number>(16)
  console.log(TxPubSub.capacity(hub)) // 16
})
Signature
declare const capacity: <A>(self: TxPubSub<A>) => number
Since v2.0.0
isEmpty
Checks whether the TxPubSub has no pending messages (all subscriber queues are empty).
Example (Checking whether a pub/sub is empty)
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<number>()
  const empty = yield* TxPubSub.isEmpty(hub)
  console.log(empty) // true
})
Signature
declare const isEmpty: <A>(self: TxPubSub<A>) => Effect.Effect<boolean>
Since v2.0.0
isFull
Checks whether any subscriber queue is at capacity.
Example (Checking whether a pub/sub is full)
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.bounded<number>(2)
  const full = yield* TxPubSub.isFull(hub)
  console.log(full) // false
})
Signature
declare const isFull: <A>(self: TxPubSub<A>) => Effect.Effect<boolean>
Since v2.0.0
isShutdown
Checks whether the TxPubSub has been shut down.
Example (Checking whether a pub/sub is shut down)
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<number>()
  console.log(yield* TxPubSub.isShutdown(hub)) // false
  yield* TxPubSub.shutdown(hub)
  console.log(yield* TxPubSub.isShutdown(hub)) // true
})
Signature
declare const isShutdown: <A>(self: TxPubSub<A>) => Effect.Effect<boolean>
Since v2.0.0
size
Returns the current number of pending messages, taken as the maximum across all subscriber queues.
Example (Reading subscriber queue size)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<number>()

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, 1)
      yield* TxPubSub.publish(hub, 2)
      const s = yield* TxPubSub.size(hub)
      console.log(s) // 2
    })
  )
})
Signature
declare const size: <A>(self: TxPubSub<A>) => Effect.Effect<number>
Since v2.0.0
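The three aggregate getters above (isEmpty, isFull, size) each reduce the per-subscriber queues to a single answer. The sketch below models that reduction over plain arrays of queue lengths. It is an illustration only, not the library's implementation: the helper names are hypothetical, and the result of 0 for a hub with no subscribers is an assumption.

```typescript
// Hypothetical helpers over plain queue lengths; not part of the TxPubSub API.

// size: the largest number of pending messages in any one subscriber queue.
// Assumption: a hub with no subscribers reports 0.
const modelSize = (lengths: ReadonlyArray<number>): number =>
  lengths.reduce((max, n) => Math.max(max, n), 0)

// isEmpty: true only when every subscriber queue is empty.
const modelIsEmpty = (lengths: ReadonlyArray<number>): boolean =>
  lengths.every((n) => n === 0)

// isFull: true as soon as any single subscriber queue has reached capacity.
const modelIsFull = (lengths: ReadonlyArray<number>, capacity: number): boolean =>
  lengths.some((n) => n >= capacity)

// Three subscribers holding 2, 0, and 1 pending messages, capacity 2.
const lengths = [2, 0, 1]
const sizeResult = modelSize(lengths)
const emptyResult = modelIsEmpty(lengths)
const fullResult = modelIsFull(lengths, 2)
```

One slow subscriber is enough to make the hub report as full, while the hub reports as empty only once every subscriber has drained its queue.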
guards
isTxPubSub
Checks whether the given value is a TxPubSub.
Example (Checking for a TxPubSub)
import { TxPubSub } from "effect"

declare const someValue: unknown

if (TxPubSub.isTxPubSub(someValue)) {
  console.log("This is a TxPubSub")
}
Signature
declare const isTxPubSub: (u: unknown) => u is TxPubSub<unknown>
Since v4.0.0
models
TxPubSub (interface)
A TxPubSub represents a transactional publish/subscribe hub that broadcasts messages to all current subscribers using Software Transactional Memory (STM) semantics.
Example (Subscribing to a transactional pub/sub)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<string>()

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, "hello")
      const msg = yield* TxQueue.take(sub)
      console.log(msg) // "hello"
    })
  )
})
Signature
export interface TxPubSub<in out A> extends Inspectable, Pipeable {
  readonly [TypeId]: typeof TypeId
  /** @internal */
  readonly subscribersRef: TxRef.TxRef<Array<TxQueue.TxQueue<A>>>
  /** @internal */
  readonly shutdownRef: TxRef.TxRef<boolean>
  readonly strategy: "bounded" | "unbounded" | "dropping" | "sliding"
  readonly capacity: number
}
Since v4.0.0
mutations
acquireSubscriber
Creates a subscriber queue and registers it with the pub/sub.
When to use
Use to create and register a subscriber queue inside a larger transaction when registration must be atomic with other Tx operations.
Details
This is the transactional acquire step of subscribe. It is exposed so that callers, such as TxSubscriptionRef.changes, can compose it with other Tx operations in a single transaction.
See
subscribe for the scoped acquire and release wrapper when no custom transaction composition is needed
releaseSubscriber to remove and shut down a queue returned by acquireSubscriber
Signature
declare const acquireSubscriber: <A>(self: TxPubSub<A>) => Effect.Effect<TxQueue.TxQueue<A>, never, Effect.Transaction>
Since v4.0.0
awaitShutdown
Waits for the TxPubSub to be shut down.
Example (Waiting for shutdown)
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<number>()

  const fiber = yield* Effect.forkChild(TxPubSub.awaitShutdown(hub))
  yield* TxPubSub.shutdown(hub)
  yield* fiber.await
})
Signature
declare const awaitShutdown: <A>(self: TxPubSub<A>) => Effect.Effect<void>
Since v2.0.0
publish
Publishes a message to all current subscribers.
Details
Returns true if the message was delivered to all subscribers, or false if the hub is shut down or the message was dropped for any subscriber. For the bounded strategy, the transaction retries if any subscriber queue is full. For the sliding strategy, full subscriber queues drop their oldest messages. For the dropping strategy, full subscriber queues drop the new message and the operation returns false.
Example (Publishing a message to subscribers)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<string>()

  // No subscribers - publish is a no-op
  const r1 = yield* TxPubSub.publish(hub, "no one listening")
  console.log(r1) // true

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publish(hub, "hello")
      const msg = yield* TxQueue.take(sub)
      console.log(msg) // "hello"
    })
  )
})
Signature
declare const publish: {
  <A>(value: A): (self: TxPubSub<A>) => Effect.Effect<boolean>
  <A>(self: TxPubSub<A>, value: A): Effect.Effect<boolean>
}
Since v2.0.0
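To make the per-strategy rules in the publish details easier to compare, here is a simplified, non-transactional model written in plain TypeScript. It is a sketch, not the TxPubSub implementation: every name in it is hypothetical, the string "retry" merely stands in for the transaction retrying, and returning true for a sliding eviction is an assumption (the details above do not state the return value for that case). Capacity is assumed to be at least 1.

```typescript
// Simplified model of the four strategies. All names are hypothetical.
type Strategy = "bounded" | "unbounded" | "dropping" | "sliding"

// "retry" stands in for the real transaction retrying until space is available.
type PublishResult = boolean | "retry"

interface ModelHub<A> {
  readonly strategy: Strategy
  readonly capacity: number
  readonly queues: Array<Array<A>> // one array per registered subscriber
  shutdown: boolean
}

const makeHub = <A>(strategy: Strategy, capacity: number, subscribers: number): ModelHub<A> => ({
  strategy,
  capacity,
  queues: Array.from({ length: subscribers }, () => [] as Array<A>),
  shutdown: false
})

const modelPublish = <A>(hub: ModelHub<A>, value: A): PublishResult => {
  if (hub.shutdown) return false
  const full = (q: Array<A>) => hub.strategy !== "unbounded" && q.length >= hub.capacity
  // bounded: if any queue is full, nothing is delivered and the caller must retry.
  if (hub.strategy === "bounded" && hub.queues.some(full)) return "retry"
  let deliveredToAll = true
  for (const q of hub.queues) {
    if (!full(q)) {
      q.push(value)
    } else if (hub.strategy === "sliding") {
      q.shift() // evict the oldest message, then accept the new one
      q.push(value)
    } else {
      deliveredToAll = false // dropping: the new message is discarded for this subscriber
    }
  }
  return deliveredToAll
}

// One subscriber, capacity 2, three publishes per hub.
const droppingHub = makeHub<number>("dropping", 2, 1)
const slidingHub = makeHub<number>("sliding", 2, 1)
const boundedHub = makeHub<number>("bounded", 2, 1)
const results = [droppingHub, slidingHub, boundedHub].map((hub) =>
  [1, 2, 3].map((n) => modelPublish(hub, n))
)
```

After the three publishes, the dropping queue still holds 1 and 2, the sliding queue holds 2 and 3, and the bounded hub reports "retry" for the third value without changing its queue.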
publishAll
Publishes all messages from an iterable to all current subscribers.
Details
Returns true if all messages were delivered to all subscribers.
Example (Publishing multiple messages to subscribers)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<number>()

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub = yield* TxPubSub.subscribe(hub)
      yield* TxPubSub.publishAll(hub, [1, 2, 3])
      const v1 = yield* TxQueue.take(sub)
      const v2 = yield* TxQueue.take(sub)
      const v3 = yield* TxQueue.take(sub)
      console.log(v1, v2, v3) // 1 2 3
    })
  )
})
Signature
declare const publishAll: {
  <A>(values: Iterable<A>): (self: TxPubSub<A>) => Effect.Effect<boolean>
  <A>(self: TxPubSub<A>, values: Iterable<A>): Effect.Effect<boolean>
}
Since v2.0.0
releaseSubscriber
Removes a subscriber queue from the pub/sub and shuts it down.
When to use
Use to release a manually acquired subscriber queue inside a larger transaction, removing it from the pub/sub and shutting it down together with related transactional cleanup.
Details
This is the transactional release step of subscribe, exposed so that callers can compose it with other Tx operations in a single transaction.
Gotchas
The supplied queue is shut down after being removed, so callers should pass a queue acquired for this pub/sub.
See
acquireSubscriber for the matching transactional acquire step
subscribe for the scoped acquire and release wrapper
Signature
declare const releaseSubscriber: {
  <A>(queue: TxQueue.TxQueue<A>): (self: TxPubSub<A>) => Effect.Effect<void, never, Effect.Transaction>
  <A>(self: TxPubSub<A>, queue: TxQueue.TxQueue<A>): Effect.Effect<void, never, Effect.Transaction>
}
Since v4.0.0
shutdown
Shuts down the TxPubSub and all subscriber queues registered at the time of shutdown.
Details
After shutdown, publish and publishAll return false, and awaitShutdown completes. The operation is idempotent.
Gotchas
Subscribers acquired after shutdown are not automatically shut down by this call.
Example (Shutting down a pub/sub)
import { Effect, TxPubSub } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<number>()
  yield* TxPubSub.shutdown(hub)

  const shut = yield* TxPubSub.isShutdown(hub)
  console.log(shut) // true

  const accepted = yield* TxPubSub.publish(hub, 1)
  console.log(accepted) // false
})
Signature
declare const shutdown: <A>(self: TxPubSub<A>) => Effect.Effect<void>
Since v2.0.0
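The gotcha about late subscribers can be pictured with a small plain-TypeScript model. This is a sketch under stated assumptions, not the library's code: the types and functions are hypothetical, and treating a repeated shutdown as an early return is one possible way to model "idempotent".

```typescript
// Hypothetical model types; real subscriber queues are TxQueue values.
interface ModelQueue {
  shutdown: boolean
}

interface ModelShutdownHub {
  shutdown: boolean
  readonly subscribers: Array<ModelQueue>
}

// Registers a fresh subscriber queue with the model hub.
const modelAcquire = (hub: ModelShutdownHub): ModelQueue => {
  const queue: ModelQueue = { shutdown: false }
  hub.subscribers.push(queue)
  return queue
}

// Shuts down the hub and only the queues registered at the time of the call.
const modelShutdown = (hub: ModelShutdownHub): void => {
  if (hub.shutdown) return // assumption: a repeated call changes nothing
  hub.shutdown = true
  for (const queue of hub.subscribers) queue.shutdown = true
}

const modelHub: ModelShutdownHub = { shutdown: false, subscribers: [] }
const registeredBefore = modelAcquire(modelHub)
modelShutdown(modelHub)
const registeredAfter = modelAcquire(modelHub)
modelShutdown(modelHub) // second call is a no-op in this model
```

In this model `registeredBefore` ends up shut down while `registeredAfter` does not, which is why a queue acquired after shutdown needs its own cleanup.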
subscribe
Subscribes to the TxPubSub, returning a scoped TxQueue for messages published after subscription.
Details
The returned queue uses the hub’s capacity strategy: bounded subscriptions backpressure publishers when full, dropping subscriptions may miss new messages when full, and sliding subscriptions may evict older queued messages. The subscription is automatically removed when the scope is closed.
Example (Subscribing multiple queues)
import { Effect, TxPubSub, TxQueue } from "effect"

const program = Effect.gen(function* () {
  const hub = yield* TxPubSub.unbounded<string>()

  yield* Effect.scoped(
    Effect.gen(function* () {
      const sub1 = yield* TxPubSub.subscribe(hub)
      const sub2 = yield* TxPubSub.subscribe(hub)

      yield* TxPubSub.publish(hub, "broadcast")

      const msg1 = yield* TxQueue.take(sub1)
      const msg2 = yield* TxQueue.take(sub2)
      console.log(msg1, msg2) // "broadcast" "broadcast"
    })
  )
})
Signature
declare const subscribe: <A>(self: TxPubSub<A>) => Effect.Effect<TxQueue.TxQueue<A>, never, Scope.Scope>
Since v2.0.0
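Because a subscriber only sees messages published while it is registered, the order of subscribe, publish, and scope close determines what each queue receives. The following plain-TypeScript model illustrates that timing. It is a sketch with a hypothetical class, without transactions, scopes, or capacity limits, where `unsubscribe` stands in for the scope being closed.

```typescript
// Hypothetical, unbounded, non-transactional broadcast model.
class ModelBroadcast<A> {
  private subscribers: Array<Array<A>> = []

  // Registers a new, empty queue; earlier messages are not replayed.
  subscribe(): Array<A> {
    const queue: Array<A> = []
    this.subscribers.push(queue)
    return queue
  }

  // Stands in for closing the subscription's scope.
  unsubscribe(queue: Array<A>): void {
    this.subscribers = this.subscribers.filter((q) => q !== queue)
  }

  // Offers the value to every queue registered right now.
  publish(value: A): void {
    for (const queue of this.subscribers) queue.push(value)
  }
}

const broadcast = new ModelBroadcast<string>()
broadcast.publish("early") // no subscribers yet, so nobody receives this
const earlySub = broadcast.subscribe()
broadcast.publish("first")
const lateSub = broadcast.subscribe()
broadcast.publish("second")
broadcast.unsubscribe(earlySub)
broadcast.publish("third")
```

Here `earlySub` holds "first" and "second", while `lateSub` holds "second" and "third"; neither queue ever sees "early".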