Skip to content

Commit

Permalink
feat: merging functionality from minipass-json-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
wraithgar authored and shmam committed Jun 12, 2024
1 parent 9a3e7e8 commit 29712af
Show file tree
Hide file tree
Showing 5 changed files with 519 additions and 47 deletions.
2 changes: 1 addition & 1 deletion lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const { HttpErrorAuthOTP } = require('./errors.js')
const checkResponse = require('./check-response.js')
const getAuth = require('./auth.js')
const fetch = require('make-fetch-happen')
const JSONStream = require('minipass-json-stream')
const JSONStream = require('./json-stream')
const npa = require('npm-package-arg')
const qs = require('querystring')
const url = require('url')
Expand Down
223 changes: 223 additions & 0 deletions lib/json-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
const Parser = require('jsonparse')
const { Minipass } = require('minipass')

class JSONStreamError extends Error {
constructor (err, caller) {
super(err.message)
Error.captureStackTrace(this, caller || this.constructor)
}

get name () {
return 'JSONStreamError'
}
}

const check = (x, y) =>
typeof x === 'string' ? String(y) === x
: x && typeof x.test === 'function' ? x.test(y)
: typeof x === 'boolean' || typeof x === 'object' ? x
: typeof x === 'function' ? x(y)
: false

class JSONStream extends Minipass {
#count = 0
#ending = false
#footer = null
#header = null
#map = null
#onTokenOriginal
#parser
#path = null
#root = null

constructor (opts) {
super({
...opts,
objectMode: true,
})

const parser = this.#parser = new Parser()
parser.onValue = value => this.#onValue(value)
this.#onTokenOriginal = parser.onToken
parser.onToken = (token, value) => this.#onToken(token, value)
parser.onError = er => this.#onError(er)

this.#path = typeof opts.path === 'string'
? opts.path.split('.').map(e =>
e === '$*' ? { emitKey: true }
: e === '*' ? true
: e === '' ? { recurse: true }
: e)
: Array.isArray(opts.path) && opts.path.length ? opts.path
: null

if (typeof opts.map === 'function') {
this.#map = opts.map
}
}

#setHeaderFooter (key, value) {
// header has not been emitted yet
if (this.#header !== false) {
this.#header = this.#header || {}
this.#header[key] = value
}

// footer has not been emitted yet but header has
if (this.#footer !== false && this.#header === false) {
this.#footer = this.#footer || {}
this.#footer[key] = value
}
}

#onError (er) {
// error will always happen during a write() call.
const caller = this.#ending ? this.end : this.write
this.#ending = false
return this.emit('error', new JSONStreamError(er, caller))
}

#onToken (token, value) {
const parser = this.#parser
this.#onTokenOriginal.call(this.#parser, token, value)
if (parser.stack.length === 0) {
if (this.#root) {
const root = this.#root
if (!this.#path) {
super.write(root)
}
this.#root = null
this.#count = 0
}
}
}

#onValue (value) {
const parser = this.#parser
// the LAST onValue encountered is the root object.
// just overwrite it each time.
this.#root = value

if (!this.#path) {
return
}

let i = 0 // iterates on path
let j = 0 // iterates on stack
let emitKey = false
while (i < this.#path.length) {
const key = this.#path[i]
j++

if (key && !key.recurse) {
const c = (j === parser.stack.length) ? parser : parser.stack[j]
if (!c) {
return
}
if (!check(key, c.key)) {
this.#setHeaderFooter(c.key, value)
return
}
emitKey = !!key.emitKey
i++
} else {
i++
if (i >= this.#path.length) {
return
}
const nextKey = this.#path[i]
if (!nextKey) {
return
}
while (true) {
const c = (j === parser.stack.length) ? parser : parser.stack[j]
if (!c) {
return
}
if (check(nextKey, c.key)) {
i++
if (!Object.isFrozen(parser.stack[j])) {
parser.stack[j].value = null
}
break
} else {
this.#setHeaderFooter(c.key, value)
}
j++
}
}
}

// emit header
if (this.#header) {
const header = this.#header
this.#header = false
this.emit('header', header)
}
if (j !== parser.stack.length) {
return
}

