Sink.ts
Sink.ts overview
Section titled “Sink.ts overview”Consumes values from a Stream and produces one final result.
A Sink may read no input, a fixed amount of input, or keep reading until a
condition is met. If it reads more than it needs, it can return leftovers so
the stream can continue from those values. Sinks are used to collect, fold,
search, count, or otherwise reduce streamed input, and they can be composed
when a stream needs more than one consuming step.
Since v2.0.0
Exports Grouped by Category
Section titled “Exports Grouped by Category”- Finalization
- collecting
- constructors
- collect
- count
- die
- drain
- every
- fail
- failCause
- failCauseSync
- failSync
- find
- findEffect
- forEach
- forEachArray
- forEachWhile
- forEachWhileArray
- fromChannel
- fromEffect
- fromEffectEnd
- fromPubSub
- fromQueue
- fromTransform
- head
- last
- make
- never
- some
- succeed
- sum
- suspend
- sync
- takeUntil
- takeUntilEffect
- takeWhile
- takeWhileEffect
- takeWhileFilter
- takeWhileFilterEffect
- timed
- toChannel
- unwrap
- error handling
- filtering
- folding
- guards
- mapping
- models
- reducing
- sequencing
- services
- utils
Finalization
Section titled “Finalization”ensuring
Section titled “ensuring”Runs a finalizer effect after this sink completes, fails, or is interrupted.
Details
The original sink result and leftovers are preserved unless the finalizer itself fails.
Signature
declare const ensuring: { <X, E2, R2>( effect: Effect.Effect<X, E2, R2> ): <A, E, In, L, R>(self: Sink<A, In, L, E, R>) => Sink<A, In, L, E | E2, R2 | R> <A, In, L, E, R, X, E2, R2>( self: Sink<A, In, L, E, R>, effect: Effect.Effect<X, E2, R2> ): Sink<A, In, L, E | E2, R | R2>}Since v2.0.0
onExit
Section titled “onExit”Runs an effect after this sink completes, fails, or is interrupted.
Details
The effect receives the sink’s Exit for the result value. The original
sink result and leftovers are preserved unless the finalizer itself fails.
Signature
declare const onExit: { <A, E, X, E2, R2>( f: (exit: Exit.Exit<A, E>) => Effect.Effect<X, E2, R2> ): <In, L, R>(self: Sink<A, In, L, E, R>) => Sink<A, In, L, E | E2, R2 | R> <A, In, L, E, R, X, E2, R2>( self: Sink<A, In, L, E, R>, f: (exit: Exit.Exit<A, E>) => Effect.Effect<X, E2, R2> ): Sink<A, In, L, E | E2, R | R2>}Since v4.0.0
collecting
Section titled “collecting”Collects up to n input elements into an array.
Details
If n is less than or equal to zero, the sink completes with an empty array.
If more elements are pulled than needed, the remaining elements from the same
array are returned as leftovers.
Signature
declare const take: <In>(n: number) => Sink<Array<In>, In, In>Since v2.0.0
constructors
Section titled “constructors”collect
Section titled “collect”Accumulates incoming elements into an array.
When to use
Use when you need a sink result containing all upstream input elements.
See
takefor collecting only a fixed number of input elements
Signature
declare const collect: <In>() => Sink<Array<In>, In>Since v4.0.0
A sink that counts the number of elements fed to it.
When to use
Use to consume input and return only the number of elements received.
Signature
declare const count: Sink<number, unknown, never, never, never>Since v2.0.0
Creates a sink halting with a specified defect.
Example (Dying with a defect)
import { Effect, Sink, Stream } from "effect"
// Create a sink that dies with a defectconst sink = Sink.die(new Error("Defect error"))
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program).catch(console.log)// Output: Error: Defect errorSignature
declare const die: (defect: unknown) => Sink<never>Since v2.0.0
Consumes and ignores all stream inputs.
When to use
Use to consume all upstream input and complete with void when the input values and any aggregate result are not needed.
See
countfor consuming all input while returning the number of elementsforEachfor consuming all input while running an effect for each element
Signature
declare const drain: Sink<void, unknown, never, never, never>Since v2.0.0
A sink that returns whether all elements satisfy the specified predicate.
When to use
Use to reduce a stream to a boolean that is true only when every input satisfies a pure predicate.
See
somefor the dual any-match check
Signature
declare const every: <In>(predicate: Predicate<In>) => Sink<boolean, In, In>Since v2.0.0
A sink that always fails with the specified error.
Example (Failing with an error)
import { Effect, Sink, Stream } from "effect"
// Create a sink that always failsconst sink = Sink.fail(new Error("Sink failed"))
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program).catch(console.log)// Output: Error: Sink failedSignature
declare const fail: <E>(e: E) => Sink<never, unknown, never, E>Since v2.0.0
failCause
Section titled “failCause”Creates a sink halting with a specified Cause.
Example (Failing with a cause)
import { Cause, Effect, Sink, Stream } from "effect"
// Create a sink that fails with a specific causeconst sink = Sink.failCause(Cause.fail(new Error("Custom cause")))
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program).catch(console.log)// Output: Error: Custom causeSignature
declare const failCause: <E>(cause: Cause.Cause<E>) => Sink<never, unknown, never, E>Since v2.0.0
failCauseSync
Section titled “failCauseSync”Creates a sink halting with a specified lazily evaluated Cause.
Example (Failing with a lazy cause)
import { Cause, Effect, Sink, Stream } from "effect"
// Create a sink that fails with a lazy causeconst sink = Sink.failCauseSync(() => Cause.fail(new Error("Lazy cause")))
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program).catch(console.log)// Output: Error: Lazy causeSignature
declare const failCauseSync: <E>(evaluate: LazyArg<Cause.Cause<E>>) => Sink<never, unknown, never, E>Since v2.0.0
failSync
Section titled “failSync”A sink that always fails with the specified lazily evaluated error.
Example (Failing with a lazy error)
import { Effect, Sink, Stream } from "effect"
// Create a sink that fails with a lazy errorconst sink = Sink.failSync(() => new Error("Lazy error"))
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program).catch(console.log)// Output: Error: Lazy errorSignature
declare const failSync: <E>(evaluate: LazyArg<E>) => Sink<never, unknown, never, E>Since v2.0.0
Creates a sink containing the first value matched by a synchronous predicate.
When to use
Use to scan stream input until the first matching element is found and return
that element as an Option.
Details
Returns Option.none if the upstream stream ends before a match is found.
Refinement predicates narrow the returned value type. The matching input is
consumed; any later elements from the same pulled array are returned as
leftovers.
See
findEffectfor an effectful predicate that can fail or require services
Signature
declare const find: { <In, Out extends In>(refinement: Refinement<In, Out>): Sink<Option.Option<Out>, In, In> <In>(predicate: Predicate<In>): Sink<Option.Option<In>, In, In>}Since v4.0.0
findEffect
Section titled “findEffect”Creates a sink containing the first value matched by an effectful predicate.
When to use
Use when you need to run effects, fail, or use services while searching for the first matching input.
Details
Returns Option.some with the first input whose predicate result is true,
or Option.none if the upstream stream ends first. If the predicate effect
fails, the sink fails with the same error.
See
findfor the synchronous predicate variant
Signature
declare const findEffect: <In, E, R>( predicate: (input: In) => Effect.Effect<boolean, E, R>) => Sink<Option.Option<In>, In, In, E, R>Since v2.0.0
forEach
Section titled “forEach”A sink that executes the provided effectful function for every item fed to it.
Example (Running effects for each item)
import { Console, Effect, Sink, Stream } from "effect"
// Create a sink that logs each itemconst sink = Sink.forEach((item: number) => Console.log(`Processing: ${item}`))
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program)// Output:// Processing: 1// Processing: 2// Processing: 3Signature
declare const forEach: <In, X, E, R>(f: (input: In) => Effect.Effect<X, E, R>) => Sink<void, In, never, E, R>Since v2.0.0
forEachArray
Section titled “forEachArray”A sink that executes the provided effectful function for every Chunk fed to it.
Example (Running effects for each chunk)
import { Console, Effect, Sink, Stream } from "effect"
// Create a sink that processes chunksconst sink = Sink.forEachArray((chunk: ReadonlyArray<number>) => Console.log(`Processing chunk of ${chunk.length} items: [${chunk.join(", ")}]`))
// Use it with a streamconst stream = Stream.make(1, 2, 3, 4, 5)const program = Stream.run(stream, sink)
Effect.runPromise(program)// Output: Processing chunk of 5 items: [1, 2, 3, 4, 5]Signature
declare const forEachArray: <In, X, E, R>( f: (input: NonEmptyReadonlyArray<In>) => Effect.Effect<X, E, R>) => Sink<void, In, never, E, R>Since v4.0.0
forEachWhile
Section titled “forEachWhile”Runs an effectful function for each input element while it returns true.
Details
The sink stops consuming input when the function returns false or when the
upstream stream ends, and completes with void.
Signature
declare const forEachWhile: <In, E, R>(f: (input: In) => Effect.Effect<boolean, E, R>) => Sink<void, In, never, E, R>Since v2.0.0
forEachWhileArray
Section titled “forEachWhileArray”Runs an effectful function for each non-empty input array while it returns
true.
Details
The sink stops consuming input when the function returns false or when the
upstream stream ends, and completes with void.
Signature
declare const forEachWhileArray: <In, E, R>( f: (input: NonEmptyReadonlyArray<In>) => Effect.Effect<boolean, E, R>) => Sink<void, In, never, E, R>Since v4.0.0
fromChannel
Section titled “fromChannel”Creates a sink from a Channel.
When to use
Use to create a Sink from a Channel that processes non-empty arrays of
input values.
See
toChannelfor converting aSinkback to aChannel
Signature
declare const fromChannel: <L, In, E, A, R>( channel: Channel.Channel<never, E, End<A, L>, NonEmptyReadonlyArray<In>, never, void, R>) => Sink<A, In, L, E, R>Since v2.0.0
fromEffect
Section titled “fromEffect”Creates a sink that ignores upstream input and completes with the success value of the provided effect.
Details
If the effect fails, the sink fails with the same error.
Signature
declare const fromEffect: <A, E, R>(effect: Effect.Effect<A, E, R>) => Sink<A, unknown, never, E, R>Since v2.0.0
fromEffectEnd
Section titled “fromEffectEnd”Creates a sink that ignores upstream input and completes from an effect that
already returns an End.
When to use
Use when you need to create a sink from an effect that returns both the sink result value and optional leftovers.
Signature
declare const fromEffectEnd: <A, E, R, L = never>(effect: Effect.Effect<End<A, L>, E, R>) => Sink<A, unknown, L, E, R>Since v4.0.0
fromPubSub
Section titled “fromPubSub”Creates a sink that publishes every consumed input element to a PubSub.
Details
The sink completes with void when the upstream stream ends.
Signature
declare const fromPubSub: <A>(pubsub: PubSub.PubSub<A>) => Sink<void, A>Since v2.0.0
fromQueue
Section titled “fromQueue”Creates a sink that offers every consumed input element to a queue.
Details
When the upstream stream ends, the sink ends the queue and completes with
void.
Signature
declare const fromQueue: <A>(queue: Queue.Queue<A, Cause.Done>) => Sink<void, A>Since v2.0.0
fromTransform
Section titled “fromTransform”Creates a Sink from a low-level transform function.
Details
The transform receives the upstream pull of non-empty input arrays and the
active scope, and returns an effect that completes with the sink’s End
value.
Signature
declare const fromTransform: <In, A, E, R, L = never>( transform: ( upstream: Pull.Pull<NonEmptyReadonlyArray<In>, never, void>, scope: Scope.Scope ) => Effect.Effect<End<A, L>, E, R>) => Sink<A, In, L, E, R>Since v4.0.0
Creates a sink containing the first value.
Details
Returns Option.some(first) for non-empty input, or Option.none when the
upstream ends without input. The first element is consumed; later elements
from the same pulled array are emitted as leftovers.
Signature
declare const head: <In>() => Sink<Option.Option<In>, In, In>Since v2.0.0
Creates a sink containing the last value.
When to use
Use when you need to consume all upstream input and keep only the final element.
Details
Returns Option.some(last) with the final input value, or Option.none when
the upstream ends without input.
Gotchas
This sink produces a result only when the upstream ends, so it does not complete for a stream that does not end.
See
headfor taking the first input value instead
Signature
declare const last: <In>() => Sink<Option.Option<In>, In>Since v2.0.0
Creates a pipe-style constructor for sinks over input type In.
Details
The returned function exposes the sink input as a Stream<In>, applies the
provided pipeline, and uses the final effect’s success value as the sink
result.
Signature
declare const make: <In>() => make.Constructor<In>Since v4.0.0
A sink that never completes.
Signature
declare const never: Sink<unknown, unknown, never, never, never>Since v2.0.0
A sink that returns whether an element satisfies the specified predicate.
When to use
Use to reduce a stream to a boolean that is true when any input satisfies a pure predicate.
See
everyfor the all-match check
Signature
declare const some: <In>(predicate: Predicate<In>) => Sink<boolean, In, In>Since v2.0.0
succeed
Section titled “succeed”A sink that immediately ends with the specified value.
Example (Succeeding with a value)
import { Effect, Sink, Stream } from "effect"
// Create a sink that always yields the same valueconst sink = Sink.succeed(42)
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program).then(console.log)// Output: 42Signature
declare const succeed: <A, L = never>(a: A, leftovers?: NonEmptyReadonlyArray<L> | undefined) => Sink<A, unknown, L>Since v2.0.0
Creates a sink which sums up its inputs.
Signature
declare const sum: Sink<number, number, never, never, never>Since v2.0.0
suspend
Section titled “suspend”A sink that is created from a lazily evaluated sink.
Signature
declare const suspend: <A, In, L, E, R>(evaluate: LazyArg<Sink<A, In, L, E, R>>) => Sink<A, In, L, E, R>Since v2.0.0
A sink that immediately ends with the specified lazily evaluated value.
Signature
declare const sync: <A>(a: LazyArg<A>) => Sink<A>Since v2.0.0
takeUntil
Section titled “takeUntil”Collects input elements until the predicate returns true, including the
matching element in the result.
Signature
declare const takeUntil: <In>(predicate: Predicate<In>) => Sink<Array<In>, In, In>Since v4.0.0
takeUntilEffect
Section titled “takeUntilEffect”Collects input elements effectfully until the predicate returns true,
including the matching element in the result.
Details
If the predicate effect fails, the sink fails with the same error.
Signature
declare const takeUntilEffect: <In, E, R>( predicate: (input: In) => Effect.Effect<boolean, E, R>) => Sink<Array<In>, In, In, E, R>Since v4.0.0
takeWhile
Section titled “takeWhile”Collects the longest input prefix whose elements satisfy the predicate or refinement.
Details
The first failing input is consumed and excluded from the result. Any later elements from the same pulled array are returned as leftovers.
Signature
declare const takeWhile: { <In, Out extends In>(refinement: Refinement<In, Out>): Sink<Array<Out>, In, In> <In>(predicate: Predicate<In>): Sink<Array<In>, In, In>}Since v4.0.0
takeWhileEffect
Section titled “takeWhileEffect”Collects input elements effectfully while the predicate succeeds.
Details
The first input for which the predicate returns false is consumed and
excluded from the result. Any later elements from the same pulled array are
returned as leftovers.
Signature
declare const takeWhileEffect: <In, E, R>( predicate: (input: In) => Effect.Effect<boolean, E, R>) => Sink<Array<In>, In, In, E, R>Since v4.0.0
takeWhileFilter
Section titled “takeWhileFilter”Applies a Filter to input elements while it succeeds, collecting each
successful output.
Details
The first input for which the filter fails is consumed and excluded from the result. Any later elements from the same pulled array are returned as leftovers.
Signature
declare const takeWhileFilter: <In, Out, X>(filter: Filter.Filter<In, Out, X>) => Sink<Array<Out>, In, In>Since v4.0.0
takeWhileFilterEffect
Section titled “takeWhileFilterEffect”Applies a FilterEffect to input elements effectfully while it succeeds,
collecting each successful output.
Details
The first input for which the filter fails is consumed and excluded from the result. Any later elements from the same pulled array are returned as leftovers.
Signature
declare const takeWhileFilterEffect: <In, Out, X, E, R>( filter: Filter.FilterEffect<In, Out, X, E, R>) => Sink<Array<Out>, In, In, E, R>Since v4.0.0
A sink that drains all input and returns the elapsed duration.
Signature
declare const timed: Sink<Duration.Duration, unknown, never, never, never>Since v2.0.0
toChannel
Section titled “toChannel”Creates a Channel from a Sink.
Example (Converting a sink to a channel)
import { Sink } from "effect"
// Create a sink and extract its channelconst sink = Sink.succeed(42)const channel = Sink.toChannel(sink)Signature
declare const toChannel: <A, In, L, E, R>( self: Sink<A, In, L, E, R>) => Channel.Channel<never, E, End<A, L>, NonEmptyReadonlyArray<In>, never, void, R>Since v2.0.0
unwrap
Section titled “unwrap”Creates a sink produced from a scoped effect.
Example (Unwrapping a sink effect)
import { Console, Effect, Sink, Stream } from "effect"
// Create a sink from an effect that produces a sinkconst sinkEffect = Effect.succeed(Sink.forEach((item: number) => Console.log(`Item: ${item}`)))const sink = Sink.unwrap(sinkEffect)
// Use it with a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program)// Output:// Item: 1// Item: 2// Item: 3Signature
declare const unwrap: <A, In, L, E, R, R2>( effect: Effect.Effect<Sink<A, In, L, E, R2>, E, R>) => Sink<A, In, L, E, Exclude<R, Scope.Scope> | R2>Since v2.0.0
error handling
Section titled “error handling”Handles typed errors from this sink with an effectful fallback value.
When to use
Use to recover from a typed sink failure by producing the replacement
result with an Effect.
See
catchCausefor recovering from the full failure causeorElsefor recovering by switching to another sink
Signature
declare const catch: { <E, A2, E2, R2>(f: (error: Types.NoInfer<E>) => Effect.Effect<A2, E2, R2>): <A, In, L, R>(self: Sink<A, In, L, E, R>) => Sink<A2 | A, In, L, E, R2 | R>; <A, In, L, E, R, A2, E2, R2>(self: Sink<A, In, L, E, R>, f: (error: E) => Effect.Effect<A2, E2, R2>): Sink<A | A2, In, L, E2, R | R2>; }Since v4.0.0
catchCause
Section titled “catchCause”Handles failures from this sink by inspecting the full Cause.
When to use
Use to recover from a sink failure based on the full Cause instead of only
the typed error value.
Details
When this sink fails, the handler effect is run and its success value becomes the sink result. If the handler fails, the returned sink fails with that error.
See
catchfor recovering from typed errors onlyorElsefor recovering by switching to another sink
Signature
declare const catchCause: { <E, A2, E2, R2>( f: (error: Cause.Cause<Types.NoInfer<E>>) => Effect.Effect<A2, E2, R2> ): <A, In, L, R>(self: Sink<A, In, L, E, R>) => Sink<A2 | A, In, L, E, R2 | R> <A, In, L, E, R, A2, E2, R2>( self: Sink<A, In, L, E, R>, f: (error: Cause.Cause<E>) => Effect.Effect<A2, E2, R2> ): Sink<A | A2, In, L, E2, R | R2>}Since v4.0.0
orElse
Section titled “orElse”Runs a fallback sink if this sink fails with a typed error.
Details
The fallback is built from the error and continues consuming from the same upstream stream. If the upstream stream had already ended, the fallback sees the upstream end instead.
Signature
declare const orElse: { <E, A2, In2, L2, E2, R2>( f: (error: Types.NoInfer<E>) => Sink<A2, In2, L2, E2, R2> ): <A, In, L, R>(self: Sink<A, In, L, E, R>) => Sink<A2 | A, In & In2, L2 | L, E2 | E, R2 | R> <A, In, L, E, R, A2, In2, L2, E2, R2>( self: Sink<A, In, L, E, R>, f: (error: E) => Sink<A2, In2, L2, E2, R2> ): Sink<A | A2, In & In2, L | L2, E | E2, R | R2>}Since v2.0.0
filtering
Section titled “filtering”ignoreLeftover
Section titled “ignoreLeftover”Drops leftovers produced by a sink.
Details
The sink result is preserved, but any leftover elements are discarded instead of being returned to downstream sink composition. This does not continue pulling additional elements from the upstream stream.
Signature
declare const ignoreLeftover: <A, In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In, never, E, R>Since v2.0.0
folding
Section titled “folding”A sink that folds its inputs with the provided function, termination predicate and initial state.
When to use
Use to accumulate stream input element by element with an effectful step and stop based on the accumulated state.
Details
The initial state is evaluated lazily. Each input element is folded with the
effectful function, and the sink continues while contFn returns true. If
the sink stops in the middle of a pulled array, the remaining elements from
that array are returned as leftovers.
See
foldArrayfor folding each pulled non-empty input array at oncefoldUntilfor folding until a fixed maximum number of elements is consumed
Signature
declare const fold: <S, In, E = never, R = never>( s: LazyArg<S>, contFn: Predicate<S>, f: (s: S, input: In) => Effect.Effect<S, E, R>) => Sink<S, In, In, E, R>Since v2.0.0
foldArray
Section titled “foldArray”Folds non-empty input arrays into state with an effectful function.
When to use
Use to update state with an effectful function once per pulled non-empty input array when batch-level processing is the natural unit.
Details
The initial state is evaluated lazily. After each pulled array is folded,
the sink continues while contFn returns true; otherwise it completes
with the current state.
See
foldfor folding element by element and returning leftovers when stopping mid-arrayreduceWhileArrayEffectfor array-level effectful reducing that checks the predicate before consuming input
Signature
declare const foldArray: <S, In, E = never, R = never>( s: LazyArg<S>, contFn: Predicate<S>, f: (s: S, input: Arr.NonEmptyReadonlyArray<In>) => Effect.Effect<S, E, R>) => Sink<S, In, never, E, R>Since v4.0.0
foldUntil
Section titled “foldUntil”Folds input elements into state until the specified maximum number of elements has been consumed or the upstream stream ends.
Details
If the sink stops in the middle of a pulled array, the remaining elements from that array are returned as leftovers.
Signature
declare const foldUntil: <S, In, E = never, R = never>( s: LazyArg<S>, max: number, f: (s: S, input: In) => Effect.Effect<S, E, R>) => Sink<S, In, In, E, R>Since v2.0.0
guards
Section titled “guards”isSink
Section titled “isSink”Checks whether a value is a Sink.
Example (Checking for a sink)
import { Sink } from "effect"
const sink = Sink.neverconst notStream = { data: [1, 2, 3] }
console.log(Sink.isSink(sink)) // trueconsole.log(Sink.isSink(notStream)) // falseSignature
declare const isSink: (u: unknown) => u is Sink<unknown, never, unknown, unknown, unknown>Since v4.0.0
mapping
Section titled “mapping”Sets the sink’s result to a constant value.
When to use
Use to keep a sink’s input consumption, errors, requirements, and leftovers while replacing only its result with a known value.
See
mapfor computing the replacement from the original result
Signature
declare const as: { <A2>(a2: A2): <A, In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A2, In, L, E, R> <A, In, L, E, R, A2>(self: Sink<A, In, L, E, R>, a2: A2): Sink<A2, In, L, E, R>}Since v2.0.0
Transforms this sink’s result.
When to use
Use to compute a new result from the original sink result while preserving the sink’s input consumption behavior.
Details
The transformed sink preserves the original sink’s input type, leftovers, errors, and requirements.
See
mapEffectfor effectful result transformationsasfor replacing the result with a constant valuemapEndfor transforming both the result and leftovers
Signature
declare const map: { <A, A2>(f: (a: A) => A2): <In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A2, In, L, E, R> <A, In, L, E, R, A2>(self: Sink<A, In, L, E, R>, f: (a: A) => A2): Sink<A2, In, L, E, R>}Since v2.0.0
mapEffect
Section titled “mapEffect”Transforms this sink’s result effectfully.
When to use
Use when you need a sink result transformation that is effectful, can fail, or requires services.
Details
The transformed sink preserves the original sink’s input consumption and leftovers while adding the errors and requirements of the transformation.
See
mapfor pure result transformationsmapEffectEndfor effectfully transforming both the result and leftoversflatMapfor continuing with another sink based on the result
Signature
declare const mapEffect: { <A, A2, E2, R2>( f: (a: A) => Effect.Effect<A2, E2, R2> ): <In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A2, In, L, E2 | E, R2 | R> <A, In, L, E, R, A2, E2, R2>( self: Sink<A, In, L, E, R>, f: (a: A) => Effect.Effect<A2, E2, R2> ): Sink<A2, In, L, E | E2, R | R2>}Since v2.0.0
mapEffectEnd
Section titled “mapEffectEnd”Transforms the full End produced by this sink effectfully.
Details
This can change both the result value and the optional leftovers, and the transformation can fail or require services.
Signature
declare const mapEffectEnd: { <A, L, A2, E2, R2, L2 = never>( f: (end: End<A, L>) => Effect.Effect<End<A2, L2>, E2, R2> ): <In, E, R>(self: Sink<A, In, L, E, R>) => Sink<A2, In, L2, E2 | E, R2 | R> <A, In, L, E, R, A2, E2, R2, L2 = never>( self: Sink<A, In, L, E, R>, f: (end: End<A, L>) => Effect.Effect<End<A2, L2>, E2, R2> ): Sink<A2, In, L2, E | E2, R | R2>}Since v4.0.0
mapEnd
Section titled “mapEnd”Transforms the full End produced by this sink.
Details
This can change both the result value and the optional leftovers.
Signature
declare const mapEnd: { <A, L, A2, L2 = never>( f: (a: End<A, L>) => End<A2, L2> ): <In, E, R>(self: Sink<A, In, L, E, R>) => Sink<A2, In, L2, E, R> <A, In, L, E, R, A2, L2 = never>(self: Sink<A, In, L, E, R>, f: (a: End<A, L>) => End<A2, L2>): Sink<A2, In, L2, E, R>}Since v4.0.0
mapError
Section titled “mapError”Transforms the errors emitted by this sink using f.
Signature
declare const mapError: { <E, E2>(f: (error: E) => E2): <A, In, L, R>(self: Sink<A, In, L, E, R>) => Sink<A, In, L, E2, R> <A, In, L, E, R, E2>(self: Sink<A, In, L, E, R>, f: (error: E) => E2): Sink<A, In, L, E2, R>}Since v2.0.0
mapInput
Section titled “mapInput”Transforms this sink’s input elements.
Signature
declare const mapInput: { <In0, In>(f: (input: In0) => In): <A, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In0, L, E, R> <A, In, L, E, R, In0>(self: Sink<A, In, L, E, R>, f: (input: In0) => In): Sink<A, In0, L, E, R>}Since v2.0.0
mapInputArray
Section titled “mapInputArray”Transforms each non-empty array of upstream input before it is fed to this sink.
Signature
declare const mapInputArray: { <In0, In>( f: (input: Arr.NonEmptyReadonlyArray<In0>) => Arr.NonEmptyReadonlyArray<In> ): <A, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In0, L, E, R> <A, In, L, E, R, In0>( self: Sink<A, In, L, E, R>, f: (input: Arr.NonEmptyReadonlyArray<In0>) => Arr.NonEmptyReadonlyArray<In> ): Sink<A, In0, L, E, R>}Since v4.0.0
mapInputArrayEffect
Section titled “mapInputArrayEffect”Transforms each non-empty array of upstream input effectfully before it is fed to this sink.
Signature
declare const mapInputArrayEffect: { <In0, In, E2, R2>( f: (input: Arr.NonEmptyReadonlyArray<In0>) => Effect.Effect<Arr.NonEmptyReadonlyArray<In>, E2, R2> ): <A, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In0, L, E2 | E, R2 | R> <A, In, L, E, R, In0, E2, R2>( self: Sink<A, In, L, E, R>, f: (input: Arr.NonEmptyReadonlyArray<In0>) => Effect.Effect<Arr.NonEmptyReadonlyArray<In>, E2, R2> ): Sink<A, In0, L, E | E2, R | R2>}Since v4.0.0
mapInputEffect
Section titled “mapInputEffect”Transforms this sink’s input elements effectfully.
Signature
declare const mapInputEffect: { <In0, In, E2, R2>( f: (input: In0) => Effect.Effect<In, E2, R2> ): <A, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In0, L, E2 | E, R2 | R> <A, In, L, E, R, In0, E2, R2>( self: Sink<A, In, L, E, R>, f: (input: In0) => Effect.Effect<In, E2, R2> ): Sink<A, In0, L, E | E2, R | R2>}Since v2.0.0
mapLeftover
Section titled “mapLeftover”Transforms the leftovers emitted by this sink using f.
Signature
declare const mapLeftover: { <L, L2>(f: (leftover: L) => L2): <A, In, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In, L2, E, R> <A, In, L, E, R, L2>(self: Sink<A, In, L, E, R>, f: (leftover: L) => L2): Sink<A, In, L2, E, R>}Since v2.0.0
summarized
Section titled “summarized”Runs a summary effect when the sink starts and again when it completes.
Signature
declare const summarized: { <A2, E2, R2, A3>( summary: Effect.Effect<A2, E2, R2>, f: (start: A2, end: A2) => A3 ): <A, In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<[A, A3], In, L, E2 | E, R2 | R> <A, In, L, E, R, A2, E2, R2, A3>( self: Sink<A, In, L, E, R>, summary: Effect.Effect<A2, E2, R2>, f: (start: A2, end: A2) => A3 ): Sink<[A, A3], In, L, E | E2, R | R2>}Since v2.0.0
withDuration
Section titled “withDuration”Returns the sink that executes this one and times its execution.
Signature
declare const withDuration: <A, In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<[A, Duration.Duration], In, L, E, R>Since v2.0.0
models
Section titled “models”End (type alias)
Section titled “End (type alias)”Tuple returned when a Sink finishes.
Details
The first element is the sink result. The optional second element contains a non-empty array of leftover input that was pulled but not consumed.
Signature
type End<A, L> = readonly [value: A, leftover?: NonEmptyReadonlyArray<L> | undefined]Since v4.0.0
Sink (interface)
Section titled “Sink (interface)”A Sink<A, In, L, E, R> is used to consume elements produced by a Stream.
You can think of a sink as a function that will consume a variable amount of
In elements (could be 0, 1, or many), might fail with an error of type E,
and will eventually yield a value of type A together with a remainder of
type L (i.e. any leftovers).
Example (Running a sink with a stream)
import { Effect, Sink, Stream } from "effect"
// Create a simple sink that always succeeds with a valueconst sink: Sink.Sink<number> = Sink.succeed(42)
// Use the sink to consume a streamconst stream = Stream.make(1, 2, 3)const program = Stream.run(stream, sink)
Effect.runPromise(program).then(console.log)// Output: 42Signature
export interface Sink<out A, in In = unknown, out L = never, out E = never, out R = never> extends Sink.Variance<A, In, L, E, R>, Pipeable { readonly transform: ( upstream: Pull.Pull<NonEmptyReadonlyArray<In>, never, void>, scope: Scope.Scope ) => Effect.Effect<End<A, L>, E, R> [Unify.typeSymbol]?: unknown [Unify.unifySymbol]?: SinkUnify<this> [Unify.ignoreSymbol]?: SinkUnifyIgnore}Since v2.0.0
SinkUnify (interface)
Section titled “SinkUnify (interface)”Type-level unification support for Sink values.
Details
This preserves the result, input, leftover, error, and service type
parameters when Effect’s Unify machinery normalizes generic values that
include sinks. Users normally do not need to reference this interface
directly.
Signature
export interface SinkUnify<A extends { [Unify.typeSymbol]?: any }> extends Effect.EffectUnify<A> { Sink?: () => A[Unify.typeSymbol] extends Sink<infer A, infer In, infer L, infer E, infer R> | infer _ ? Sink<A, In, L, E, R> : never}Since v2.0.0
SinkUnifyIgnore (interface)
Section titled “SinkUnifyIgnore (interface)”Marker used by Effect’s Unify machinery for Sink values.
Details
It prevents the inherited Effect unifier from being selected when
sink-specific unification should preserve the Sink type parameters. Users
normally do not need to reference this interface directly.
Signature
export interface SinkUnifyIgnore { Effect?: true}Since v2.0.0
reducing
Section titled “reducing”reduce
Section titled “reduce”A sink that reduces its inputs using the provided function f starting from
the provided initial state.
Signature
declare const reduce: <S, In>(initial: LazyArg<S>, f: (s: S, input: In) => S) => Sink<S, In>Since v4.0.0
reduceArray
Section titled “reduceArray”A sink that reduces its inputs using the provided function f starting from
the specified initial state.
Signature
declare const reduceArray: <S, In>(initial: LazyArg<S>, f: (s: S, input: NonEmptyReadonlyArray<In>) => S) => Sink<S, In>Since v4.0.0
reduceEffect
Section titled “reduceEffect”A sink that reduces its inputs using the provided effectful function f
starting from the specified initial state.
Signature
declare const reduceEffect: <S, In, E, R>( initial: LazyArg<S>, f: (s: S, input: In) => Effect.Effect<S, E, R>) => Sink<S, In, never, E, R>Since v4.0.0
reduceWhile
Section titled “reduceWhile”A sink that reduces input elements from the provided initial state with
f while the specified predicate returns true.
Signature
declare const reduceWhile: <S, In>( initial: LazyArg<S>, predicate: Predicate<S>, f: (s: S, input: In) => S) => Sink<S, In, In>Since v4.0.0
reduceWhileArray
Section titled “reduceWhileArray”A sink that reduces non-empty input arrays from the provided initial state
with f while the specified predicate returns true.
Signature
declare const reduceWhileArray: <S, In>( initial: LazyArg<S>, contFn: Predicate<S>, f: (s: S, input: NonEmptyReadonlyArray<In>) => S) => Sink<S, In>Since v4.0.0
reduceWhileArrayEffect
Section titled “reduceWhileArrayEffect”A sink that effectfully reduces non-empty input arrays from the provided
initial state with f while the specified predicate returns true.
Signature
declare const reduceWhileArrayEffect: <S, In, E, R>( initial: LazyArg<S>, predicate: Predicate<S>, f: (s: S, input: NonEmptyReadonlyArray<In>) => Effect.Effect<S, E, R>) => Sink<S, In, never, E, R>Since v4.0.0
reduceWhileEffect
Section titled “reduceWhileEffect”A sink that effectfully reduces input elements from the provided initial
state with f while the specified predicate returns true.
Signature
declare const reduceWhileEffect: <S, In, E, R>( initial: LazyArg<S>, predicate: Predicate<S>, f: (s: S, input: In) => Effect.Effect<S, E, R>) => Sink<S, In, In, E, R>Since v4.0.0
sequencing
Section titled “sequencing”flatMap
Section titled “flatMap”Runs this sink until it yields a result, then uses that result to create another sink from the provided function which will continue to run until it yields a result.
When to use
Use to compose sinks when the next sink depends on the result produced by the previous sink.
Details
Leftovers from the first sink are fed to the sink returned by f before more
upstream input is pulled.
See
mapfor transforming the result without switching sinksmapEffectfor effectfully transforming the result without switching sinks
Signature
declare const flatMap: { <A, A1, L, In1 extends L, L1, E1, R1>( f: (a: A) => Sink<A1, In1, L1, E1, R1> ): <In, E, R>(self: Sink<A, In, L, E, R>) => Sink<A1, In & In1, L1 | L, E1 | E, R1 | R> <A, In, L, E, R, A1, In1 extends L, L1, E1, R1>( self: Sink<A, In, L, E, R>, f: (a: A) => Sink<A1, In1, L1, E1, R1> ): Sink<A1, In & In1, L | L1, E | E1, R | R1>}Since v2.0.0
services
Section titled “services”provideContext
Section titled “provideContext”Provides a Context to this sink.
Details
Services contained in the provided context are removed from the sink’s service requirements.
Signature
declare const provideContext: { <Provided>( context: Context.Context<Provided> ): <A, In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In, L, E, Exclude<R, Provided>> <A, In, L, E, R, Provided>( self: Sink<A, In, L, E, R>, context: Context.Context<Provided> ): Sink<A, In, L, E, Exclude<R, Provided>>}Since v2.0.0
provideService
Section titled “provideService”Provides a single service implementation to this sink.
Details
The service identified by key is removed from the sink’s service
requirements.
Signature
declare const provideService: { <I, S>( key: Context.Key<I, S>, value: Types.NoInfer<S> ): <A, In, L, E, R>(self: Sink<A, In, L, E, R>) => Sink<A, In, L, E, Exclude<R, I>> <A, In, L, E, R, I, S>( self: Sink<A, In, L, E, R>, key: Context.Key<I, S>, value: Types.NoInfer<S> ): Sink<A, In, L, E, Exclude<R, I>>}Since v4.0.0
Sink (namespace)
Section titled “Sink (namespace)”Namespace containing types and interfaces for Sink variance and type relationships.
Since v2.0.0
Variance (interface)
Section titled “Variance (interface)”Type-level variance marker for Sink.
Details
The result A, leftovers L, errors E, and services R are
covariant. The input type In is contravariant because values flow into
the sink.
Signature
export interface Variance<out A, in In, out L, out E, out R> { readonly [TypeId]: VarianceStruct<A, In, L, E, R>}Since v2.0.0
VarianceStruct (interface)
Section titled “VarianceStruct (interface)”Structural encoding used by Sink.Variance to record each Sink type
parameter’s variance.
Details
_A, _L, _E, and _R are covariant markers. _In is a
contravariant marker.
Signature
export interface VarianceStruct<out A, in In, out L, out E, out R> { _A: Types.Covariant<A> _In: Types.Contravariant<In> _L: Types.Covariant<L> _E: Types.Covariant<E> _R: Types.Covariant<R>}Since v2.0.0
make (namespace)
Section titled “make (namespace)”Companion namespace containing overload types for the pipe-style sink
constructor returned by Sink.make.
Since v4.0.0
Constructor (interface)
Section titled “Constructor (interface)”Overloaded function type returned by Sink.make.
Details
The first pipeline function receives the sink input as a Stream<In>. The
final pipeline step must return an Effect, whose success value becomes
the sink result.
Signature
export interface Constructor<In> { <E, R, B = never>(ab: (_: Stream<In>) => Effect.Effect<B, E, R>): Sink<B, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never>( ab: (_: Stream<In>) => B, bc: (_: B) => Effect.Effect<C, E, R> ): Sink<C, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => Effect.Effect<D, E, R> ): Sink<D, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never, F = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => D, df: (_: D) => Effect.Effect<F, E, R> ): Sink<F, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never, F = never, G = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => D, df: (_: D) => F, fg: (_: F) => Effect.Effect<G, E, R> ): Sink<G, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never, F = never, G = never, H = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => D, df: (_: D) => F, fg: (_: F) => G, gh: (_: G) => Effect.Effect<H, E, R> ): Sink<H, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never, F = never, G = never, H = never, I = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => D, df: (_: D) => F, fg: (_: F) => G, gh: (_: G) => H, hi: (_: H) => Effect.Effect<I, E, R> ): Sink<I, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never, F = never, G = never, H = never, I = never, J = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => D, df: (_: D) => F, fg: (_: F) => G, gh: (_: G) => H, hi: (_: H) => I, ij: (_: I) => Effect.Effect<J, E, R> ): Sink<J, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never, F = never, G = never, H = never, I = never, J = never, K = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => D, df: (_: D) => F, fg: (_: F) => G, gh: (_: G) => H, hi: (_: H) => I, ij: (_: I) => J, jk: (_: J) => Effect.Effect<K, E, R> ): Sink<K, In, never, E, Exclude<R, Scope.Scope>> <E, R, B = never, C = never, D = never, F = never, G = never, H = never, I = never, J = never, K = never, L = never>( ab: (_: Stream<In>) => B, bc: (_: B) => C, cd: (_: C) => D, df: (_: D) => F, fg: (_: F) => G, gh: (_: G) => H, hi: (_: H) => I, ij: (_: I) => J, jk: (_: J) => K, kl: (_: K) => Effect.Effect<L, E, R> ): Sink<L, In, never, E, Exclude<R, Scope.Scope>>}Since v4.0.0