diff --git a/lib/index.js b/lib/index.js index bce6e6b1..898c8125 100644 --- a/lib/index.js +++ b/lib/index.js @@ -4,7 +4,7 @@ const { HttpErrorAuthOTP } = require('./errors.js') const checkResponse = require('./check-response.js') const getAuth = require('./auth.js') const fetch = require('make-fetch-happen') -const JSONStream = require('minipass-json-stream') +const JSONStream = require('./json-stream') const npa = require('npm-package-arg') const qs = require('querystring') const url = require('url') diff --git a/lib/json-stream.js b/lib/json-stream.js new file mode 100644 index 00000000..36b05ad4 --- /dev/null +++ b/lib/json-stream.js @@ -0,0 +1,223 @@ +const Parser = require('jsonparse') +const { Minipass } = require('minipass') + +class JSONStreamError extends Error { + constructor (err, caller) { + super(err.message) + Error.captureStackTrace(this, caller || this.constructor) + } + + get name () { + return 'JSONStreamError' + } +} + +const check = (x, y) => + typeof x === 'string' ? String(y) === x + : x && typeof x.test === 'function' ? x.test(y) + : typeof x === 'boolean' || typeof x === 'object' ? x + : typeof x === 'function' ? x(y) + : false + +class JSONStream extends Minipass { + #count = 0 + #ending = false + #footer = null + #header = null + #map = null + #onTokenOriginal + #parser + #path = null + #root = null + + constructor (opts) { + super({ + ...opts, + objectMode: true, + }) + + const parser = this.#parser = new Parser() + parser.onValue = value => this.#onValue(value) + this.#onTokenOriginal = parser.onToken + parser.onToken = (token, value) => this.#onToken(token, value) + parser.onError = er => this.#onError(er) + + this.#path = typeof opts.path === 'string' + ? opts.path.split('.').map(e => + e === '$*' ? { emitKey: true } + : e === '*' ? true + : e === '' ? { recurse: true } + : e) + : Array.isArray(opts.path) && opts.path.length ? opts.path + : null + + if (typeof opts.map === 'function') { + this.#map = opts.map + } + } + + #setHeaderFooter (key, value) { + // header has not been emitted yet + if (this.#header !== false) { + this.#header = this.#header || {} + this.#header[key] = value + } + + // footer has not been emitted yet but header has + if (this.#footer !== false && this.#header === false) { + this.#footer = this.#footer || {} + this.#footer[key] = value + } + } + + #onError (er) { + // error will always happen during a write() call. + const caller = this.#ending ? this.end : this.write + this.#ending = false + return this.emit('error', new JSONStreamError(er, caller)) + } + + #onToken (token, value) { + const parser = this.#parser + this.#onTokenOriginal.call(this.#parser, token, value) + if (parser.stack.length === 0) { + if (this.#root) { + const root = this.#root + if (!this.#path) { + super.write(root) + } + this.#root = null + this.#count = 0 + } + } + } + + #onValue (value) { + const parser = this.#parser + // the LAST onValue encountered is the root object. + // just overwrite it each time. + this.#root = value + + if (!this.#path) { + return + } + + let i = 0 // iterates on path + let j = 0 // iterates on stack + let emitKey = false + while (i < this.#path.length) { + const key = this.#path[i] + j++ + + if (key && !key.recurse) { + const c = (j === parser.stack.length) ? parser : parser.stack[j] + if (!c) { + return + } + if (!check(key, c.key)) { + this.#setHeaderFooter(c.key, value) + return + } + emitKey = !!key.emitKey + i++ + } else { + i++ + if (i >= this.#path.length) { + return + } + const nextKey = this.#path[i] + if (!nextKey) { + return + } + while (true) { + const c = (j === parser.stack.length) ? parser : parser.stack[j] + if (!c) { + return + } + if (check(nextKey, c.key)) { + i++ + if (!Object.isFrozen(parser.stack[j])) { + parser.stack[j].value = null + } + break + } else { + this.#setHeaderFooter(c.key, value) + } + j++ + } + } + } + + // emit header + if (this.#header) { + const header = this.#header + this.#header = false + this.emit('header', header) + } + if (j !== parser.stack.length) { + return + } + + this.#count++ + const actualPath = parser.stack.slice(1) + .map(e => e.key).concat([parser.key]) + if (value !== null && value !== undefined) { + const data = this.#map ? this.#map(value, actualPath) : value + if (data !== null && data !== undefined) { + const emit = emitKey ? { value: data } : data + if (emitKey) { + emit.key = parser.key + } + super.write(emit) + } + } + + if (parser.value) { + delete parser.value[parser.key] + } + + for (const k of parser.stack) { + k.value = null + } + } + + write (chunk, encoding) { + if (typeof chunk === 'string') { + chunk = Buffer.from(chunk, encoding) + } else if (!Buffer.isBuffer(chunk)) { + return this.emit('error', new TypeError( + 'Can only parse JSON from string or buffer input')) + } + this.#parser.write(chunk) + return this.flowing + } + + end (chunk, encoding) { + this.#ending = true + if (chunk) { + this.write(chunk, encoding) + } + + const h = this.#header + this.#header = null + const f = this.#footer + this.#footer = null + if (h) { + this.emit('header', h) + } + if (f) { + this.emit('footer', f) + } + return super.end() + } + + static get JSONStreamError () { + return JSONStreamError + } + + static parse (path, map) { + return new JSONStream({ path, map }) + } +} + +module.exports = JSONStream diff --git a/test/index.js b/test/index.js index 4a165718..f989ea4a 100644 --- a/test/index.js +++ b/test/index.js @@ -1,8 +1,7 @@ -'use strict' +const t = require('tap') const { Minipass } = require('minipass') const ssri = require('ssri') -const t = require('tap') const zlib = require('zlib') const defaultOpts = require('../lib/default-opts.js') const tnock = require('./util/tnock.js') @@ -273,50 +272,6 @@ t.test('query string with ?write=true', t => { .then(res => t.strictSame(res, { write: 'go for it' })) }) -t.test('fetch.json.stream()', async t => { - tnock(t, defaultOpts.registry).get('/hello').reply(200, { - a: 1, - b: 2, - c: 3, - }) - const data = await fetch.json.stream('/hello', '$*', OPTS).collect() - t.same(data, [ - { key: 'a', value: 1 }, - { key: 'b', value: 2 }, - { key: 'c', value: 3 }, - ], 'got a streamed JSON body') -}) - -t.test('fetch.json.stream opts.mapJSON', async t => { - tnock(t, defaultOpts.registry).get('/hello').reply(200, { - a: 1, - b: 2, - c: 3, - }) - const data = await fetch.json.stream('/hello', '*', { - ...OPTS, - mapJSON (value, [key]) { - return [key, value] - }, - }).collect() - t.same(data, [ - ['a', 1], - ['b', 2], - ['c', 3], - ], 'data mapped') -}) - -t.test('fetch.json.stream gets fetch error on stream', async t => { - await t.rejects(fetch.json.stream('/hello', '*', { - ...OPTS, - body: Promise.reject(new Error('no body for you')), - method: 'POST', - gzip: true, // make sure we don't gzip the promise, lol! - }).collect(), { - message: 'no body for you', - }) -}) - t.test('opts.ignoreBody', async t => { tnock(t, defaultOpts.registry) .get('/hello') diff --git a/test/json-stream.js b/test/json-stream.js new file mode 100644 index 00000000..db617bf4 --- /dev/null +++ b/test/json-stream.js @@ -0,0 +1,46 @@ +const t = require('tap') +const { JSONStreamError, parse } = require('../lib/json-stream.js') + +t.test('JSONStream', (t) => { + t.test('JSONStreamError constructor', (t) => { + const error = new JSONStreamError(new Error('error')) + t.equal(error.message, 'error') + t.equal(error.name, 'JSONStreamError') + t.end() + }) + + t.test('JSONStream.write', (t) => { + t.test('JSONStream write error from numerical (not string not buffer)', async (t) => { + const stream = parse('*', {}) + try { + stream.write(5) + } catch (error) { + t.equal(error.message, 'Can only parse JSON from string or buffer input') + t.equal(error.name, 'TypeError') + } + t.end() + }) + + t.end() + }) + + t.test('JSONStream.end', (t) => { + t.test( + 'JSONStream end invalid chunk throws JSONStreamError from parser', + (t) => { + const stream = parse('*', {}) + try { + stream.end('not a valid chunk') + } catch (error) { + t.equal(error.name, 'JSONStreamError') + t.equal(error.message, 'Unexpected "o" at position 1 in state STOP') + } + t.end() + } + ) + + t.end() + }) + + t.end() +}) diff --git a/test/stream.js b/test/stream.js new file mode 100644 index 00000000..f0a8f501 --- /dev/null +++ b/test/stream.js @@ -0,0 +1,248 @@ +const t = require('tap') + +const tnock = require('./util/tnock.js') +const defaultOpts = require('../lib/default-opts.js') +defaultOpts.registry = 'https://mock.reg/' + +const fetch = require('..') + +const OPTS = { + timeout: 0, + retry: { + retries: 1, + factor: 1, + minTimeout: 1, + maxTimeout: 10, + }, +} + +t.test('json.stream', (t) => { + t.test('fetch.json.stream()', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + b: 2, + c: 3, + something: null, + }) + const data = await fetch.json.stream('/hello', '$*', OPTS).collect() + t.same( + data, + [ + { key: 'a', value: 1 }, + { key: 'b', value: 2 }, + { key: 'c', value: 3 }, + ], + 'got a streamed JSON body' + ) + }) + + t.test('fetch.json.stream opts.mapJSON', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + b: 2, + c: 3, + }) + const data = await fetch.json + .stream('/hello', '*', { + ...OPTS, + mapJSON (value, [key]) { + return [key, value] + }, + }) + .collect() + t.same( + data, + [ + ['a', 1], + ['b', 2], + ['c', 3], + ], + 'data mapped' + ) + }) + + t.test('fetch.json.stream opts.mapJSON that returns null', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + b: 2, + c: 3, + }) + const data = await fetch.json + .stream('/hello', '*', { + ...OPTS, + // eslint-disable-next-line no-unused-vars + mapJSON (_value, [_key]) { + return null + }, + }) + .collect() + t.same(data, []) + }) + + t.test('fetch.json.stream gets fetch error on stream', async (t) => { + await t.rejects( + fetch.json + .stream('/hello', '*', { + ...OPTS, + body: Promise.reject(new Error('no body for you')), + method: 'POST', + gzip: true, // make sure we don't gzip the promise, lol! + }) + .collect(), + { + message: 'no body for you', + } + ) + }) + + t.test('fetch.json.stream() sets header and footer', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + b: 2, + c: 3, + }) + const data = await fetch.json + .stream('/hello', 'something-random', OPTS) + .collect() + t.same(data, [], 'no data') + }) + + t.test('fetch.json.stream() with recursive JSON', async (t) => { + tnock(t, defaultOpts.registry) + .get('/hello') + .reply(200, { + dogs: [ + { + name: 'george', + owner: { + name: 'bob', + }, + }, + { + name: 'fred', + owner: { + name: 'alice', + }, + }, + { + name: 'jill', + owner: { + name: 'fred', + }, + }, + ], + }) + + const data = await fetch.json + .stream('/hello', 'dogs..name', OPTS) + .collect() + t.same(data, ['george', 'bob', 'fred', 'alice', 'jill', 'fred']) + }) + + t.test('fetch.json.stream() with undefined path', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + }) + const data = await fetch.json.stream('/hello', undefined, OPTS).collect() + t.same(data, [{ a: 1 }]) + }) + + t.test('fetch.json.stream() with empty path', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + }) + const data = await fetch.json.stream('/hello', '', OPTS).collect() + t.same(data, []) + }) + + t.test('fetch.json.stream() with path with function', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: { + b: { + c: 1, + }, + d: 2, + }, + }) + const data = await fetch.json + .stream('/hello', [ + (a) => a, + { + test: (a) => a, + }, + ]) + .collect() + t.same(data, [{ c: 1 }, 2]) + }) + + t.test('fetch.json.stream() with path array with number in path', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: 1, + }) + const data = await fetch.json.stream('/hello', [1], OPTS).collect() + t.same(data, []) + }) + + t.test( + 'fetch.json.stream() with path array with recursive and undefined value', + async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: { + b: { + c: 1, + }, + d: 2, + }, + }) + const data = await fetch.json + .stream('/hello', ['a', '', undefined], OPTS) + .collect() + t.same(data, []) + } + ) + + t.test('fetch.json.stream() emitKey in path', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: { + b: 1, + }, + }) + const data = await fetch.json.stream('/hello', ['a', { emitKey: true }], OPTS).collect() + t.same(data, [{ key: 'b', value: 1 }]) + }) + + t.test('fetch.json.stream with recursive path followed by valid key', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, { + a: { + b: 1, + }, + }) + const data = await fetch.json.stream('/hello', ['', 'a'], OPTS).collect() + t.same(data, [{ b: 1 }]) + }) + + t.test('fetch.json.stream encounters malformed json', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, '{') + const data = await fetch.json.stream('/hello', '*', OPTS).collect() + + t.same(data, []) + }) + + t.test('fetch.json.stream encounters not json string data', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, 'not json') + + // catch rejected promise + t.rejects(fetch.json.stream('/hello', '*', OPTS).collect(), { + message: 'Unexpected "o" at position 1 in state STOP', + }) + }) + + t.test('fetch.json.stream encounters not json numerical data', async (t) => { + tnock(t, defaultOpts.registry).get('/hello').reply(200, 555) + + const data = await fetch.json.stream('/hello', '*', OPTS).collect() + t.same(data, []) + }) + + t.end() +})