this.#count++
const actualPath = parser.stack.slice(1)
.map(e => e.key).concat([parser.key])
if (value !== null && value !== undefined) {
const data = this.#map ? this.#map(value, actualPath) : value
if (data !== null && data !== undefined) {
const emit = emitKey ? { value: data } : data
if (emitKey) {
emit.key = parser.key
}
super.write(emit)
}
}

if (parser.value) {
delete parser.value[parser.key]
}

for (const k of parser.stack) {
k.value = null
}
}

write (chunk, encoding) {
if (typeof chunk === 'string') {
chunk = Buffer.from(chunk, encoding)
} else if (!Buffer.isBuffer(chunk)) {
return this.emit('error', new TypeError(
'Can only parse JSON from string or buffer input'))
}
this.#parser.write(chunk)
return this.flowing
}

end (chunk, encoding) {
this.#ending = true
if (chunk) {
this.write(chunk, encoding)
}

const h = this.#header
this.#header = null
const f = this.#footer
this.#footer = null
if (h) {
this.emit('header', h)
}
if (f) {
this.emit('footer', f)
}
return super.end()
}

static get JSONStreamError () {
return JSONStreamError
}

static parse (path, map) {
return new JSONStream({ path, map })
}
}

module.exports = JSONStream
47 changes: 1 addition & 46 deletions test/index.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'
const t = require('tap')

const { Minipass } = require('minipass')
const ssri = require('ssri')
const t = require('tap')
const zlib = require('zlib')
const defaultOpts = require('../lib/default-opts.js')
const tnock = require('./util/tnock.js')
Expand Down Expand Up @@ -273,50 +272,6 @@ t.test('query string with ?write=true', t => {
.then(res => t.strictSame(res, { write: 'go for it' }))
})

t.test('fetch.json.stream()', async t => {
tnock(t, defaultOpts.registry).get('/hello').reply(200, {
a: 1,
b: 2,
c: 3,
})
const data = await fetch.json.stream('/hello', '$*', OPTS).collect()
t.same(data, [
{ key: 'a', value: 1 },
{ key: 'b', value: 2 },
{ key: 'c', value: 3 },
], 'got a streamed JSON body')
})

t.test('fetch.json.stream opts.mapJSON', async t => {
tnock(t, defaultOpts.registry).get('/hello').reply(200, {
a: 1,
b: 2,
c: 3,
})
const data = await fetch.json.stream('/hello', '*', {
...OPTS,
mapJSON (value, [key]) {
return [key, value]
},
}).collect()
t.same(data, [
['a', 1],
['b', 2],
['c', 3],
], 'data mapped')
})

t.test('fetch.json.stream gets fetch error on stream', async t => {
await t.rejects(fetch.json.stream('/hello', '*', {
...OPTS,
body: Promise.reject(new Error('no body for you')),
method: 'POST',
gzip: true, // make sure we don't gzip the promise, lol!
}).collect(), {
message: 'no body for you',
})
})

t.test('opts.ignoreBody', async t => {
tnock(t, defaultOpts.registry)
.get('/hello')
Expand Down
46 changes: 46 additions & 0 deletions test/json-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
const t = require('tap')
const { JSONStreamError, parse } = require('../lib/json-stream.js')

t.test('JSONStream', (t) => {
t.test('JSONStreamError constructor', (t) => {
const error = new JSONStreamError(new Error('error'))
t.equal(error.message, 'error')
t.equal(error.name, 'JSONStreamError')
t.end()
})

t.test('JSONStream.write', (t) => {
t.test('JSONStream write error from numerical (not string not buffer)', async (t) => {
const stream = parse('*', {})
try {
stream.write(5)
} catch (error) {
t.equal(error.message, 'Can only parse JSON from string or buffer input')
t.equal(error.name, 'TypeError')
}
t.end()
})

t.end()
})

t.test('JSONStream.end', (t) => {
t.test(
'JSONStream end invalid chunk throws JSONStreamError from parser',
(t) => {
const stream = parse('*', {})
try {
stream.end('not a valid chunk')
} catch (error) {
t.equal(error.name, 'JSONStreamError')
t.equal(error.message, 'Unexpected "o" at position 1 in state STOP')
}
t.end()
}
)

t.end()
})

t.end()
})
Loading

0 comments on commit 29712af

Please sign in to comment.