Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

An iteration of the Node.js core streams with a series of improvements.

```
npm install streamx
```sh
$ npm install streamx
```

[![Build Status](https://github.com/streamxorg/streamx/workflows/Build%20Status/badge.svg)](https://github.com/streamxorg/streamx/actions?query=workflow%3A%22Build+Status%22)
Expand Down Expand Up @@ -55,15 +55,15 @@ improvements above.

streamx has a much smaller footprint when compiled for the browser:

```
```sh
$ for x in stream{,x}; do echo $x: $(browserify -r $x | wc -c) bytes; done
stream: 173844 bytes
streamx: 46943 bytes
```

With optimizations turned on, the difference is even more stark:

```
```sh
$ for x in stream{,x}; do echo $x: $(browserify -r $x -p tinyify | wc -c) bytes; done
stream: 62649 bytes
streamx: 8460 bytes
Expand Down Expand Up @@ -105,7 +105,7 @@ Create a new readable stream.

Options include:

```
```js
{
highWaterMark: 16384, // max buffer size in bytes
map: (data) => data, // optional function to map input data
Expand All @@ -118,6 +118,12 @@ Options include:
In addition you can pass the `open`, `read`, and `destroy` functions as shorthands in
the constructor instead of overwrite the methods below.

`open`, `read` and `destroy` have the same signature as the `._open`, `._read` and `._destroy`
methods but also support async variants. For example: You can pass in `async read() {}` instead
of `read(cb) {}`

All passed-in functions will be executed with the readable stream as `this`.

The default byteLength function returns the byte length of buffers and `1024`
for any other object. This means the buffer will contain around 16 non buffers
or buffers worth 16kb when full if the defaults are used.
Expand Down Expand Up @@ -263,7 +269,7 @@ Create a new writable stream.

Options include:

```
```js
{
highWaterMark: 16384, // max buffer size in bytes
map: (data) => data, // optional function to map input data
Expand All @@ -272,8 +278,14 @@ Options include:
}
```

In addition you can pass the `open`, `write`, `final`, and `destroy` functions as shorthands in
the constructor instead of overwrite the methods below.
In addition you can pass the `open`, `write`, `writev`, `final`, and `destroy` functions as
shorthands in the constructor instead of overwrite the methods below.

`open`, `write`, `writev`, `final` and `destroy` have the same signature as the `._open`,
`._write`, `._writev`. `._final` and `._destroy` methods but also support async variants.
For example: You can pass in `async write(data) {}` instead of `write(data, cb) {}`.

All passed-in functions will be executed with the writable stream as `this`.

The default byteLength function returns the byte length of buffers and `1024`
for any other object. This means the buffer will contain around 16 non buffers
Expand Down Expand Up @@ -400,7 +412,13 @@ in `read` or `write`/`writev` or to override the corresponding `._read`, `._writ

A transform stream is a duplex stream that maps the data written to it and emits that as readable data.

Has the same options as a duplex stream except you can provide a `transform` function also.
Has the same options as a duplex stream except you can provide a `transform` and `flush` function also.

The `transform` and `flush` operations have the same signature as the `._transform` and `._flush` but
`._transform` also support an async variant. For example:
You can pass in `async transform (data) {}` instead of `transform (data, cb) {}`.

All passed-in functions will be executed with the duplex stream as `this`.

#### `ts._transform(data, callback)`

Expand Down
50 changes: 39 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,34 @@ function afterTransform (err, data) {
this._writableState.afterWrite(err)
}

function mapAsync (name, asyncOrNot) {
if (asyncOrNot.length === 0) {
return function (cb) {
const p = asyncOrNot.call(this)
if (!p) return cb(new Error(`Async template .${name} is expected to return a Promise.`))
p.then(
data => cb(null, data),
cb
)
}
}
return asyncOrNot
}

function mapAsync1 (name, asyncOrNot) {
if (asyncOrNot.length === 1) {
return function (arg, cb) {
const p = asyncOrNot.call(this, arg)
if (!p) return cb(new Error(`Async template .${name} is expected to return a Promise.`))
p.then(
data => cb(null, data),
cb
)
}
}
return asyncOrNot
}

class Stream extends EventEmitter {
constructor (opts) {
super()
Expand All @@ -518,8 +546,8 @@ class Stream extends EventEmitter {
this._writableState = null

if (opts) {
if (opts.open) this._open = opts.open
if (opts.destroy) this._destroy = opts.destroy
if (opts.open) this._open = mapAsync('open', opts.open)
if (opts.destroy) this._destroy = mapAsync('destroy', opts.destroy)
if (opts.predestroy) this._predestroy = opts.predestroy
if (opts.signal) {
opts.signal.addEventListener('abort', abort.bind(this))
Expand Down Expand Up @@ -602,7 +630,7 @@ class Readable extends Stream {
this._readableState = new ReadableState(this, opts)

if (opts) {
if (opts.read) this._read = opts.read
if (opts.read) this._read = mapAsync('read', opts.read)
if (opts.eagerOpen) this.resume().pause()
}
}
Expand Down Expand Up @@ -759,9 +787,9 @@ class Writable extends Stream {
this._writableState = new WritableState(this, opts)

if (opts) {
if (opts.writev) this._writev = opts.writev
if (opts.write) this._write = opts.write
if (opts.final) this._final = opts.final
if (opts.writev) this._writev = mapAsync1('writev', opts.writev)
if (opts.write) this._write = mapAsync1('write', opts.write)
if (opts.final) this._final = mapAsync('final', opts.final)
}
}

Expand Down Expand Up @@ -801,9 +829,9 @@ class Duplex extends Readable { // and Writable
this._writableState = new WritableState(this, opts)

if (opts) {
if (opts.writev) this._writev = opts.writev
if (opts.write) this._write = opts.write
if (opts.final) this._final = opts.final
if (opts.writev) this._writev = mapAsync1('writev', opts.writev)
if (opts.write) this._write = mapAsync1('write', opts.write)
if (opts.final) this._final = mapAsync('final', opts.final)
}
}

Expand Down Expand Up @@ -837,8 +865,8 @@ class Transform extends Duplex {
this._transformState = new TransformState(this)

if (opts) {
if (opts.transform) this._transform = opts.transform
if (opts.flush) this._flush = opts.flush
if (opts.transform) this._transform = mapAsync1('transform', opts.transform)
if (opts.flush) this._flush = mapAsync('flush', opts.flush)
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/duplex.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ tape('if open does not end, it should stall', function (t) {
t.plan(1)

const d = new Duplex({
open () {
open (_cb) {
t.pass('open called')
},
read () {
Expand Down
29 changes: 29 additions & 0 deletions test/passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,32 @@ tape('passthrough', t => {
r.push('bar')
r.push(null)
})

tape('async transform option', async function (t) {
const r = Readable.from([1, 2, 3]).pipe(new PassThrough({
async transform (a) {
return a.toString()
}
}))

const result = []
for await (const entry of r) {
result.push(entry)
}
t.same(result, ['1', '2', '3'])
t.end()
})

tape('async final option', async function (t) {
const r = Readable.from([1, 2, 3]).pipe(new PassThrough({
flush () {
return new Promise(resolve => setTimeout(resolve, 30))
}
}))
const start = Date.now()
r.on('close', () => {
t.ok((Date.now() - start) > 25)
t.end()
})
r.resume()
})
2 changes: 1 addition & 1 deletion test/pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ tape('simple pipe', function (t) {
cb(null)
},

final () {
async final () {
t.pass('final called')
t.same(buffered, ['hello', 'world'])
t.end()
Expand Down
57 changes: 57 additions & 0 deletions test/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,60 @@ tape('use mapReadable to map data', async function (t) {
t.deepEquals(obj, { foo: 1 })
}
})

tape('async read option', async function (t) {
let index = 0
const data = ['a', 'b', 'c', null]
const r = new Readable({
async read () {
this.push(data[index++])
}
})
const res = []
for await (const entry of r) {
res.push(entry)
}
t.same(res, ['a', 'b', 'c'])
t.end()
})

tape('async open option', function (t) {
const r = new Readable({
open () {
return new Promise(resolve => setTimeout(resolve, 30))
}
})
const start = Date.now()
r.on('data', () => {
t.ok((Date.now() - start) > 25)
t.end()
})
r.push(1)
r.push(null)
})

tape('async destroy option', function (t) {
const r = new Readable({
destroy () {
return new Promise(resolve => setTimeout(resolve, 30))
}
})
r.push(null)
const start = Date.now()
r.on('close', () => {
t.ok((Date.now() - start) > 25)
t.end()
})
})

tape('error when no promise is returned by async .read template', function (t) {
t.plan(1)
const r = new Readable({
read () {}
})
r.on('error', error => {
t.ok(error instanceof Error)
t.end()
})
r.read()
})
Loading