Skip to content

@epikodelabs/streamix


@epikodelabs/streamix / createStream

Function: createStream()

createStream<T>(name, generatorFn): Stream<T>

Defined in: projects/libraries/streamix/src/lib/abstractions/stream.ts:271

Creates a multicast Stream from an async generator factory.

The returned Stream starts producing values on the first subscription and delivers each yielded value to all active subscribers.

  • When the last subscriber unsubscribes, the underlying generator is aborted via an AbortSignal.
  • When the generator completes, subscribers are completed and internal receiver references are cleared to avoid memory growth in long-running processes/tests.
  • A new subscription after completion starts a fresh generator run.

Receiver callbacks are executed in a microtask when there is no active emission context, which helps keep delivery ordering consistent and avoids surprising re-entrancy.

Type Parameters

T

T

Value type emitted by the stream.

Parameters

name

string

Human-friendly name (used for debugging/tracing).

generatorFn

(signal?) => AsyncGenerator<T, void, unknown>

Async generator factory. Receives an optional AbortSignal that is aborted when the stream is torn down.

Returns

Stream<T>

A Stream that can be piped, subscribed to, or iterated.

Example

ts
const s = createStream('ticks', async function* (signal) {
  while (!signal?.aborted) {
    yield Date.now();
    await new Promise(r => setTimeout(r, 1000));
  }
});

Released under the MIT License.