streamix
Reactive streams built on async generators. Small bundle, pull-based execution, and a familiar operator API.

Give a Star on GitHub
If streamix helps you, please give it a star: https://github.com/epikodelabs/streamix
✨ Why Streamix
streamix is a reactive streams library built on async generators. It focuses on a small bundle size and pull-based execution while keeping an API that feels familiar to RxJS users, and normalizing async operations toward an iterator-first workflow keeps each stream predictable, which makes debugging and testing easier whether you are building a dashboard, a CLI, or a background job processor.
Highlights
- Pull-based execution so values are computed when requested
- Async iterator first, designed for
for await...of - Async callbacks are supported in
subscribehandlers query()retrieves actual emitted value as a promise- Operators for mapping, filtering, combination, and control flow
- Subjects for manual emission and multicasting
- Optional HTTP client and DOM observation utilities
📦 Installation
# npm
npm install @epikodelabs/streamix
# yarn
yarn add @epikodelabs/streamix
# pnpm
pnpm add @epikodelabs/streamix⚡️ Quick start
The quick start below shows how to lift generators or ranged sequences into operator pipelines; you can iterate them directly or fall back to subscribe when you need push-style delivery.
Basic stream operations
import { range, map, filter, take } from '@epikodelabs/streamix';
const stream = range(1, 100)
.pipe(
map(x => x * 2),
filter(x => x % 3 === 0),
take(5)
);
for await (const value of stream) {
console.log(value); // 6, 12, 18, 24, 30
}Handling user events
import { fromEvent, debounce, map, filter } from '@epikodelabs/streamix';
const searchInput = document.getElementById('search');
const searchStream = fromEvent(searchInput, 'input')
.pipe(
map(event => event.target.value),
debounce(300),
filter(text => text.length > 2)
);
for await (const searchTerm of searchStream) {
console.log('Searching for:', searchTerm);
}🧠 Core concepts
Streams
Streams are sequences of values over time, implemented as async generators:
import { createStream } from '@epikodelabs/streamix';
async function* numberStream() {
for (let i = 0; i < 10; i++) {
yield i;
await new Promise(resolve => setTimeout(resolve, 100));
}
}
const stream = createStream('numbers', numberStream);🏭 Available factories
The library ships a range of helper factories so you can stand up common sources without calling createStream directly:
combineLatest(...sources)- join the latest values from multiple streams.concat(...sources)- run sources sequentially, one after another.defer(factory)- build a fresh stream by invoking the factory per subscription.EMPTY()- a stream that immediately completes without emitting anything.forkJoin(...sources)- emit once with the final values after all sources complete.from(source)- lift arrays, iterables, async generators, or promises into a stream.fromEvent(target, event)- convert DOM/Node-style events into a stream.fromPromise(promise)- wrap a promise-producing operation so it emits once and completes.iif(condition, trueSource, falseSource)- branch between two creator callbacks.interval(ms)- emit an increasing counter everymsmilliseconds.loop(factory)- repeat a factory-based generator while it keeps yielding.merge(...sources)- interleave concurrent emissions from multiple sources.of(...values)- emit the provided values in order and then complete.race(...sources)- mirror the first source to emit and cancel the rest.range(start, count)- emit a fixed range of sequential numbers.retry(source, attempts)- repeat a source when it errors, up toattemptstimes.timer(delay, period?)- emit after an initial delay and optionally repeat.zip(...sources)- pair emissions from sources by matching indexes.
🛠️ Available operators
Operators compose async generators with familiar transformations so you can restructure logic without nested blocks.
stream.pipe(
map(x => x * 2),
filter(x => x > 10),
take(5),
debounce(100)
)Operators handle sync and async callbacks transparently:
const magicShow = from(storyBook)
.pipe(
map(page => page.length),
map(async length => {
await thinkAboutIt(1000);
return length * 2;
}),
filter(num => num > 10)
);Full operator catalog: audit, buffer, bufferCount, bufferUntil, bufferWhile,catchError, concatMap, debounce, defaultIfEmpty, delay, delayUntil, distinctUntilChanged, distinctUntilKeyChanged, endWith, exhaustMap, expand, filter, finalize, first, fork, groupBy, ignoreElements, last, map, mergeMap, observeOn, partition, reduce, sample, scan, select, shareReplay, skip, skipUntil, skipWhile, slidingPair, startWith, switchMap, take, takeUntil, takeWhile, tap, throttle, throwError, toArray, withLatestFrom.
Build custom operators
Every built-in operator you already know is just a wrapper around createOperator. It lets you capture the underlying iterator and return a new async iterator that applies whatever scheduling, buffering, or branching logic you need before handing values to the downstream consumer.
import { createOperator, DONE, NEXT } from '@epikodelabs/streamix';
const evenOnly = () =>
createOperator<number, number>('evenOnly', function (source) {
return {
async next() {
while (true) {
const result = await source.next();
if (result.done) return DONE;
if (result.value % 2 === 0) return NEXT(result.value);
}
},
return: source.return?.bind(source),
throw: source.throw?.bind(source),
};
});Now you can mix evenOnly() into any pipeline just like the built-ins:
const stream = from([1, 2, 3, 4]).pipe(evenOnly(), map(n => n * 10));Because createOperator works directly with async iterators, you get the same pull-based backpressure behavior that powers the rest of the library and can freely interleave async callbacks, metadata, and cancellation hooks.
Subjects
Manually control stream emissions:
import { createSubject } from '@epikodelabs/streamix';
const subject = createSubject<string>();
for await (const value of subject) {
console.log('Received:', value);
}
subject.next('Hello');
subject.next('World');
subject.complete();Query the first value
query() retrieves the actual emitted value as a promise, then automatically unsubscribes.
import { interval, take, map } from '@epikodelabs/streamix';
const stream = interval(1000).pipe(take(1));
const first = await stream.query();
console.log('first:', first);
const transformed = interval(500).pipe(
map(value => value * 10),
take(1)
);
const result = await transformed.query();
console.log('result:', result);🌐 HTTP client
streamix includes an HTTP client that composes well with streams:
import { map, retry } from '@epikodelabs/streamix';
import {
createHttpClient,
readJson,
useBase,
useLogger,
useTimeout
} from '@epikodelabs/streamix/networking';
const client = createHttpClient().withDefaults(
useBase("https://api.example.com"),
useLogger(),
useTimeout(5000)
);
const dataStream = retry(() => client.get("/users", readJson), 3)
.pipe(
map(users => users.filter(user => user.active))
);
for await (const activeUsers of dataStream) {
console.log('Active users:', activeUsers);
}🧪 Real-world example
Live search with API calls and basic error handling:
import {
fromEvent,
fromPromise,
debounce,
map,
filter,
switchMap,
startWith
} from '@epikodelabs/streamix';
const searchInput = document.getElementById('search');
const resultsDiv = document.getElementById('results');
const searchResults = fromEvent(searchInput, 'input')
.pipe(
map(e => e.target.value.trim()),
debounce(300),
filter(query => query.length > 2),
switchMap(query => fromPromise(
fetch(`/api/search?q=${query}`)
.then(r => r.json())
.catch(() => ({ error: 'Search failed', query }))
)),
startWith({ results: [], query: '' })
);
for await (const result of searchResults) {
if (result.error) {
resultsDiv.innerHTML = `<p class="error">${result.error}</p>`;
} else {
resultsDiv.innerHTML = result.results
.map(item => `<div class="result">${item.title}</div>`)
.join('');
}
}🎬 Live demos
🧬 Generator-based architecture
Unlike push-based streams, streamix uses pull-based async generators:
import { createStream, take } from '@epikodelabs/streamix';
async function* expensiveStream() {
for (let i = 0; i < 1000000; i++) {
yield expensiveComputation(i);
}
}
const stream = createStream('calculations', expensiveStream)
.pipe(take(10));This enables:
- On-demand computation
- Lower memory usage per stream
- Natural backpressure from the consumer
⚖️ Streamix vs RxJS
| Feature | Streamix | RxJS |
|---|---|---|
| Bundle size | Small, generator-based core | Larger, broad operator set |
| Learning curve | Moderate, smaller API surface | Steeper, larger surface area |
| Execution model | Pull-based | Push-based |
| Async/await | Native | Limited |
| Backpressure | Consumer-driven | Requires patterns |
📚 Documentation and resources
🤝 Contributing
We welcome issues and pull requests. If you are new to the codebase:
- Open an issue with a minimal reproduction for bugs
- Propose features with a short problem statement and example
- Improve docs with focused changes
📜 License
MIT License
Get started
Install from NPM - View on GitHub - Give Feedback
API Reference
Check the detailed API Reference here.