⚡ Subjects
Subjects are hot, push-based streams that behave like every other Stream while letting you imperatively next, complete, or error. A single commit loop stamps each emission, keeps ordering deterministic, and makes sure late joins learn the terminal state instantly.
🚦 Core characteristics
- 🔁 Multicast broadcasting – every emission is sent once through the shared commit loop so all active subscribers observe it simultaneously.
- 🚦 Global readiness gate – the reporter only dequeues the next item when
ready.size === receivers.size, which means no new emission starts until every receiver has finished processing the previous one (including any async work that re-adds the receiver toreadyupon resolution). - ⏸️ Per-receiver flow control – each subscriber runs through
createReceiver, so slow handlers buffer values locally while the subject keeps advancing for the rest; once an async handler resolves, it rejoins the ready set and lets the next stamp commit. - 🧱 Imperative producer API –
next,complete, anderrorpush stamped queue entries throughtryCommit, so producers never race with delivery. - 🏁 Late terminal replay – subscribers that register after completion or an error immediately see the stored terminal stamp and notification before returning their
Subscription.
🧭 Emission stamping and delivery
Every producer call records a monotonic stamp. createTryCommit clears entries only when every receiver is ready and reenters the commit loop once asynchronous reactions re-add themselves to ready. This keeps delivery deterministic even if downstream handlers return promises.
🔗 Subscription lifecycle
- 📥 Subscribe with a callback or full
Receiverto get aSubscriptionthat canunsubscribe(). - 🧵 Per-receiver queuing –
createReceiverserializesnextcalls, buffers values when the handler is running, defers completion until the queue drains, and defers each handler call viaqueueMicrotask. - 🧹 Unsubscribe cleanup – removing a receiver triggers
complete()inside a stamped emission so cleanup sees a deterministic stop. - 🕒 Late subscribers – new receivers connect either to the pending queue or immediately replay the terminal stamp (complete/error) if the subject already finished.
🌀 Lazy async iterator with true backpressure
for await (const value of subject) {
// Buffered values are stamped and delivered in order.
}- 🧾 Lazy registration – the iterator only subscribes on the first
next()invocation. - ↔️ Iterator-level buffering – the iterator manages its own backpressure while the subject keeps emitting for other consumers.
- ✅ Clean termination – breaking or returning from the iterator detaches it without completing the subject, so other subscribers remain live.
📦 Value helpers
- 🔍
valuegetter exposes the latest emission (orundefinedbefore anything emits). - 🎯
query()acts likefirstValueFrom, resolving with the next emission and immediately unsubscribing.
⚠️ Error handling
- 🔗 Receiver errors stay local; calling
receiver.error()runs the handler without moving the subject into a terminal state unlesserror(err)was explicitly invoked. - 🧨 Explicit
error(err)stamps the terminal state just like any other emission, ensuring late subscribers immediately see the stored exception. - 🌙 Unhandled error logging – errors thrown inside user handlers are caught, logged, and routed through the stamped lifecycle so the commit loop stays consistent.
🌱 createBehaviorSubject(initialValue)
BehaviorSubject seeds the stream with a value, keeps latestValue up to date, and replays the seed (and every new value) synchronously to each new subscriber before letting it join the live commit loop.
- 🤲
valuenever becomesundefinedbecause the subject always retains the seeded state. - 🔁 Late subscribers immediately receive the current snapshot before seeing future emissions.
- 💡 Ideal for propagating shared state where every consumer needs a warm start.
🔄 createReplaySubject(capacity = Infinity)
ReplaySubject keeps a sliding buffer of recent { value, stamp } entries and replays them in order before handing the live flow back to the supplier.
- 📚 Replayed history drains in sequence; async handlers resolve in stamp order so replay respects their pacing.
- 📦 Capacity keeps the buffer bounded, trimming the oldest values when needed.
- 🚨 Terminal replays deliver completion or errors after the buffer drains, even for subscribers that join after the terminal stamp.
Use replay subjects when new subscribers must catch up before rejoining live emissions.
🧭 Usage patterns
const events = createSubject<{ type: string }>();
events.subscribe(event => console.log("Logger:", event));
events.subscribe(event => sendToAnalytics(event));
events.next({ type: "ready" });
const state = createBehaviorSubject({ user: null });
state.subscribe(s => updateUi(s));
state.next({ user: "alice" });
console.log(state.value); // { user: "alice" }
const logger = createReplaySubject<string>(3);
logger.next("a");
logger.next("b");
logger.subscribe(async v => {
await writeToDisk(v);
});Ready to stream? Get started with Streamix today! ⚙️
Install from NPM ⚡ View on GitHub 📦 Give Feedback 💬
Choose your tools wisely, keep it simple, and may your reactive pipelines be pragmatic and interoperable. 🤝